YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/access/spgist/spgxlog.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgxlog.c
4
 *    WAL replay logic for SP-GiST
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *       src/backend/access/spgist/spgxlog.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
#include "postgres.h"
16
17
#include "access/bufmask.h"
18
#include "access/spgist_private.h"
19
#include "access/spgxlog.h"
20
#include "access/transam.h"
21
#include "access/xlog.h"
22
#include "access/xlogutils.h"
23
#include "storage/standby.h"
24
#include "utils/memutils.h"
25
26
27
/*
 * Working memory for replay operations.  The context is created by
 * spg_xlog_startup(), made current for the duration of each spg_redo() call
 * and reset once that call is finished, and deleted by spg_xlog_cleanup().
 */
static MemoryContext opCtx;
28
29
30
/*
31
 * Prepare a dummy SpGistState, with just the minimum info needed for replay.
32
 *
33
 * At present, all we need is enough info to support spgFormDeadTuple(),
34
 * plus the isBuild flag.
35
 */
36
static void
37
fillFakeState(SpGistState *state, spgxlogState stateSrc)
38
0
{
39
0
  memset(state, 0, sizeof(*state));
40
41
0
  state->myXid = stateSrc.myXid;
42
0
  state->isBuild = stateSrc.isBuild;
43
0
  state->deadTupleStorage = palloc0(SGDTSIZE);
44
0
}
45
46
/*
47
 * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
48
 * to replay SpGistPageAddNewItem() operations.  If the offset points at an
49
 * existing tuple, it had better be a placeholder tuple.
50
 */
51
static void
52
addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
53
0
{
54
0
  if (offset <= PageGetMaxOffsetNumber(page))
55
0
  {
56
0
    SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
57
0
                               PageGetItemId(page, offset));
58
59
0
    if (dt->tupstate != SPGIST_PLACEHOLDER)
60
0
      elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
61
62
0
    Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
63
0
    SpGistPageGetOpaque(page)->nPlaceholder--;
64
65
0
    PageIndexTupleDelete(page, offset);
66
0
  }
67
68
0
  Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
69
70
0
  if (PageAddItem(page, tuple, size, offset, false, false) != offset)
71
0
    elog(ERROR, "failed to add item of size %u to SPGiST index page",
72
0
       size);
73
0
}
74
75
static void
76
spgRedoCreateIndex(XLogReaderState *record)
77
0
{
78
0
  XLogRecPtr  lsn = record->EndRecPtr;
79
0
  Buffer    buffer;
80
0
  Page    page;
81
82
0
  buffer = XLogInitBufferForRedo(record, 0);
83
0
  Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
84
0
  page = (Page) BufferGetPage(buffer);
85
0
  SpGistInitMetapage(page);
86
0
  PageSetLSN(page, lsn);
87
0
  MarkBufferDirty(buffer);
88
0
  UnlockReleaseBuffer(buffer);
89
90
0
  buffer = XLogInitBufferForRedo(record, 1);
91
0
  Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
92
0
  SpGistInitBuffer(buffer, SPGIST_LEAF);
93
0
  page = (Page) BufferGetPage(buffer);
94
0
  PageSetLSN(page, lsn);
95
0
  MarkBufferDirty(buffer);
96
0
  UnlockReleaseBuffer(buffer);
97
98
0
  buffer = XLogInitBufferForRedo(record, 2);
99
0
  Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
100
0
  SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
101
0
  page = (Page) BufferGetPage(buffer);
102
0
  PageSetLSN(page, lsn);
103
0
  MarkBufferDirty(buffer);
104
0
  UnlockReleaseBuffer(buffer);
105
0
}
106
107
static void
108
spgRedoAddLeaf(XLogReaderState *record)
109
0
{
110
0
  XLogRecPtr  lsn = record->EndRecPtr;
111
0
  char     *ptr = XLogRecGetData(record);
112
0
  spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
113
0
  char     *leafTuple;
114
0
  SpGistLeafTupleData leafTupleHdr;
115
0
  Buffer    buffer;
116
0
  Page    page;
117
0
  XLogRedoAction action;
118
119
0
  ptr += sizeof(spgxlogAddLeaf);
120
0
  leafTuple = ptr;
121
  /* the leaf tuple is unaligned, so make a copy to access its header */
122
0
  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
123
124
  /*
125
   * In normal operation we would have both current and parent pages locked
126
   * simultaneously; but in WAL replay it should be safe to update the leaf
127
   * page before updating the parent.
128
   */
129
0
  if (xldata->newPage)
130
0
  {
131
0
    buffer = XLogInitBufferForRedo(record, 0);
132
0
    SpGistInitBuffer(buffer,
133
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
134
0
    action = BLK_NEEDS_REDO;
135
0
  }
136
0
  else
137
0
    action = XLogReadBufferForRedo(record, 0, &buffer);
138
139
0
  if (action == BLK_NEEDS_REDO)
140
0
  {
141
0
    page = BufferGetPage(buffer);
142
143
    /* insert new tuple */
144
0
    if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
145
0
    {
146
      /* normal cases, tuple was added by SpGistPageAddNewItem */
147
0
      addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
148
0
                xldata->offnumLeaf);
149
150
      /* update head tuple's chain link if needed */
151
0
      if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
152
0
      {
153
0
        SpGistLeafTuple head;
154
155
0
        head = (SpGistLeafTuple) PageGetItem(page,
156
0
                           PageGetItemId(page, xldata->offnumHeadLeaf));
157
0
        Assert(head->nextOffset == leafTupleHdr.nextOffset);
158
0
        head->nextOffset = xldata->offnumLeaf;
159
0
      }
160
0
    }
161
0
    else
162
0
    {
163
      /* replacing a DEAD tuple */
164
0
      PageIndexTupleDelete(page, xldata->offnumLeaf);
165
0
      if (PageAddItem(page,
166
0
              (Item) leafTuple, leafTupleHdr.size,
167
0
              xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
168
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
169
0
           leafTupleHdr.size);
170
0
    }
171
172
0
    PageSetLSN(page, lsn);
173
0
    MarkBufferDirty(buffer);
174
0
  }
175
0
  if (BufferIsValid(buffer))
176
0
    UnlockReleaseBuffer(buffer);
177
178
  /* update parent downlink if necessary */
179
0
  if (xldata->offnumParent != InvalidOffsetNumber)
180
0
  {
181
0
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
182
0
    {
183
0
      SpGistInnerTuple tuple;
184
0
      BlockNumber blknoLeaf;
185
186
0
      XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
187
188
0
      page = BufferGetPage(buffer);
189
190
0
      tuple = (SpGistInnerTuple) PageGetItem(page,
191
0
                           PageGetItemId(page, xldata->offnumParent));
192
193
0
      spgUpdateNodeLink(tuple, xldata->nodeI,
194
0
                blknoLeaf, xldata->offnumLeaf);
195
196
0
      PageSetLSN(page, lsn);
197
0
      MarkBufferDirty(buffer);
198
0
    }
199
0
    if (BufferIsValid(buffer))
200
0
      UnlockReleaseBuffer(buffer);
201
0
  }
202
0
}
203
204
static void
205
spgRedoMoveLeafs(XLogReaderState *record)
206
0
{
207
0
  XLogRecPtr  lsn = record->EndRecPtr;
208
0
  char     *ptr = XLogRecGetData(record);
209
0
  spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
210
0
  SpGistState state;
211
0
  OffsetNumber *toDelete;
212
0
  OffsetNumber *toInsert;
213
0
  int     nInsert;
214
0
  Buffer    buffer;
215
0
  Page    page;
216
0
  XLogRedoAction action;
217
0
  BlockNumber blknoDst;
218
219
0
  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
220
221
0
  fillFakeState(&state, xldata->stateSrc);
222
223
0
  nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
224
225
0
  ptr += SizeOfSpgxlogMoveLeafs;
226
0
  toDelete = (OffsetNumber *) ptr;
227
0
  ptr += sizeof(OffsetNumber) * xldata->nMoves;
228
0
  toInsert = (OffsetNumber *) ptr;
229
0
  ptr += sizeof(OffsetNumber) * nInsert;
230
231
  /* now ptr points to the list of leaf tuples */
232
233
  /*
234
   * In normal operation we would have all three pages (source, dest, and
235
   * parent) locked simultaneously; but in WAL replay it should be safe to
236
   * update them one at a time, as long as we do it in the right order.
237
   */
238
239
  /* Insert tuples on the dest page (do first, so redirect is valid) */
240
0
  if (xldata->newPage)
241
0
  {
242
0
    buffer = XLogInitBufferForRedo(record, 1);
243
0
    SpGistInitBuffer(buffer,
244
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
245
0
    action = BLK_NEEDS_REDO;
246
0
  }
247
0
  else
248
0
    action = XLogReadBufferForRedo(record, 1, &buffer);
249
250
0
  if (action == BLK_NEEDS_REDO)
251
0
  {
252
0
    int     i;
253
254
0
    page = BufferGetPage(buffer);
255
256
0
    for (i = 0; i < nInsert; i++)
257
0
    {
258
0
      char     *leafTuple;
259
0
      SpGistLeafTupleData leafTupleHdr;
260
261
      /*
262
       * the tuples are not aligned, so must copy to access the size
263
       * field.
264
       */
265
0
      leafTuple = ptr;
266
0
      memcpy(&leafTupleHdr, leafTuple,
267
0
           sizeof(SpGistLeafTupleData));
268
269
0
      addOrReplaceTuple(page, (Item) leafTuple,
270
0
                leafTupleHdr.size, toInsert[i]);
271
0
      ptr += leafTupleHdr.size;
272
0
    }
273
274
0
    PageSetLSN(page, lsn);
275
0
    MarkBufferDirty(buffer);
276
0
  }
277
0
  if (BufferIsValid(buffer))
278
0
    UnlockReleaseBuffer(buffer);
279
280
  /* Delete tuples from the source page, inserting a redirection pointer */
281
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
282
0
  {
283
0
    page = BufferGetPage(buffer);
284
285
0
    spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
286
0
                state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
287
0
                SPGIST_PLACEHOLDER,
288
0
                blknoDst,
289
0
                toInsert[nInsert - 1]);
290
291
0
    PageSetLSN(page, lsn);
292
0
    MarkBufferDirty(buffer);
293
0
  }
294
0
  if (BufferIsValid(buffer))
295
0
    UnlockReleaseBuffer(buffer);
296
297
  /* And update the parent downlink */
298
0
  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
299
0
  {
300
0
    SpGistInnerTuple tuple;
301
302
0
    page = BufferGetPage(buffer);
303
304
0
    tuple = (SpGistInnerTuple) PageGetItem(page,
305
0
                         PageGetItemId(page, xldata->offnumParent));
306
307
0
    spgUpdateNodeLink(tuple, xldata->nodeI,
308
0
              blknoDst, toInsert[nInsert - 1]);
309
310
0
    PageSetLSN(page, lsn);
311
0
    MarkBufferDirty(buffer);
312
0
  }
313
0
  if (BufferIsValid(buffer))
314
0
    UnlockReleaseBuffer(buffer);
315
0
}
316
317
static void
318
spgRedoAddNode(XLogReaderState *record)
319
0
{
320
0
  XLogRecPtr  lsn = record->EndRecPtr;
321
0
  char     *ptr = XLogRecGetData(record);
322
0
  spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
323
0
  char     *innerTuple;
324
0
  SpGistInnerTupleData innerTupleHdr;
325
0
  SpGistState state;
326
0
  Buffer    buffer;
327
0
  Page    page;
328
0
  XLogRedoAction action;
329
330
0
  ptr += sizeof(spgxlogAddNode);
331
0
  innerTuple = ptr;
332
  /* the tuple is unaligned, so make a copy to access its header */
333
0
  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
334
335
0
  fillFakeState(&state, xldata->stateSrc);
336
337
0
  if (!XLogRecHasBlockRef(record, 1))
338
0
  {
339
    /* update in place */
340
0
    Assert(xldata->parentBlk == -1);
341
0
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
342
0
    {
343
0
      page = BufferGetPage(buffer);
344
345
0
      PageIndexTupleDelete(page, xldata->offnum);
346
0
      if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
347
0
              xldata->offnum,
348
0
              false, false) != xldata->offnum)
349
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
350
0
           innerTupleHdr.size);
351
352
0
      PageSetLSN(page, lsn);
353
0
      MarkBufferDirty(buffer);
354
0
    }
355
0
    if (BufferIsValid(buffer))
356
0
      UnlockReleaseBuffer(buffer);
357
0
  }
358
0
  else
359
0
  {
360
0
    BlockNumber blkno;
361
0
    BlockNumber blknoNew;
362
363
0
    XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
364
0
    XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
365
366
    /*
367
     * In normal operation we would have all three pages (source, dest,
368
     * and parent) locked simultaneously; but in WAL replay it should be
369
     * safe to update them one at a time, as long as we do it in the right
370
     * order. We must insert the new tuple before replacing the old tuple
371
     * with the redirect tuple.
372
     */
373
374
    /* Install new tuple first so redirect is valid */
375
0
    if (xldata->newPage)
376
0
    {
377
      /* AddNode is not used for nulls pages */
378
0
      buffer = XLogInitBufferForRedo(record, 1);
379
0
      SpGistInitBuffer(buffer, 0);
380
0
      action = BLK_NEEDS_REDO;
381
0
    }
382
0
    else
383
0
      action = XLogReadBufferForRedo(record, 1, &buffer);
384
0
    if (action == BLK_NEEDS_REDO)
385
0
    {
386
0
      page = BufferGetPage(buffer);
387
388
0
      addOrReplaceTuple(page, (Item) innerTuple,
389
0
                innerTupleHdr.size, xldata->offnumNew);
390
391
      /*
392
       * If parent is in this same page, update it now.
393
       */
394
0
      if (xldata->parentBlk == 1)
395
0
      {
396
0
        SpGistInnerTuple parentTuple;
397
398
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
399
0
                               PageGetItemId(page, xldata->offnumParent));
400
401
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
402
0
                  blknoNew, xldata->offnumNew);
403
0
      }
404
0
      PageSetLSN(page, lsn);
405
0
      MarkBufferDirty(buffer);
406
0
    }
407
0
    if (BufferIsValid(buffer))
408
0
      UnlockReleaseBuffer(buffer);
409
410
    /* Delete old tuple, replacing it with redirect or placeholder tuple */
411
0
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
412
0
    {
413
0
      SpGistDeadTuple dt;
414
415
0
      page = BufferGetPage(buffer);
416
417
0
      if (state.isBuild)
418
0
        dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
419
0
                    InvalidBlockNumber,
420
0
                    InvalidOffsetNumber);
421
0
      else
422
0
        dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
423
0
                    blknoNew,
424
0
                    xldata->offnumNew);
425
426
0
      PageIndexTupleDelete(page, xldata->offnum);
427
0
      if (PageAddItem(page, (Item) dt, dt->size,
428
0
              xldata->offnum,
429
0
              false, false) != xldata->offnum)
430
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
431
0
           dt->size);
432
433
0
      if (state.isBuild)
434
0
        SpGistPageGetOpaque(page)->nPlaceholder++;
435
0
      else
436
0
        SpGistPageGetOpaque(page)->nRedirection++;
437
438
      /*
439
       * If parent is in this same page, update it now.
440
       */
441
0
      if (xldata->parentBlk == 0)
442
0
      {
443
0
        SpGistInnerTuple parentTuple;
444
445
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
446
0
                               PageGetItemId(page, xldata->offnumParent));
447
448
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
449
0
                  blknoNew, xldata->offnumNew);
450
0
      }
451
0
      PageSetLSN(page, lsn);
452
0
      MarkBufferDirty(buffer);
453
0
    }
454
0
    if (BufferIsValid(buffer))
455
0
      UnlockReleaseBuffer(buffer);
456
457
    /*
458
     * Update parent downlink (if we didn't do it as part of the source or
459
     * destination page update already).
460
     */
461
0
    if (xldata->parentBlk == 2)
462
0
    {
463
0
      if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
464
0
      {
465
0
        SpGistInnerTuple parentTuple;
466
467
0
        page = BufferGetPage(buffer);
468
469
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
470
0
                               PageGetItemId(page, xldata->offnumParent));
471
472
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
473
0
                  blknoNew, xldata->offnumNew);
474
475
0
        PageSetLSN(page, lsn);
476
0
        MarkBufferDirty(buffer);
477
0
      }
478
0
      if (BufferIsValid(buffer))
479
0
        UnlockReleaseBuffer(buffer);
480
0
    }
481
0
  }
482
0
}
483
484
static void
485
spgRedoSplitTuple(XLogReaderState *record)
486
0
{
487
0
  XLogRecPtr  lsn = record->EndRecPtr;
488
0
  char     *ptr = XLogRecGetData(record);
489
0
  spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
490
0
  char     *prefixTuple;
491
0
  SpGistInnerTupleData prefixTupleHdr;
492
0
  char     *postfixTuple;
493
0
  SpGistInnerTupleData postfixTupleHdr;
494
0
  Buffer    buffer;
495
0
  Page    page;
496
0
  XLogRedoAction action;
497
498
0
  ptr += sizeof(spgxlogSplitTuple);
499
0
  prefixTuple = ptr;
500
  /* the prefix tuple is unaligned, so make a copy to access its header */
501
0
  memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
502
0
  ptr += prefixTupleHdr.size;
503
0
  postfixTuple = ptr;
504
  /* postfix tuple is also unaligned */
505
0
  memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
506
507
  /*
508
   * In normal operation we would have both pages locked simultaneously; but
509
   * in WAL replay it should be safe to update them one at a time, as long
510
   * as we do it in the right order.
511
   */
512
513
  /* insert postfix tuple first to avoid dangling link */
514
0
  if (!xldata->postfixBlkSame)
515
0
  {
516
0
    if (xldata->newPage)
517
0
    {
518
0
      buffer = XLogInitBufferForRedo(record, 1);
519
      /* SplitTuple is not used for nulls pages */
520
0
      SpGistInitBuffer(buffer, 0);
521
0
      action = BLK_NEEDS_REDO;
522
0
    }
523
0
    else
524
0
      action = XLogReadBufferForRedo(record, 1, &buffer);
525
0
    if (action == BLK_NEEDS_REDO)
526
0
    {
527
0
      page = BufferGetPage(buffer);
528
529
0
      addOrReplaceTuple(page, (Item) postfixTuple,
530
0
                postfixTupleHdr.size, xldata->offnumPostfix);
531
532
0
      PageSetLSN(page, lsn);
533
0
      MarkBufferDirty(buffer);
534
0
    }
535
0
    if (BufferIsValid(buffer))
536
0
      UnlockReleaseBuffer(buffer);
537
0
  }
538
539
  /* now handle the original page */
540
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
541
0
  {
542
0
    page = BufferGetPage(buffer);
543
544
0
    PageIndexTupleDelete(page, xldata->offnumPrefix);
545
0
    if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
546
0
            xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
547
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
548
0
         prefixTupleHdr.size);
549
550
0
    if (xldata->postfixBlkSame)
551
0
      addOrReplaceTuple(page, (Item) postfixTuple,
552
0
                postfixTupleHdr.size,
553
0
                xldata->offnumPostfix);
554
555
0
    PageSetLSN(page, lsn);
556
0
    MarkBufferDirty(buffer);
557
0
  }
558
0
  if (BufferIsValid(buffer))
559
0
    UnlockReleaseBuffer(buffer);
560
0
}
561
562
static void
563
spgRedoPickSplit(XLogReaderState *record)
564
0
{
565
0
  XLogRecPtr  lsn = record->EndRecPtr;
566
0
  char     *ptr = XLogRecGetData(record);
567
0
  spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
568
0
  char     *innerTuple;
569
0
  SpGistInnerTupleData innerTupleHdr;
570
0
  SpGistState state;
571
0
  OffsetNumber *toDelete;
572
0
  OffsetNumber *toInsert;
573
0
  uint8    *leafPageSelect;
574
0
  Buffer    srcBuffer;
575
0
  Buffer    destBuffer;
576
0
  Buffer    innerBuffer;
577
0
  Page    srcPage;
578
0
  Page    destPage;
579
0
  Page    page;
580
0
  int     i;
581
0
  BlockNumber blknoInner;
582
0
  XLogRedoAction action;
583
584
0
  XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
585
586
0
  fillFakeState(&state, xldata->stateSrc);
587
588
0
  ptr += SizeOfSpgxlogPickSplit;
589
0
  toDelete = (OffsetNumber *) ptr;
590
0
  ptr += sizeof(OffsetNumber) * xldata->nDelete;
591
0
  toInsert = (OffsetNumber *) ptr;
592
0
  ptr += sizeof(OffsetNumber) * xldata->nInsert;
593
0
  leafPageSelect = (uint8 *) ptr;
594
0
  ptr += sizeof(uint8) * xldata->nInsert;
595
596
0
  innerTuple = ptr;
597
  /* the inner tuple is unaligned, so make a copy to access its header */
598
0
  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
599
0
  ptr += innerTupleHdr.size;
600
601
  /* now ptr points to the list of leaf tuples */
602
603
0
  if (xldata->isRootSplit)
604
0
  {
605
    /* when splitting root, we touch it only in the guise of new inner */
606
0
    srcBuffer = InvalidBuffer;
607
0
    srcPage = NULL;
608
0
  }
609
0
  else if (xldata->initSrc)
610
0
  {
611
    /* just re-init the source page */
612
0
    srcBuffer = XLogInitBufferForRedo(record, 0);
613
0
    srcPage = (Page) BufferGetPage(srcBuffer);
614
615
0
    SpGistInitBuffer(srcBuffer,
616
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
617
    /* don't update LSN etc till we're done with it */
618
0
  }
619
0
  else
620
0
  {
621
    /*
622
     * Delete the specified tuples from source page.  (In case we're in
623
     * Hot Standby, we need to hold lock on the page till we're done
624
     * inserting leaf tuples and the new inner tuple, else the added
625
     * redirect tuple will be a dangling link.)
626
     */
627
0
    srcPage = NULL;
628
0
    if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
629
0
    {
630
0
      srcPage = BufferGetPage(srcBuffer);
631
632
      /*
633
       * We have it a bit easier here than in doPickSplit(), because we
634
       * know the inner tuple's location already, so we can inject the
635
       * correct redirection tuple now.
636
       */
637
0
      if (!state.isBuild)
638
0
        spgPageIndexMultiDelete(&state, srcPage,
639
0
                    toDelete, xldata->nDelete,
640
0
                    SPGIST_REDIRECT,
641
0
                    SPGIST_PLACEHOLDER,
642
0
                    blknoInner,
643
0
                    xldata->offnumInner);
644
0
      else
645
0
        spgPageIndexMultiDelete(&state, srcPage,
646
0
                    toDelete, xldata->nDelete,
647
0
                    SPGIST_PLACEHOLDER,
648
0
                    SPGIST_PLACEHOLDER,
649
0
                    InvalidBlockNumber,
650
0
                    InvalidOffsetNumber);
651
652
      /* don't update LSN etc till we're done with it */
653
0
    }
654
0
  }
655
656
  /* try to access dest page if any */
657
0
  if (!XLogRecHasBlockRef(record, 1))
658
0
  {
659
0
    destBuffer = InvalidBuffer;
660
0
    destPage = NULL;
661
0
  }
662
0
  else if (xldata->initDest)
663
0
  {
664
    /* just re-init the dest page */
665
0
    destBuffer = XLogInitBufferForRedo(record, 1);
666
0
    destPage = (Page) BufferGetPage(destBuffer);
667
668
0
    SpGistInitBuffer(destBuffer,
669
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
670
    /* don't update LSN etc till we're done with it */
671
0
  }
672
0
  else
673
0
  {
674
    /*
675
     * We could probably release the page lock immediately in the
676
     * full-page-image case, but for safety let's hold it till later.
677
     */
678
0
    if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
679
0
      destPage = (Page) BufferGetPage(destBuffer);
680
0
    else
681
0
      destPage = NULL; /* don't do any page updates */
682
0
  }
683
684
  /* restore leaf tuples to src and/or dest page */
685
0
  for (i = 0; i < xldata->nInsert; i++)
686
0
  {
687
0
    char     *leafTuple;
688
0
    SpGistLeafTupleData leafTupleHdr;
689
690
    /* the tuples are not aligned, so must copy to access the size field. */
691
0
    leafTuple = ptr;
692
0
    memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
693
0
    ptr += leafTupleHdr.size;
694
695
0
    page = leafPageSelect[i] ? destPage : srcPage;
696
0
    if (page == NULL)
697
0
      continue;     /* no need to touch this page */
698
699
0
    addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
700
0
              toInsert[i]);
701
0
  }
702
703
  /* Now update src and dest page LSNs if needed */
704
0
  if (srcPage != NULL)
705
0
  {
706
0
    PageSetLSN(srcPage, lsn);
707
0
    MarkBufferDirty(srcBuffer);
708
0
  }
709
0
  if (destPage != NULL)
710
0
  {
711
0
    PageSetLSN(destPage, lsn);
712
0
    MarkBufferDirty(destBuffer);
713
0
  }
714
715
  /* restore new inner tuple */
716
0
  if (xldata->initInner)
717
0
  {
718
0
    innerBuffer = XLogInitBufferForRedo(record, 2);
719
0
    SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
720
0
    action = BLK_NEEDS_REDO;
721
0
  }
722
0
  else
723
0
    action = XLogReadBufferForRedo(record, 2, &innerBuffer);
724
725
0
  if (action == BLK_NEEDS_REDO)
726
0
  {
727
0
    page = BufferGetPage(innerBuffer);
728
729
0
    addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
730
0
              xldata->offnumInner);
731
732
    /* if inner is also parent, update link while we're here */
733
0
    if (xldata->innerIsParent)
734
0
    {
735
0
      SpGistInnerTuple parent;
736
737
0
      parent = (SpGistInnerTuple) PageGetItem(page,
738
0
                          PageGetItemId(page, xldata->offnumParent));
739
0
      spgUpdateNodeLink(parent, xldata->nodeI,
740
0
                blknoInner, xldata->offnumInner);
741
0
    }
742
743
0
    PageSetLSN(page, lsn);
744
0
    MarkBufferDirty(innerBuffer);
745
0
  }
746
0
  if (BufferIsValid(innerBuffer))
747
0
    UnlockReleaseBuffer(innerBuffer);
748
749
  /*
750
   * Now we can release the leaf-page locks.  It's okay to do this before
751
   * updating the parent downlink.
752
   */
753
0
  if (BufferIsValid(srcBuffer))
754
0
    UnlockReleaseBuffer(srcBuffer);
755
0
  if (BufferIsValid(destBuffer))
756
0
    UnlockReleaseBuffer(destBuffer);
757
758
  /* update parent downlink, unless we did it above */
759
0
  if (XLogRecHasBlockRef(record, 3))
760
0
  {
761
0
    Buffer    parentBuffer;
762
763
0
    if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
764
0
    {
765
0
      SpGistInnerTuple parent;
766
767
0
      page = BufferGetPage(parentBuffer);
768
769
0
      parent = (SpGistInnerTuple) PageGetItem(page,
770
0
                          PageGetItemId(page, xldata->offnumParent));
771
0
      spgUpdateNodeLink(parent, xldata->nodeI,
772
0
                blknoInner, xldata->offnumInner);
773
774
0
      PageSetLSN(page, lsn);
775
0
      MarkBufferDirty(parentBuffer);
776
0
    }
777
0
    if (BufferIsValid(parentBuffer))
778
0
      UnlockReleaseBuffer(parentBuffer);
779
0
  }
780
0
  else
781
0
    Assert(xldata->innerIsParent || xldata->isRootSplit);
782
0
}
783
784
static void
785
spgRedoVacuumLeaf(XLogReaderState *record)
786
0
{
787
0
  XLogRecPtr  lsn = record->EndRecPtr;
788
0
  char     *ptr = XLogRecGetData(record);
789
0
  spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
790
0
  OffsetNumber *toDead;
791
0
  OffsetNumber *toPlaceholder;
792
0
  OffsetNumber *moveSrc;
793
0
  OffsetNumber *moveDest;
794
0
  OffsetNumber *chainSrc;
795
0
  OffsetNumber *chainDest;
796
0
  SpGistState state;
797
0
  Buffer    buffer;
798
0
  Page    page;
799
0
  int     i;
800
801
0
  fillFakeState(&state, xldata->stateSrc);
802
803
0
  ptr += SizeOfSpgxlogVacuumLeaf;
804
0
  toDead = (OffsetNumber *) ptr;
805
0
  ptr += sizeof(OffsetNumber) * xldata->nDead;
806
0
  toPlaceholder = (OffsetNumber *) ptr;
807
0
  ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
808
0
  moveSrc = (OffsetNumber *) ptr;
809
0
  ptr += sizeof(OffsetNumber) * xldata->nMove;
810
0
  moveDest = (OffsetNumber *) ptr;
811
0
  ptr += sizeof(OffsetNumber) * xldata->nMove;
812
0
  chainSrc = (OffsetNumber *) ptr;
813
0
  ptr += sizeof(OffsetNumber) * xldata->nChain;
814
0
  chainDest = (OffsetNumber *) ptr;
815
816
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
817
0
  {
818
0
    page = BufferGetPage(buffer);
819
820
0
    spgPageIndexMultiDelete(&state, page,
821
0
                toDead, xldata->nDead,
822
0
                SPGIST_DEAD, SPGIST_DEAD,
823
0
                InvalidBlockNumber,
824
0
                InvalidOffsetNumber);
825
826
0
    spgPageIndexMultiDelete(&state, page,
827
0
                toPlaceholder, xldata->nPlaceholder,
828
0
                SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
829
0
                InvalidBlockNumber,
830
0
                InvalidOffsetNumber);
831
832
    /* see comments in vacuumLeafPage() */
833
0
    for (i = 0; i < xldata->nMove; i++)
834
0
    {
835
0
      ItemId    idSrc = PageGetItemId(page, moveSrc[i]);
836
0
      ItemId    idDest = PageGetItemId(page, moveDest[i]);
837
0
      ItemIdData  tmp;
838
839
0
      tmp = *idSrc;
840
0
      *idSrc = *idDest;
841
0
      *idDest = tmp;
842
0
    }
843
844
0
    spgPageIndexMultiDelete(&state, page,
845
0
                moveSrc, xldata->nMove,
846
0
                SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
847
0
                InvalidBlockNumber,
848
0
                InvalidOffsetNumber);
849
850
0
    for (i = 0; i < xldata->nChain; i++)
851
0
    {
852
0
      SpGistLeafTuple lt;
853
854
0
      lt = (SpGistLeafTuple) PageGetItem(page,
855
0
                         PageGetItemId(page, chainSrc[i]));
856
0
      Assert(lt->tupstate == SPGIST_LIVE);
857
0
      lt->nextOffset = chainDest[i];
858
0
    }
859
860
0
    PageSetLSN(page, lsn);
861
0
    MarkBufferDirty(buffer);
862
0
  }
863
0
  if (BufferIsValid(buffer))
864
0
    UnlockReleaseBuffer(buffer);
865
0
}
866
867
static void
868
spgRedoVacuumRoot(XLogReaderState *record)
869
0
{
870
0
  XLogRecPtr  lsn = record->EndRecPtr;
871
0
  char     *ptr = XLogRecGetData(record);
872
0
  spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
873
0
  OffsetNumber *toDelete;
874
0
  Buffer    buffer;
875
0
  Page    page;
876
877
0
  toDelete = xldata->offsets;
878
879
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
880
0
  {
881
0
    page = BufferGetPage(buffer);
882
883
    /* The tuple numbers are in order */
884
0
    PageIndexMultiDelete(page, toDelete, xldata->nDelete);
885
886
0
    PageSetLSN(page, lsn);
887
0
    MarkBufferDirty(buffer);
888
0
  }
889
0
  if (BufferIsValid(buffer))
890
0
    UnlockReleaseBuffer(buffer);
891
0
}
892
893
static void
894
spgRedoVacuumRedirect(XLogReaderState *record)
895
0
{
896
0
  XLogRecPtr  lsn = record->EndRecPtr;
897
0
  char     *ptr = XLogRecGetData(record);
898
0
  spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
899
0
  OffsetNumber *itemToPlaceholder;
900
0
  Buffer    buffer;
901
902
0
  itemToPlaceholder = xldata->offsets;
903
904
  /*
905
   * If any redirection tuples are being removed, make sure there are no
906
   * live Hot Standby transactions that might need to see them.
907
   */
908
0
  if (InHotStandby)
909
0
  {
910
0
    if (TransactionIdIsValid(xldata->newestRedirectXid))
911
0
    {
912
0
      RelFileNode node;
913
914
0
      XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
915
0
      ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
916
0
                        node);
917
0
    }
918
0
  }
919
920
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
921
0
  {
922
0
    Page    page = BufferGetPage(buffer);
923
0
    SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
924
0
    int     i;
925
926
    /* Convert redirect pointers to plain placeholders */
927
0
    for (i = 0; i < xldata->nToPlaceholder; i++)
928
0
    {
929
0
      SpGistDeadTuple dt;
930
931
0
      dt = (SpGistDeadTuple) PageGetItem(page,
932
0
                         PageGetItemId(page, itemToPlaceholder[i]));
933
0
      Assert(dt->tupstate == SPGIST_REDIRECT);
934
0
      dt->tupstate = SPGIST_PLACEHOLDER;
935
0
      ItemPointerSetInvalid(&dt->pointer);
936
0
    }
937
938
0
    Assert(opaque->nRedirection >= xldata->nToPlaceholder);
939
0
    opaque->nRedirection -= xldata->nToPlaceholder;
940
0
    opaque->nPlaceholder += xldata->nToPlaceholder;
941
942
    /* Remove placeholder tuples at end of page */
943
0
    if (xldata->firstPlaceholder != InvalidOffsetNumber)
944
0
    {
945
0
      int     max = PageGetMaxOffsetNumber(page);
946
0
      OffsetNumber *toDelete;
947
948
0
      toDelete = palloc(sizeof(OffsetNumber) * max);
949
950
0
      for (i = xldata->firstPlaceholder; i <= max; i++)
951
0
        toDelete[i - xldata->firstPlaceholder] = i;
952
953
0
      i = max - xldata->firstPlaceholder + 1;
954
0
      Assert(opaque->nPlaceholder >= i);
955
0
      opaque->nPlaceholder -= i;
956
957
      /* The array is sorted, so can use PageIndexMultiDelete */
958
0
      PageIndexMultiDelete(page, toDelete, i);
959
960
0
      pfree(toDelete);
961
0
    }
962
963
0
    PageSetLSN(page, lsn);
964
0
    MarkBufferDirty(buffer);
965
0
  }
966
0
  if (BufferIsValid(buffer))
967
0
    UnlockReleaseBuffer(buffer);
968
0
}
969
970
void
971
spg_redo(XLogReaderState *record)
972
0
{
973
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
974
0
  MemoryContext oldCxt;
975
976
0
  oldCxt = MemoryContextSwitchTo(opCtx);
977
0
  switch (info)
978
0
  {
979
0
    case XLOG_SPGIST_CREATE_INDEX:
980
0
      spgRedoCreateIndex(record);
981
0
      break;
982
0
    case XLOG_SPGIST_ADD_LEAF:
983
0
      spgRedoAddLeaf(record);
984
0
      break;
985
0
    case XLOG_SPGIST_MOVE_LEAFS:
986
0
      spgRedoMoveLeafs(record);
987
0
      break;
988
0
    case XLOG_SPGIST_ADD_NODE:
989
0
      spgRedoAddNode(record);
990
0
      break;
991
0
    case XLOG_SPGIST_SPLIT_TUPLE:
992
0
      spgRedoSplitTuple(record);
993
0
      break;
994
0
    case XLOG_SPGIST_PICKSPLIT:
995
0
      spgRedoPickSplit(record);
996
0
      break;
997
0
    case XLOG_SPGIST_VACUUM_LEAF:
998
0
      spgRedoVacuumLeaf(record);
999
0
      break;
1000
0
    case XLOG_SPGIST_VACUUM_ROOT:
1001
0
      spgRedoVacuumRoot(record);
1002
0
      break;
1003
0
    case XLOG_SPGIST_VACUUM_REDIRECT:
1004
0
      spgRedoVacuumRedirect(record);
1005
0
      break;
1006
0
    default:
1007
0
      elog(PANIC, "spg_redo: unknown op code %u", info);
1008
0
  }
1009
1010
0
  MemoryContextSwitchTo(oldCxt);
1011
0
  MemoryContextReset(opCtx);
1012
0
}
1013
1014
void
1015
spg_xlog_startup(void)
1016
0
{
1017
0
  opCtx = AllocSetContextCreate(GetCurrentMemoryContext(),
1018
0
                  "SP-GiST temporary context",
1019
0
                  ALLOCSET_DEFAULT_SIZES);
1020
0
}
1021
1022
void
1023
spg_xlog_cleanup(void)
1024
0
{
1025
0
  MemoryContextDelete(opCtx);
1026
0
  opCtx = NULL;
1027
0
}
1028
1029
/*
1030
 * Mask a SpGist page before performing consistency checks on it.
1031
 */
1032
void
1033
spg_mask(char *pagedata, BlockNumber blkno)
1034
0
{
1035
0
  Page    page = (Page) pagedata;
1036
0
  PageHeader  pagehdr = (PageHeader) page;
1037
1038
0
  mask_page_lsn_and_checksum(page);
1039
1040
0
  mask_page_hint_bits(page);
1041
1042
  /*
1043
   * Mask the unused space, but only if the page's pd_lower appears to have
1044
   * been set correctly.
1045
   */
1046
0
  if (pagehdr->pd_lower > SizeOfPageHeaderData)
1047
0
    mask_unused_space(page);
1048
0
}