YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/access/spgist/spgdoinsert.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgdoinsert.c
4
 *    implementation of insert algorithm
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *      src/backend/access/spgist/spgdoinsert.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
16
#include "postgres.h"
17
18
#include "access/genam.h"
19
#include "access/spgist_private.h"
20
#include "access/spgxlog.h"
21
#include "access/xloginsert.h"
22
#include "miscadmin.h"
23
#include "storage/bufmgr.h"
24
#include "utils/rel.h"
25
26
27
/*
28
 * SPPageDesc tracks all info about a page we are inserting into.  In some
29
 * situations it actually identifies a tuple, or even a specific node within
30
 * an inner tuple.  But any of the fields can be invalid.  If the buffer
31
 * field is valid, it implies we hold pin and exclusive lock on that buffer.
32
 * page pointer should be valid exactly when buffer is.
33
 */
34
typedef struct SPPageDesc
35
{
36
  BlockNumber blkno;      /* block number, or InvalidBlockNumber */
37
  Buffer    buffer;     /* page's buffer number, or InvalidBuffer */
38
  Page    page;     /* pointer to page buffer, or NULL */
39
  OffsetNumber offnum;    /* offset of tuple, or InvalidOffsetNumber */
40
  int     node;     /* node number within inner tuple, or -1 */
41
} SPPageDesc;
42
43
44
/*
45
 * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
46
 * is used to update the parent inner tuple's downlink after a move or
47
 * split operation.
48
 */
49
void
50
spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
51
          BlockNumber blkno, OffsetNumber offset)
52
0
{
53
0
  int     i;
54
0
  SpGistNodeTuple node;
55
56
0
  SGITITERATE(tup, i, node)
57
0
  {
58
0
    if (i == nodeN)
59
0
    {
60
0
      ItemPointerSet(&node->t_tid, blkno, offset);
61
0
      return;
62
0
    }
63
0
  }
64
65
0
  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
66
0
     nodeN);
67
0
}
68
69
/*
70
 * Form a new inner tuple containing one more node than the given one, with
71
 * the specified label datum, inserted at offset "offset" in the node array.
72
 * The new tuple's prefix is the same as the old one's.
73
 *
74
 * Note that the new node initially has an invalid downlink.  We'll find a
75
 * page to point it to later.
76
 */
77
static SpGistInnerTuple
78
addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
79
0
{
80
0
  SpGistNodeTuple node,
81
0
         *nodes;
82
0
  int     i;
83
84
  /* if offset is negative, insert at end */
85
0
  if (offset < 0)
86
0
    offset = tuple->nNodes;
87
0
  else if (offset > tuple->nNodes)
88
0
    elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
89
90
0
  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
91
0
  SGITITERATE(tuple, i, node)
92
0
  {
93
0
    if (i < offset)
94
0
      nodes[i] = node;
95
0
    else
96
0
      nodes[i + 1] = node;
97
0
  }
98
99
0
  nodes[offset] = spgFormNodeTuple(state, label, false);
100
101
0
  return spgFormInnerTuple(state,
102
0
               (tuple->prefixSize > 0),
103
0
               SGITDATUM(tuple, state),
104
0
               tuple->nNodes + 1,
105
0
               nodes);
106
0
}
107
108
/* qsort comparator for sorting OffsetNumbers */
109
static int
110
cmpOffsetNumbers(const void *a, const void *b)
111
0
{
112
0
  if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
113
0
    return 0;
114
0
  return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
115
0
}
116
117
/*
118
 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119
 *
120
 * The first tuple in the given list is replaced with a dead tuple of type
121
 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122
 * with dead tuples of type "reststate".  If either firststate or reststate
123
 * is REDIRECT, blkno/offnum specify where to link to.
124
 *
125
 * NB: this is used during WAL replay, so beware of trying to make it too
126
 * smart.  In particular, it shouldn't use "state" except for calling
127
 * spgFormDeadTuple().  This is also used in a critical section, so no
128
 * pallocs either!
129
 */
130
void
131
spgPageIndexMultiDelete(SpGistState *state, Page page,
132
            OffsetNumber *itemnos, int nitems,
133
            int firststate, int reststate,
134
            BlockNumber blkno, OffsetNumber offnum)
135
0
{
136
0
  OffsetNumber firstItem;
137
0
  OffsetNumber sortednos[MaxIndexTuplesPerPage];
138
0
  SpGistDeadTuple tuple = NULL;
139
0
  int     i;
140
141
0
  if (nitems == 0)
142
0
    return;         /* nothing to do */
143
144
  /*
145
   * For efficiency we want to use PageIndexMultiDelete, which requires the
146
   * targets to be listed in sorted order, so we have to sort the itemnos
147
   * array.  (This also greatly simplifies the math for reinserting the
148
   * replacement tuples.)  However, we must not scribble on the caller's
149
   * array, so we have to make a copy.
150
   */
151
0
  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152
0
  if (nitems > 1)
153
0
    qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155
0
  PageIndexMultiDelete(page, sortednos, nitems);
156
157
0
  firstItem = itemnos[0];
158
159
0
  for (i = 0; i < nitems; i++)
160
0
  {
161
0
    OffsetNumber itemno = sortednos[i];
162
0
    int     tupstate;
163
164
0
    tupstate = (itemno == firstItem) ? firststate : reststate;
165
0
    if (tuple == NULL || tuple->tupstate != tupstate)
166
0
      tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168
0
    if (PageAddItem(page, (Item) tuple, tuple->size,
169
0
            itemno, false, false) != itemno)
170
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
171
0
         tuple->size);
172
173
0
    if (tupstate == SPGIST_REDIRECT)
174
0
      SpGistPageGetOpaque(page)->nRedirection++;
175
0
    else if (tupstate == SPGIST_PLACEHOLDER)
176
0
      SpGistPageGetOpaque(page)->nPlaceholder++;
177
0
  }
178
0
}
179
180
/*
181
 * Update the parent inner tuple's downlink, and mark the parent buffer
182
 * dirty (this must be the last change to the parent page in the current
183
 * WAL action).
184
 */
185
static void
186
saveNodeLink(Relation index, SPPageDesc *parent,
187
       BlockNumber blkno, OffsetNumber offnum)
188
0
{
189
0
  SpGistInnerTuple innerTuple;
190
191
0
  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192
0
                        PageGetItemId(parent->page, parent->offnum));
193
194
0
  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195
196
0
  MarkBufferDirty(parent->buffer);
197
0
}
198
199
/*
200
 * Add a leaf tuple to a leaf page where there is known to be room for it
201
 */
202
static void
203
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204
       SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205
0
{
206
0
  spgxlogAddLeaf xlrec;
207
208
0
  xlrec.newPage = isNew;
209
0
  xlrec.storesNulls = isNulls;
210
211
  /* these will be filled below as needed */
212
0
  xlrec.offnumLeaf = InvalidOffsetNumber;
213
0
  xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214
0
  xlrec.offnumParent = InvalidOffsetNumber;
215
0
  xlrec.nodeI = 0;
216
217
0
  START_CRIT_SECTION();
218
219
0
  if (current->offnum == InvalidOffsetNumber ||
220
0
    SpGistBlockIsRoot(current->blkno))
221
0
  {
222
    /* Tuple is not part of a chain */
223
0
    leafTuple->nextOffset = InvalidOffsetNumber;
224
0
    current->offnum = SpGistPageAddNewItem(state, current->page,
225
0
                         (Item) leafTuple, leafTuple->size,
226
0
                         NULL, false);
227
228
0
    xlrec.offnumLeaf = current->offnum;
229
230
    /* Must update parent's downlink if any */
231
0
    if (parent->buffer != InvalidBuffer)
232
0
    {
233
0
      xlrec.offnumParent = parent->offnum;
234
0
      xlrec.nodeI = parent->node;
235
236
0
      saveNodeLink(index, parent, current->blkno, current->offnum);
237
0
    }
238
0
  }
239
0
  else
240
0
  {
241
    /*
242
     * Tuple must be inserted into existing chain.  We mustn't change the
243
     * chain's head address, but we don't need to chase the entire chain
244
     * to put the tuple at the end; we can insert it second.
245
     *
246
     * Also, it's possible that the "chain" consists only of a DEAD tuple,
247
     * in which case we should replace the DEAD tuple in-place.
248
     */
249
0
    SpGistLeafTuple head;
250
0
    OffsetNumber offnum;
251
252
0
    head = (SpGistLeafTuple) PageGetItem(current->page,
253
0
                       PageGetItemId(current->page, current->offnum));
254
0
    if (head->tupstate == SPGIST_LIVE)
255
0
    {
256
0
      leafTuple->nextOffset = head->nextOffset;
257
0
      offnum = SpGistPageAddNewItem(state, current->page,
258
0
                      (Item) leafTuple, leafTuple->size,
259
0
                      NULL, false);
260
261
      /*
262
       * re-get head of list because it could have been moved on page,
263
       * and set new second element
264
       */
265
0
      head = (SpGistLeafTuple) PageGetItem(current->page,
266
0
                         PageGetItemId(current->page, current->offnum));
267
0
      head->nextOffset = offnum;
268
269
0
      xlrec.offnumLeaf = offnum;
270
0
      xlrec.offnumHeadLeaf = current->offnum;
271
0
    }
272
0
    else if (head->tupstate == SPGIST_DEAD)
273
0
    {
274
0
      leafTuple->nextOffset = InvalidOffsetNumber;
275
0
      PageIndexTupleDelete(current->page, current->offnum);
276
0
      if (PageAddItem(current->page,
277
0
              (Item) leafTuple, leafTuple->size,
278
0
              current->offnum, false, false) != current->offnum)
279
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
280
0
           leafTuple->size);
281
282
      /* WAL replay distinguishes this case by equal offnums */
283
0
      xlrec.offnumLeaf = current->offnum;
284
0
      xlrec.offnumHeadLeaf = current->offnum;
285
0
    }
286
0
    else
287
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288
0
  }
289
290
0
  MarkBufferDirty(current->buffer);
291
292
0
  if (RelationNeedsWAL(index))
293
0
  {
294
0
    XLogRecPtr  recptr;
295
0
    int     flags;
296
297
0
    XLogBeginInsert();
298
0
    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299
0
    XLogRegisterData((char *) leafTuple, leafTuple->size);
300
301
0
    flags = REGBUF_STANDARD;
302
0
    if (xlrec.newPage)
303
0
      flags |= REGBUF_WILL_INIT;
304
0
    XLogRegisterBuffer(0, current->buffer, flags);
305
0
    if (xlrec.offnumParent != InvalidOffsetNumber)
306
0
      XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307
308
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309
310
0
    PageSetLSN(current->page, recptr);
311
312
    /* update parent only if we actually changed it */
313
0
    if (xlrec.offnumParent != InvalidOffsetNumber)
314
0
    {
315
0
      PageSetLSN(parent->page, recptr);
316
0
    }
317
0
  }
318
319
0
  END_CRIT_SECTION();
320
0
}
321
322
/*
323
 * Count the number and total size of leaf tuples in the chain starting at
324
 * current->offnum.  Return number into *nToSplit and total size as function
325
 * result.
326
 *
327
 * Klugy special case when considering the root page (i.e., root is a leaf
328
 * page, but we're about to split for the first time): return fake large
329
 * values to force spgdoinsert() to take the doPickSplit rather than
330
 * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
331
 */
332
static int
333
checkSplitConditions(Relation index, SpGistState *state,
334
           SPPageDesc *current, int *nToSplit)
335
0
{
336
0
  int     i,
337
0
        n = 0,
338
0
        totalSize = 0;
339
340
0
  if (SpGistBlockIsRoot(current->blkno))
341
0
  {
342
    /* return impossible values to force split */
343
0
    *nToSplit = BLCKSZ;
344
0
    return BLCKSZ;
345
0
  }
346
347
0
  i = current->offnum;
348
0
  while (i != InvalidOffsetNumber)
349
0
  {
350
0
    SpGistLeafTuple it;
351
352
0
    Assert(i >= FirstOffsetNumber &&
353
0
         i <= PageGetMaxOffsetNumber(current->page));
354
0
    it = (SpGistLeafTuple) PageGetItem(current->page,
355
0
                       PageGetItemId(current->page, i));
356
0
    if (it->tupstate == SPGIST_LIVE)
357
0
    {
358
0
      n++;
359
0
      totalSize += it->size + sizeof(ItemIdData);
360
0
    }
361
0
    else if (it->tupstate == SPGIST_DEAD)
362
0
    {
363
      /* We could see a DEAD tuple as first/only chain item */
364
0
      Assert(i == current->offnum);
365
0
      Assert(it->nextOffset == InvalidOffsetNumber);
366
      /* Don't count it in result, because it won't go to other page */
367
0
    }
368
0
    else
369
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
371
0
    i = it->nextOffset;
372
0
  }
373
374
0
  *nToSplit = n;
375
376
0
  return totalSize;
377
0
}
378
379
/*
380
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381
 * but the chain has to be moved because there's not enough room to add
382
 * newLeafTuple to its page.  We use this method when the chain contains
383
 * very little data so a split would be inefficient.  We are sure we can
384
 * fit the chain plus newLeafTuple on one other page.
385
 */
386
static void
387
moveLeafs(Relation index, SpGistState *state,
388
      SPPageDesc *current, SPPageDesc *parent,
389
      SpGistLeafTuple newLeafTuple, bool isNulls)
390
0
{
391
0
  int     i,
392
0
        nDelete,
393
0
        nInsert,
394
0
        size;
395
0
  Buffer    nbuf;
396
0
  Page    npage;
397
0
  SpGistLeafTuple it;
398
0
  OffsetNumber r = InvalidOffsetNumber,
399
0
        startOffset = InvalidOffsetNumber;
400
0
  bool    replaceDead = false;
401
0
  OffsetNumber *toDelete;
402
0
  OffsetNumber *toInsert;
403
0
  BlockNumber nblkno;
404
0
  spgxlogMoveLeafs xlrec;
405
0
  char     *leafdata,
406
0
         *leafptr;
407
408
  /* This doesn't work on root page */
409
0
  Assert(parent->buffer != InvalidBuffer);
410
0
  Assert(parent->buffer != current->buffer);
411
412
  /* Locate the tuples to be moved, and count up the space needed */
413
0
  i = PageGetMaxOffsetNumber(current->page);
414
0
  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
415
0
  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
416
417
0
  size = newLeafTuple->size + sizeof(ItemIdData);
418
419
0
  nDelete = 0;
420
0
  i = current->offnum;
421
0
  while (i != InvalidOffsetNumber)
422
0
  {
423
0
    SpGistLeafTuple it;
424
425
0
    Assert(i >= FirstOffsetNumber &&
426
0
         i <= PageGetMaxOffsetNumber(current->page));
427
0
    it = (SpGistLeafTuple) PageGetItem(current->page,
428
0
                       PageGetItemId(current->page, i));
429
430
0
    if (it->tupstate == SPGIST_LIVE)
431
0
    {
432
0
      toDelete[nDelete] = i;
433
0
      size += it->size + sizeof(ItemIdData);
434
0
      nDelete++;
435
0
    }
436
0
    else if (it->tupstate == SPGIST_DEAD)
437
0
    {
438
      /* We could see a DEAD tuple as first/only chain item */
439
0
      Assert(i == current->offnum);
440
0
      Assert(it->nextOffset == InvalidOffsetNumber);
441
      /* We don't want to move it, so don't count it in size */
442
0
      toDelete[nDelete] = i;
443
0
      nDelete++;
444
0
      replaceDead = true;
445
0
    }
446
0
    else
447
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
448
449
0
    i = it->nextOffset;
450
0
  }
451
452
  /* Find a leaf page that will hold them */
453
0
  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
454
0
               size, &xlrec.newPage);
455
0
  npage = BufferGetPage(nbuf);
456
0
  nblkno = BufferGetBlockNumber(nbuf);
457
0
  Assert(nblkno != current->blkno);
458
459
0
  leafdata = leafptr = palloc(size);
460
461
0
  START_CRIT_SECTION();
462
463
  /* copy all the old tuples to new page, unless they're dead */
464
0
  nInsert = 0;
465
0
  if (!replaceDead)
466
0
  {
467
0
    for (i = 0; i < nDelete; i++)
468
0
    {
469
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
470
0
                         PageGetItemId(current->page, toDelete[i]));
471
0
      Assert(it->tupstate == SPGIST_LIVE);
472
473
      /*
474
       * Update chain link (notice the chain order gets reversed, but we
475
       * don't care).  We're modifying the tuple on the source page
476
       * here, but it's okay since we're about to delete it.
477
       */
478
0
      it->nextOffset = r;
479
480
0
      r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
481
0
                   &startOffset, false);
482
483
0
      toInsert[nInsert] = r;
484
0
      nInsert++;
485
486
      /* save modified tuple into leafdata as well */
487
0
      memcpy(leafptr, it, it->size);
488
0
      leafptr += it->size;
489
0
    }
490
0
  }
491
492
  /* add the new tuple as well */
493
0
  newLeafTuple->nextOffset = r;
494
0
  r = SpGistPageAddNewItem(state, npage,
495
0
               (Item) newLeafTuple, newLeafTuple->size,
496
0
               &startOffset, false);
497
0
  toInsert[nInsert] = r;
498
0
  nInsert++;
499
0
  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
500
0
  leafptr += newLeafTuple->size;
501
502
  /*
503
   * Now delete the old tuples, leaving a redirection pointer behind for the
504
   * first one, unless we're doing an index build; in which case there can't
505
   * be any concurrent scan so we need not provide a redirect.
506
   */
507
0
  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
508
0
              state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
509
0
              SPGIST_PLACEHOLDER,
510
0
              nblkno, r);
511
512
  /* Update parent's downlink and mark parent page dirty */
513
0
  saveNodeLink(index, parent, nblkno, r);
514
515
  /* Mark the leaf pages too */
516
0
  MarkBufferDirty(current->buffer);
517
0
  MarkBufferDirty(nbuf);
518
519
0
  if (RelationNeedsWAL(index))
520
0
  {
521
0
    XLogRecPtr  recptr;
522
523
    /* prepare WAL info */
524
0
    STORE_STATE(state, xlrec.stateSrc);
525
526
0
    xlrec.nMoves = nDelete;
527
0
    xlrec.replaceDead = replaceDead;
528
0
    xlrec.storesNulls = isNulls;
529
530
0
    xlrec.offnumParent = parent->offnum;
531
0
    xlrec.nodeI = parent->node;
532
533
0
    XLogBeginInsert();
534
0
    XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
535
0
    XLogRegisterData((char *) toDelete,
536
0
             sizeof(OffsetNumber) * nDelete);
537
0
    XLogRegisterData((char *) toInsert,
538
0
             sizeof(OffsetNumber) * nInsert);
539
0
    XLogRegisterData((char *) leafdata, leafptr - leafdata);
540
541
0
    XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
542
0
    XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
543
0
    XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
544
545
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
546
547
0
    PageSetLSN(current->page, recptr);
548
0
    PageSetLSN(npage, recptr);
549
0
    PageSetLSN(parent->page, recptr);
550
0
  }
551
552
0
  END_CRIT_SECTION();
553
554
  /* Update local free-space cache and release new buffer */
555
0
  SpGistSetLastUsedPage(index, nbuf);
556
0
  UnlockReleaseBuffer(nbuf);
557
0
}
558
559
/*
560
 * Update previously-created redirection tuple with appropriate destination
561
 *
562
 * We use this when it's not convenient to know the destination first.
563
 * The tuple should have been made with the "impossible" destination of
564
 * the metapage.
565
 */
566
static void
567
setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
568
          BlockNumber blkno, OffsetNumber offnum)
569
0
{
570
0
  SpGistDeadTuple dt;
571
572
0
  dt = (SpGistDeadTuple) PageGetItem(current->page,
573
0
                     PageGetItemId(current->page, position));
574
0
  Assert(dt->tupstate == SPGIST_REDIRECT);
575
0
  Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
576
0
  ItemPointerSet(&dt->pointer, blkno, offnum);
577
0
}
578
579
/*
580
 * Test to see if the user-defined picksplit function failed to do its job,
581
 * ie, it put all the leaf tuples into the same node.
582
 * If so, randomly divide the tuples into several nodes (all with the same
583
 * label) and return true to select allTheSame mode for this inner tuple.
584
 *
585
 * (This code is also used to forcibly select allTheSame mode for nulls.)
586
 *
587
 * If we know that the leaf tuples wouldn't all fit on one page, then we
588
 * exclude the last tuple (which is the incoming new tuple that forced a split)
589
 * from the check to see if more than one node is used.  The reason for this
590
 * is that if the existing tuples are put into only one chain, then even if
591
 * we move them all to an empty page, there would still not be room for the
592
 * new tuple, so we'd get into an infinite loop of picksplit attempts.
593
 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
594
 * be split across pages.  (Exercise for the reader: figure out why this
595
 * fixes the problem even when there is only one old tuple.)
596
 */
597
static bool
598
checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
599
        bool *includeNew)
600
0
{
601
0
  int     theNode;
602
0
  int     limit;
603
0
  int     i;
604
605
  /* For the moment, assume we can include the new leaf tuple */
606
0
  *includeNew = true;
607
608
  /* If there's only the new leaf tuple, don't select allTheSame mode */
609
0
  if (in->nTuples <= 1)
610
0
    return false;
611
612
  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
613
0
  limit = tooBig ? in->nTuples - 1 : in->nTuples;
614
615
  /* Check to see if more than one node is populated */
616
0
  theNode = out->mapTuplesToNodes[0];
617
0
  for (i = 1; i < limit; i++)
618
0
  {
619
0
    if (out->mapTuplesToNodes[i] != theNode)
620
0
      return false;
621
0
  }
622
623
  /* Nope, so override the picksplit function's decisions */
624
625
  /* If the new tuple is in its own node, it can't be included in split */
626
0
  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
627
0
    *includeNew = false;
628
629
0
  out->nNodes = 8;      /* arbitrary number of child nodes */
630
631
  /* Random assignment of tuples to nodes (note we include new tuple) */
632
0
  for (i = 0; i < in->nTuples; i++)
633
0
    out->mapTuplesToNodes[i] = i % out->nNodes;
634
635
  /* The opclass may not use node labels, but if it does, duplicate 'em */
636
0
  if (out->nodeLabels)
637
0
  {
638
0
    Datum   theLabel = out->nodeLabels[theNode];
639
640
0
    out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
641
0
    for (i = 0; i < out->nNodes; i++)
642
0
      out->nodeLabels[i] = theLabel;
643
0
  }
644
645
  /* We don't touch the prefix or the leaf tuple datum assignments */
646
647
0
  return true;
648
0
}
649
650
/*
651
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
652
 * but the chain has to be split because there's not enough room to add
653
 * newLeafTuple to its page.
654
 *
655
 * This function splits the leaf tuple set according to picksplit's rules,
656
 * creating one or more new chains that are spread across the current page
657
 * and an additional leaf page (we assume that two leaf pages will be
658
 * sufficient).  A new inner tuple is created, and the parent downlink
659
 * pointer is updated to point to that inner tuple instead of the leaf chain.
660
 *
661
 * On exit, current contains the address of the new inner tuple.
662
 *
663
 * Returns true if we successfully inserted newLeafTuple during this function,
664
 * false if caller still has to do it (meaning another picksplit operation is
665
 * probably needed).  Failure could occur if the picksplit result is fairly
666
 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
667
 * Because we force the picksplit result to be at least two chains, each
668
 * cycle will get rid of at least one leaf tuple from the chain, so the loop
669
 * will eventually terminate if lack of balance is the issue.  If the tuple
670
 * is too big, we assume that repeated picksplit operations will eventually
671
 * make it small enough by repeated prefix-stripping.  A broken opclass could
672
 * make this an infinite loop, though.
673
 */
674
static bool
675
doPickSplit(Relation index, SpGistState *state,
676
      SPPageDesc *current, SPPageDesc *parent,
677
      SpGistLeafTuple newLeafTuple,
678
      int level, bool isNulls, bool isNew)
679
0
{
680
0
  bool    insertedNew = false;
681
0
  spgPickSplitIn in;
682
0
  spgPickSplitOut out;
683
0
  FmgrInfo   *procinfo;
684
0
  bool    includeNew;
685
0
  int     i,
686
0
        max,
687
0
        n;
688
0
  SpGistInnerTuple innerTuple;
689
0
  SpGistNodeTuple node,
690
0
         *nodes;
691
0
  Buffer    newInnerBuffer,
692
0
        newLeafBuffer;
693
0
  ItemPointerData *heapPtrs;
694
0
  uint8    *leafPageSelect;
695
0
  int      *leafSizes;
696
0
  OffsetNumber *toDelete;
697
0
  OffsetNumber *toInsert;
698
0
  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
699
0
  OffsetNumber startOffsets[2];
700
0
  SpGistLeafTuple *newLeafs;
701
0
  int     spaceToDelete;
702
0
  int     currentFreeSpace;
703
0
  int     totalLeafSizes;
704
0
  bool    allTheSame;
705
0
  spgxlogPickSplit xlrec;
706
0
  char     *leafdata,
707
0
         *leafptr;
708
0
  SPPageDesc  saveCurrent;
709
0
  int     nToDelete,
710
0
        nToInsert,
711
0
        maxToInclude;
712
713
0
  in.level = level;
714
715
  /*
716
   * Allocate per-leaf-tuple work arrays with max possible size
717
   */
718
0
  max = PageGetMaxOffsetNumber(current->page);
719
0
  n = max + 1;
720
0
  in.datums = (Datum *) palloc(sizeof(Datum) * n);
721
0
  heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
722
0
  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723
0
  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
724
0
  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725
0
  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726
727
0
  STORE_STATE(state, xlrec.stateSrc);
728
729
  /*
730
   * Form list of leaf tuples which will be distributed as split result;
731
   * also, count up the amount of space that will be freed from current.
732
   * (Note that in the non-root case, we won't actually delete the old
733
   * tuples, only replace them with redirects or placeholders.)
734
   *
735
   * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
736
   * page.  For a pass-by-value data type we will fetch a word that must
737
   * exist even though it may contain garbage (because of the fact that leaf
738
   * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
739
   * we are just computing a pointer that isn't going to get dereferenced.
740
   * So it's not worth guarding the calls with isNulls checks.
741
   */
742
0
  nToInsert = 0;
743
0
  nToDelete = 0;
744
0
  spaceToDelete = 0;
745
0
  if (SpGistBlockIsRoot(current->blkno))
746
0
  {
747
    /*
748
     * We are splitting the root (which up to now is also a leaf page).
749
     * Its tuples are not linked, so scan sequentially to get them all. We
750
     * ignore the original value of current->offnum.
751
     */
752
0
    for (i = FirstOffsetNumber; i <= max; i++)
753
0
    {
754
0
      SpGistLeafTuple it;
755
756
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
757
0
                         PageGetItemId(current->page, i));
758
0
      if (it->tupstate == SPGIST_LIVE)
759
0
      {
760
0
        in.datums[nToInsert] = SGLTDATUM(it, state);
761
0
        heapPtrs[nToInsert] = it->heapPtr;
762
0
        nToInsert++;
763
0
        toDelete[nToDelete] = i;
764
0
        nToDelete++;
765
        /* we will delete the tuple altogether, so count full space */
766
0
        spaceToDelete += it->size + sizeof(ItemIdData);
767
0
      }
768
0
      else        /* tuples on root should be live */
769
0
        elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
770
0
    }
771
0
  }
772
0
  else
773
0
  {
774
    /* Normal case, just collect the leaf tuples in the chain */
775
0
    i = current->offnum;
776
0
    while (i != InvalidOffsetNumber)
777
0
    {
778
0
      SpGistLeafTuple it;
779
780
0
      Assert(i >= FirstOffsetNumber && i <= max);
781
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
782
0
                         PageGetItemId(current->page, i));
783
0
      if (it->tupstate == SPGIST_LIVE)
784
0
      {
785
0
        in.datums[nToInsert] = SGLTDATUM(it, state);
786
0
        heapPtrs[nToInsert] = it->heapPtr;
787
0
        nToInsert++;
788
0
        toDelete[nToDelete] = i;
789
0
        nToDelete++;
790
        /* we will not delete the tuple, only replace with dead */
791
0
        Assert(it->size >= SGDTSIZE);
792
0
        spaceToDelete += it->size - SGDTSIZE;
793
0
      }
794
0
      else if (it->tupstate == SPGIST_DEAD)
795
0
      {
796
        /* We could see a DEAD tuple as first/only chain item */
797
0
        Assert(i == current->offnum);
798
0
        Assert(it->nextOffset == InvalidOffsetNumber);
799
0
        toDelete[nToDelete] = i;
800
0
        nToDelete++;
801
        /* replacing it with redirect will save no space */
802
0
      }
803
0
      else
804
0
        elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
805
806
0
      i = it->nextOffset;
807
0
    }
808
0
  }
809
0
  in.nTuples = nToInsert;
810
811
  /*
812
   * We may not actually insert new tuple because another picksplit may be
813
   * necessary due to too large value, but we will try to allocate enough
814
   * space to include it; and in any case it has to be included in the input
815
   * for the picksplit function.  So don't increment nToInsert yet.
816
   */
817
0
  in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
818
0
  heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
819
0
  in.nTuples++;
820
821
0
  memset(&out, 0, sizeof(out));
822
823
0
  if (!isNulls)
824
0
  {
825
    /*
826
     * Perform split using user-defined method.
827
     */
828
0
    procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829
0
    FunctionCall2Coll(procinfo,
830
0
              index->rd_indcollation[0],
831
0
              PointerGetDatum(&in),
832
0
              PointerGetDatum(&out));
833
834
    /*
835
     * Form new leaf tuples and count up the total space needed.
836
     */
837
0
    totalLeafSizes = 0;
838
0
    for (i = 0; i < in.nTuples; i++)
839
0
    {
840
0
      newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
841
0
                       out.leafTupleDatums[i],
842
0
                       false);
843
0
      totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
844
0
    }
845
0
  }
846
0
  else
847
0
  {
848
    /*
849
     * Perform dummy split that puts all tuples into one node.
850
     * checkAllTheSame will override this and force allTheSame mode.
851
     */
852
0
    out.hasPrefix = false;
853
0
    out.nNodes = 1;
854
0
    out.nodeLabels = NULL;
855
0
    out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
856
857
    /*
858
     * Form new leaf tuples and count up the total space needed.
859
     */
860
0
    totalLeafSizes = 0;
861
0
    for (i = 0; i < in.nTuples; i++)
862
0
    {
863
0
      newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
864
0
                       (Datum) 0,
865
0
                       true);
866
0
      totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
867
0
    }
868
0
  }
869
870
  /*
871
   * Check to see if the picksplit function failed to separate the values,
872
   * ie, it put them all into the same child node.  If so, select allTheSame
873
   * mode and create a random split instead.  See comments for
874
   * checkAllTheSame as to why we need to know if the new leaf tuples could
875
   * fit on one page.
876
   */
877
0
  allTheSame = checkAllTheSame(&in, &out,
878
0
                 totalLeafSizes > SPGIST_PAGE_CAPACITY,
879
0
                 &includeNew);
880
881
  /*
882
   * If checkAllTheSame decided we must exclude the new tuple, don't
883
   * consider it any further.
884
   */
885
0
  if (includeNew)
886
0
    maxToInclude = in.nTuples;
887
0
  else
888
0
  {
889
0
    maxToInclude = in.nTuples - 1;
890
0
    totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
891
0
  }
892
893
  /*
894
   * Allocate per-node work arrays.  Since checkAllTheSame could replace
895
   * out.nNodes with a value larger than the number of tuples on the input
896
   * page, we can't allocate these arrays before here.
897
   */
898
0
  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
899
0
  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
900
901
  /*
902
   * Form nodes of inner tuple and inner tuple itself
903
   */
904
0
  for (i = 0; i < out.nNodes; i++)
905
0
  {
906
0
    Datum   label = (Datum) 0;
907
0
    bool    labelisnull = (out.nodeLabels == NULL);
908
909
0
    if (!labelisnull)
910
0
      label = out.nodeLabels[i];
911
0
    nodes[i] = spgFormNodeTuple(state, label, labelisnull);
912
0
  }
913
0
  innerTuple = spgFormInnerTuple(state,
914
0
                   out.hasPrefix, out.prefixDatum,
915
0
                   out.nNodes, nodes);
916
0
  innerTuple->allTheSame = allTheSame;
917
918
  /*
919
   * Update nodes[] array to point into the newly formed innerTuple, so that
920
   * we can adjust their downlinks below.
921
   */
922
0
  SGITITERATE(innerTuple, i, node)
923
0
  {
924
0
    nodes[i] = node;
925
0
  }
926
927
  /*
928
   * Re-scan new leaf tuples and count up the space needed under each node.
929
   */
930
0
  for (i = 0; i < maxToInclude; i++)
931
0
  {
932
0
    n = out.mapTuplesToNodes[i];
933
0
    if (n < 0 || n >= out.nNodes)
934
0
      elog(ERROR, "inconsistent result of SPGiST picksplit function");
935
0
    leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
936
0
  }
937
938
  /*
939
   * To perform the split, we must insert a new inner tuple, which can't go
940
   * on a leaf page; and unless we are splitting the root page, we must then
941
   * update the parent tuple's downlink to point to the inner tuple.  If
942
   * there is room, we'll put the new inner tuple on the same page as the
943
   * parent tuple, otherwise we need another non-leaf buffer. But if the
944
   * parent page is the root, we can't add the new inner tuple there,
945
   * because the root page must have only one inner tuple.
946
   */
947
0
  xlrec.initInner = false;
948
0
  if (parent->buffer != InvalidBuffer &&
949
0
    !SpGistBlockIsRoot(parent->blkno) &&
950
0
    (SpGistPageGetFreeSpace(parent->page, 1) >=
951
0
     innerTuple->size + sizeof(ItemIdData)))
952
0
  {
953
    /* New inner tuple will fit on parent page */
954
0
    newInnerBuffer = parent->buffer;
955
0
  }
956
0
  else if (parent->buffer != InvalidBuffer)
957
0
  {
958
    /* Send tuple to page with next triple parity (see README) */
959
0
    newInnerBuffer = SpGistGetBuffer(index,
960
0
                     GBUF_INNER_PARITY(parent->blkno + 1) |
961
0
                     (isNulls ? GBUF_NULLS : 0),
962
0
                     innerTuple->size + sizeof(ItemIdData),
963
0
                     &xlrec.initInner);
964
0
  }
965
0
  else
966
0
  {
967
    /* Root page split ... inner tuple will go to root page */
968
0
    newInnerBuffer = InvalidBuffer;
969
0
  }
970
971
  /*
972
   * The new leaf tuples converted from the existing ones should require the
973
   * same or less space, and therefore should all fit onto one page
974
   * (although that's not necessarily the current page, since we can't
975
   * delete the old tuples but only replace them with placeholders).
976
   * However, the incoming new tuple might not also fit, in which case we
977
   * might need another picksplit cycle to reduce it some more.
978
   *
979
   * If there's not room to put everything back onto the current page, then
980
   * we decide on a per-node basis which tuples go to the new page. (We do
981
   * it like that because leaf tuple chains can't cross pages, so we must
982
   * place all leaf tuples belonging to the same parent node on the same
983
   * page.)
984
   *
985
   * If we are splitting the root page (turning it from a leaf page into an
986
   * inner page), then no leaf tuples can go back to the current page; they
987
   * must all go somewhere else.
988
   */
989
0
  if (!SpGistBlockIsRoot(current->blkno))
990
0
    currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
991
0
  else
992
0
    currentFreeSpace = 0; /* prevent assigning any tuples to current */
993
994
0
  xlrec.initDest = false;
995
996
0
  if (totalLeafSizes <= currentFreeSpace)
997
0
  {
998
    /* All the leaf tuples will fit on current page */
999
0
    newLeafBuffer = InvalidBuffer;
1000
    /* mark new leaf tuple as included in insertions, if allowed */
1001
0
    if (includeNew)
1002
0
    {
1003
0
      nToInsert++;
1004
0
      insertedNew = true;
1005
0
    }
1006
0
    for (i = 0; i < nToInsert; i++)
1007
0
      leafPageSelect[i] = 0; /* signifies current page */
1008
0
  }
1009
0
  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1010
0
  {
1011
    /*
1012
     * We're trying to split up a long value by repeated suffixing, but
1013
     * it's not going to fit yet.  Don't bother allocating a second leaf
1014
     * buffer that we won't be able to use.
1015
     */
1016
0
    newLeafBuffer = InvalidBuffer;
1017
0
    Assert(includeNew);
1018
0
    Assert(nToInsert == 0);
1019
0
  }
1020
0
  else
1021
0
  {
1022
    /* We will need another leaf page */
1023
0
    uint8    *nodePageSelect;
1024
0
    int     curspace;
1025
0
    int     newspace;
1026
1027
0
    newLeafBuffer = SpGistGetBuffer(index,
1028
0
                    GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1029
0
                    Min(totalLeafSizes,
1030
0
                      SPGIST_PAGE_CAPACITY),
1031
0
                    &xlrec.initDest);
1032
1033
    /*
1034
     * Attempt to assign node groups to the two pages.  We might fail to
1035
     * do so, even if totalLeafSizes is less than the available space,
1036
     * because we can't split a group across pages.
1037
     */
1038
0
    nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1039
1040
0
    curspace = currentFreeSpace;
1041
0
    newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1042
0
    for (i = 0; i < out.nNodes; i++)
1043
0
    {
1044
0
      if (leafSizes[i] <= curspace)
1045
0
      {
1046
0
        nodePageSelect[i] = 0;  /* signifies current page */
1047
0
        curspace -= leafSizes[i];
1048
0
      }
1049
0
      else
1050
0
      {
1051
0
        nodePageSelect[i] = 1;  /* signifies new leaf page */
1052
0
        newspace -= leafSizes[i];
1053
0
      }
1054
0
    }
1055
0
    if (curspace >= 0 && newspace >= 0)
1056
0
    {
1057
      /* Successful assignment, so we can include the new leaf tuple */
1058
0
      if (includeNew)
1059
0
      {
1060
0
        nToInsert++;
1061
0
        insertedNew = true;
1062
0
      }
1063
0
    }
1064
0
    else if (includeNew)
1065
0
    {
1066
      /* We must exclude the new leaf tuple from the split */
1067
0
      int     nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1068
1069
0
      leafSizes[nodeOfNewTuple] -=
1070
0
        newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1071
1072
      /* Repeat the node assignment process --- should succeed now */
1073
0
      curspace = currentFreeSpace;
1074
0
      newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1075
0
      for (i = 0; i < out.nNodes; i++)
1076
0
      {
1077
0
        if (leafSizes[i] <= curspace)
1078
0
        {
1079
0
          nodePageSelect[i] = 0;  /* signifies current page */
1080
0
          curspace -= leafSizes[i];
1081
0
        }
1082
0
        else
1083
0
        {
1084
0
          nodePageSelect[i] = 1;  /* signifies new leaf page */
1085
0
          newspace -= leafSizes[i];
1086
0
        }
1087
0
      }
1088
0
      if (curspace < 0 || newspace < 0)
1089
0
        elog(ERROR, "failed to divide leaf tuple groups across pages");
1090
0
    }
1091
0
    else
1092
0
    {
1093
      /* oops, we already excluded new tuple ... should not get here */
1094
0
      elog(ERROR, "failed to divide leaf tuple groups across pages");
1095
0
    }
1096
    /* Expand the per-node assignments to be shown per leaf tuple */
1097
0
    for (i = 0; i < nToInsert; i++)
1098
0
    {
1099
0
      n = out.mapTuplesToNodes[i];
1100
0
      leafPageSelect[i] = nodePageSelect[n];
1101
0
    }
1102
0
  }
1103
1104
  /* Start preparing WAL record */
1105
0
  xlrec.nDelete = 0;
1106
0
  xlrec.initSrc = isNew;
1107
0
  xlrec.storesNulls = isNulls;
1108
0
  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1109
1110
0
  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1111
1112
  /* Here we begin making the changes to the target pages */
1113
0
  START_CRIT_SECTION();
1114
1115
  /*
1116
   * Delete old leaf tuples from current buffer, except when we're splitting
1117
   * the root; in that case there's no need because we'll re-init the page
1118
   * below.  We do this first to make room for reinserting new leaf tuples.
1119
   */
1120
0
  if (!SpGistBlockIsRoot(current->blkno))
1121
0
  {
1122
    /*
1123
     * Init buffer instead of deleting individual tuples, but only if
1124
     * there aren't any other live tuples and only during build; otherwise
1125
     * we need to set a redirection tuple for concurrent scans.
1126
     */
1127
0
    if (state->isBuild &&
1128
0
      nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1129
0
      PageGetMaxOffsetNumber(current->page))
1130
0
    {
1131
0
      SpGistInitBuffer(current->buffer,
1132
0
               SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1133
0
      xlrec.initSrc = true;
1134
0
    }
1135
0
    else if (isNew)
1136
0
    {
1137
      /* don't expose the freshly init'd buffer as a backup block */
1138
0
      Assert(nToDelete == 0);
1139
0
    }
1140
0
    else
1141
0
    {
1142
0
      xlrec.nDelete = nToDelete;
1143
1144
0
      if (!state->isBuild)
1145
0
      {
1146
        /*
1147
         * Need to create redirect tuple (it will point to new inner
1148
         * tuple) but right now the new tuple's location is not known
1149
         * yet.  So, set the redirection pointer to "impossible" value
1150
         * and remember its position to update tuple later.
1151
         */
1152
0
        if (nToDelete > 0)
1153
0
          redirectTuplePos = toDelete[0];
1154
0
        spgPageIndexMultiDelete(state, current->page,
1155
0
                    toDelete, nToDelete,
1156
0
                    SPGIST_REDIRECT,
1157
0
                    SPGIST_PLACEHOLDER,
1158
0
                    SPGIST_METAPAGE_BLKNO,
1159
0
                    FirstOffsetNumber);
1160
0
      }
1161
0
      else
1162
0
      {
1163
        /*
1164
         * During index build there are no concurrent searches, so we
1165
         * don't need to create redirection tuple.
1166
         */
1167
0
        spgPageIndexMultiDelete(state, current->page,
1168
0
                    toDelete, nToDelete,
1169
0
                    SPGIST_PLACEHOLDER,
1170
0
                    SPGIST_PLACEHOLDER,
1171
0
                    InvalidBlockNumber,
1172
0
                    InvalidOffsetNumber);
1173
0
      }
1174
0
    }
1175
0
  }
1176
1177
  /*
1178
   * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1179
   * nodes.
1180
   */
1181
0
  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1182
0
  for (i = 0; i < nToInsert; i++)
1183
0
  {
1184
0
    SpGistLeafTuple it = newLeafs[i];
1185
0
    Buffer    leafBuffer;
1186
0
    BlockNumber leafBlock;
1187
0
    OffsetNumber newoffset;
1188
1189
    /* Which page is it going to? */
1190
0
    leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1191
0
    leafBlock = BufferGetBlockNumber(leafBuffer);
1192
1193
    /* Link tuple into correct chain for its node */
1194
0
    n = out.mapTuplesToNodes[i];
1195
1196
0
    if (ItemPointerIsValid(&nodes[n]->t_tid))
1197
0
    {
1198
0
      Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1199
0
      it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
1200
0
    }
1201
0
    else
1202
0
      it->nextOffset = InvalidOffsetNumber;
1203
1204
    /* Insert it on page */
1205
0
    newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1206
0
                     (Item) it, it->size,
1207
0
                     &startOffsets[leafPageSelect[i]],
1208
0
                     false);
1209
0
    toInsert[i] = newoffset;
1210
1211
    /* ... and complete the chain linking */
1212
0
    ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1213
1214
    /* Also copy leaf tuple into WAL data */
1215
0
    memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1216
0
    leafptr += newLeafs[i]->size;
1217
0
  }
1218
1219
  /*
1220
   * We're done modifying the other leaf buffer (if any), so mark it dirty.
1221
   * current->buffer will be marked below, after we're entirely done
1222
   * modifying it.
1223
   */
1224
0
  if (newLeafBuffer != InvalidBuffer)
1225
0
  {
1226
0
    MarkBufferDirty(newLeafBuffer);
1227
0
  }
1228
1229
  /* Remember current buffer, since we're about to change "current" */
1230
0
  saveCurrent = *current;
1231
1232
  /*
1233
   * Store the new innerTuple
1234
   */
1235
0
  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1236
0
  {
1237
    /*
1238
     * new inner tuple goes to parent page
1239
     */
1240
0
    Assert(current->buffer != parent->buffer);
1241
1242
    /* Repoint "current" at the new inner tuple */
1243
0
    current->blkno = parent->blkno;
1244
0
    current->buffer = parent->buffer;
1245
0
    current->page = parent->page;
1246
0
    xlrec.offnumInner = current->offnum =
1247
0
      SpGistPageAddNewItem(state, current->page,
1248
0
                 (Item) innerTuple, innerTuple->size,
1249
0
                 NULL, false);
1250
1251
    /*
1252
     * Update parent node link and mark parent page dirty
1253
     */
1254
0
    xlrec.innerIsParent = true;
1255
0
    xlrec.offnumParent = parent->offnum;
1256
0
    xlrec.nodeI = parent->node;
1257
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1258
1259
    /*
1260
     * Update redirection link (in old current buffer)
1261
     */
1262
0
    if (redirectTuplePos != InvalidOffsetNumber)
1263
0
      setRedirectionTuple(&saveCurrent, redirectTuplePos,
1264
0
                current->blkno, current->offnum);
1265
1266
    /* Done modifying old current buffer, mark it dirty */
1267
0
    MarkBufferDirty(saveCurrent.buffer);
1268
0
  }
1269
0
  else if (parent->buffer != InvalidBuffer)
1270
0
  {
1271
    /*
1272
     * new inner tuple will be stored on a new page
1273
     */
1274
0
    Assert(newInnerBuffer != InvalidBuffer);
1275
1276
    /* Repoint "current" at the new inner tuple */
1277
0
    current->buffer = newInnerBuffer;
1278
0
    current->blkno = BufferGetBlockNumber(current->buffer);
1279
0
    current->page = BufferGetPage(current->buffer);
1280
0
    xlrec.offnumInner = current->offnum =
1281
0
      SpGistPageAddNewItem(state, current->page,
1282
0
                 (Item) innerTuple, innerTuple->size,
1283
0
                 NULL, false);
1284
1285
    /* Done modifying new current buffer, mark it dirty */
1286
0
    MarkBufferDirty(current->buffer);
1287
1288
    /*
1289
     * Update parent node link and mark parent page dirty
1290
     */
1291
0
    xlrec.innerIsParent = (parent->buffer == current->buffer);
1292
0
    xlrec.offnumParent = parent->offnum;
1293
0
    xlrec.nodeI = parent->node;
1294
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1295
1296
    /*
1297
     * Update redirection link (in old current buffer)
1298
     */
1299
0
    if (redirectTuplePos != InvalidOffsetNumber)
1300
0
      setRedirectionTuple(&saveCurrent, redirectTuplePos,
1301
0
                current->blkno, current->offnum);
1302
1303
    /* Done modifying old current buffer, mark it dirty */
1304
0
    MarkBufferDirty(saveCurrent.buffer);
1305
0
  }
1306
0
  else
1307
0
  {
1308
    /*
1309
     * Splitting root page, which was a leaf but now becomes inner page
1310
     * (and so "current" continues to point at it)
1311
     */
1312
0
    Assert(SpGistBlockIsRoot(current->blkno));
1313
0
    Assert(redirectTuplePos == InvalidOffsetNumber);
1314
1315
0
    SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1316
0
    xlrec.initInner = true;
1317
0
    xlrec.innerIsParent = false;
1318
1319
0
    xlrec.offnumInner = current->offnum =
1320
0
      PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1321
0
            InvalidOffsetNumber, false, false);
1322
0
    if (current->offnum != FirstOffsetNumber)
1323
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1324
0
         innerTuple->size);
1325
1326
    /* No parent link to update, nor redirection to do */
1327
0
    xlrec.offnumParent = InvalidOffsetNumber;
1328
0
    xlrec.nodeI = 0;
1329
1330
    /* Done modifying new current buffer, mark it dirty */
1331
0
    MarkBufferDirty(current->buffer);
1332
1333
    /* saveCurrent doesn't represent a different buffer */
1334
0
    saveCurrent.buffer = InvalidBuffer;
1335
0
  }
1336
1337
0
  if (RelationNeedsWAL(index))
1338
0
  {
1339
0
    XLogRecPtr  recptr;
1340
0
    int     flags;
1341
1342
0
    XLogBeginInsert();
1343
1344
0
    xlrec.nInsert = nToInsert;
1345
0
    XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1346
1347
0
    XLogRegisterData((char *) toDelete,
1348
0
             sizeof(OffsetNumber) * xlrec.nDelete);
1349
0
    XLogRegisterData((char *) toInsert,
1350
0
             sizeof(OffsetNumber) * xlrec.nInsert);
1351
0
    XLogRegisterData((char *) leafPageSelect,
1352
0
             sizeof(uint8) * xlrec.nInsert);
1353
0
    XLogRegisterData((char *) innerTuple, innerTuple->size);
1354
0
    XLogRegisterData(leafdata, leafptr - leafdata);
1355
1356
    /* Old leaf page */
1357
0
    if (BufferIsValid(saveCurrent.buffer))
1358
0
    {
1359
0
      flags = REGBUF_STANDARD;
1360
0
      if (xlrec.initSrc)
1361
0
        flags |= REGBUF_WILL_INIT;
1362
0
      XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1363
0
    }
1364
1365
    /* New leaf page */
1366
0
    if (BufferIsValid(newLeafBuffer))
1367
0
    {
1368
0
      flags = REGBUF_STANDARD;
1369
0
      if (xlrec.initDest)
1370
0
        flags |= REGBUF_WILL_INIT;
1371
0
      XLogRegisterBuffer(1, newLeafBuffer, flags);
1372
0
    }
1373
1374
    /* Inner page */
1375
0
    flags = REGBUF_STANDARD;
1376
0
    if (xlrec.initInner)
1377
0
      flags |= REGBUF_WILL_INIT;
1378
0
    XLogRegisterBuffer(2, current->buffer, flags);
1379
1380
    /* Parent page, if different from inner page */
1381
0
    if (parent->buffer != InvalidBuffer)
1382
0
    {
1383
0
      if (parent->buffer != current->buffer)
1384
0
        XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1385
0
      else
1386
0
        Assert(xlrec.innerIsParent);
1387
0
    }
1388
1389
    /* Issue the WAL record */
1390
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1391
1392
    /* Update page LSNs on all affected pages */
1393
0
    if (newLeafBuffer != InvalidBuffer)
1394
0
    {
1395
0
      Page    page = BufferGetPage(newLeafBuffer);
1396
1397
0
      PageSetLSN(page, recptr);
1398
0
    }
1399
1400
0
    if (saveCurrent.buffer != InvalidBuffer)
1401
0
    {
1402
0
      Page    page = BufferGetPage(saveCurrent.buffer);
1403
1404
0
      PageSetLSN(page, recptr);
1405
0
    }
1406
1407
0
    PageSetLSN(current->page, recptr);
1408
1409
0
    if (parent->buffer != InvalidBuffer)
1410
0
    {
1411
0
      PageSetLSN(parent->page, recptr);
1412
0
    }
1413
0
  }
1414
1415
0
  END_CRIT_SECTION();
1416
1417
  /* Update local free-space cache and unlock buffers */
1418
0
  if (newLeafBuffer != InvalidBuffer)
1419
0
  {
1420
0
    SpGistSetLastUsedPage(index, newLeafBuffer);
1421
0
    UnlockReleaseBuffer(newLeafBuffer);
1422
0
  }
1423
0
  if (saveCurrent.buffer != InvalidBuffer)
1424
0
  {
1425
0
    SpGistSetLastUsedPage(index, saveCurrent.buffer);
1426
0
    UnlockReleaseBuffer(saveCurrent.buffer);
1427
0
  }
1428
1429
0
  return insertedNew;
1430
0
}
1431
1432
/*
1433
 * spgMatchNode action: descend to N'th child node of current inner tuple
1434
 */
1435
static void
1436
spgMatchNodeAction(Relation index, SpGistState *state,
1437
           SpGistInnerTuple innerTuple,
1438
           SPPageDesc *current, SPPageDesc *parent, int nodeN)
1439
0
{
1440
0
  int     i;
1441
0
  SpGistNodeTuple node;
1442
1443
  /* Release previous parent buffer if any */
1444
0
  if (parent->buffer != InvalidBuffer &&
1445
0
    parent->buffer != current->buffer)
1446
0
  {
1447
0
    SpGistSetLastUsedPage(index, parent->buffer);
1448
0
    UnlockReleaseBuffer(parent->buffer);
1449
0
  }
1450
1451
  /* Repoint parent to specified node of current inner tuple */
1452
0
  parent->blkno = current->blkno;
1453
0
  parent->buffer = current->buffer;
1454
0
  parent->page = current->page;
1455
0
  parent->offnum = current->offnum;
1456
0
  parent->node = nodeN;
1457
1458
  /* Locate that node */
1459
0
  SGITITERATE(innerTuple, i, node)
1460
0
  {
1461
0
    if (i == nodeN)
1462
0
      break;
1463
0
  }
1464
1465
0
  if (i != nodeN)
1466
0
    elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1467
0
       nodeN);
1468
1469
  /* Point current to the downlink location, if any */
1470
0
  if (ItemPointerIsValid(&node->t_tid))
1471
0
  {
1472
0
    current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1473
0
    current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1474
0
  }
1475
0
  else
1476
0
  {
1477
    /* Downlink is empty, so we'll need to find a new page */
1478
0
    current->blkno = InvalidBlockNumber;
1479
0
    current->offnum = InvalidOffsetNumber;
1480
0
  }
1481
1482
0
  current->buffer = InvalidBuffer;
1483
0
  current->page = NULL;
1484
0
}
1485
1486
/*
1487
 * spgAddNode action: add a node to the inner tuple at current
1488
 */
1489
static void
1490
spgAddNodeAction(Relation index, SpGistState *state,
1491
         SpGistInnerTuple innerTuple,
1492
         SPPageDesc *current, SPPageDesc *parent,
1493
         int nodeN, Datum nodeLabel)
1494
0
{
1495
0
  SpGistInnerTuple newInnerTuple;
1496
0
  spgxlogAddNode xlrec;
1497
1498
  /* Should not be applied to nulls */
1499
0
  Assert(!SpGistPageStoresNulls(current->page));
1500
1501
  /* Construct new inner tuple with additional node */
1502
0
  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1503
1504
  /* Prepare WAL record */
1505
0
  STORE_STATE(state, xlrec.stateSrc);
1506
0
  xlrec.offnum = current->offnum;
1507
1508
  /* we don't fill these unless we need to change the parent downlink */
1509
0
  xlrec.parentBlk = -1;
1510
0
  xlrec.offnumParent = InvalidOffsetNumber;
1511
0
  xlrec.nodeI = 0;
1512
1513
  /* we don't fill these unless tuple has to be moved */
1514
0
  xlrec.offnumNew = InvalidOffsetNumber;
1515
0
  xlrec.newPage = false;
1516
1517
0
  if (PageGetExactFreeSpace(current->page) >=
1518
0
    newInnerTuple->size - innerTuple->size)
1519
0
  {
1520
    /*
1521
     * We can replace the inner tuple by new version in-place
1522
     */
1523
0
    START_CRIT_SECTION();
1524
1525
0
    PageIndexTupleDelete(current->page, current->offnum);
1526
0
    if (PageAddItem(current->page,
1527
0
            (Item) newInnerTuple, newInnerTuple->size,
1528
0
            current->offnum, false, false) != current->offnum)
1529
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1530
0
         newInnerTuple->size);
1531
1532
0
    MarkBufferDirty(current->buffer);
1533
1534
0
    if (RelationNeedsWAL(index))
1535
0
    {
1536
0
      XLogRecPtr  recptr;
1537
1538
0
      XLogBeginInsert();
1539
0
      XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1540
0
      XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1541
1542
0
      XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1543
1544
0
      recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1545
1546
0
      PageSetLSN(current->page, recptr);
1547
0
    }
1548
1549
0
    END_CRIT_SECTION();
1550
0
  }
1551
0
  else
1552
0
  {
1553
    /*
1554
     * move inner tuple to another page, and update parent
1555
     */
1556
0
    SpGistDeadTuple dt;
1557
0
    SPPageDesc  saveCurrent;
1558
1559
    /*
1560
     * It should not be possible to get here for the root page, since we
1561
     * allow only one inner tuple on the root page, and spgFormInnerTuple
1562
     * always checks that inner tuples don't exceed the size of a page.
1563
     */
1564
0
    if (SpGistBlockIsRoot(current->blkno))
1565
0
      elog(ERROR, "cannot enlarge root tuple any more");
1566
0
    Assert(parent->buffer != InvalidBuffer);
1567
1568
0
    saveCurrent = *current;
1569
1570
0
    xlrec.offnumParent = parent->offnum;
1571
0
    xlrec.nodeI = parent->node;
1572
1573
    /*
1574
     * obtain new buffer with the same parity as current, since it will be
1575
     * a child of same parent tuple
1576
     */
1577
0
    current->buffer = SpGistGetBuffer(index,
1578
0
                      GBUF_INNER_PARITY(current->blkno),
1579
0
                      newInnerTuple->size + sizeof(ItemIdData),
1580
0
                      &xlrec.newPage);
1581
0
    current->blkno = BufferGetBlockNumber(current->buffer);
1582
0
    current->page = BufferGetPage(current->buffer);
1583
1584
    /*
1585
     * Let's just make real sure new current isn't same as old.  Right now
1586
     * that's impossible, but if SpGistGetBuffer ever got smart enough to
1587
     * delete placeholder tuples before checking space, maybe it wouldn't
1588
     * be impossible.  The case would appear to work except that WAL
1589
     * replay would be subtly wrong, so I think a mere assert isn't enough
1590
     * here.
1591
     */
1592
0
    if (current->blkno == saveCurrent.blkno)
1593
0
      elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1594
1595
    /*
1596
     * New current and parent buffer will both be modified; but note that
1597
     * parent buffer could be same as either new or old current.
1598
     */
1599
0
    if (parent->buffer == saveCurrent.buffer)
1600
0
      xlrec.parentBlk = 0;
1601
0
    else if (parent->buffer == current->buffer)
1602
0
      xlrec.parentBlk = 1;
1603
0
    else
1604
0
      xlrec.parentBlk = 2;
1605
1606
0
    START_CRIT_SECTION();
1607
1608
    /* insert new ... */
1609
0
    xlrec.offnumNew = current->offnum =
1610
0
      SpGistPageAddNewItem(state, current->page,
1611
0
                 (Item) newInnerTuple, newInnerTuple->size,
1612
0
                 NULL, false);
1613
1614
0
    MarkBufferDirty(current->buffer);
1615
1616
    /* update parent's downlink and mark parent page dirty */
1617
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1618
1619
    /*
1620
     * Replace old tuple with a placeholder or redirection tuple.  Unless
1621
     * doing an index build, we have to insert a redirection tuple for
1622
     * possible concurrent scans.  We can't just delete it in any case,
1623
     * because that could change the offsets of other tuples on the page,
1624
     * breaking downlinks from their parents.
1625
     */
1626
0
    if (state->isBuild)
1627
0
      dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1628
0
                  InvalidBlockNumber, InvalidOffsetNumber);
1629
0
    else
1630
0
      dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1631
0
                  current->blkno, current->offnum);
1632
1633
0
    PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1634
0
    if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1635
0
            saveCurrent.offnum,
1636
0
            false, false) != saveCurrent.offnum)
1637
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1638
0
         dt->size);
1639
1640
0
    if (state->isBuild)
1641
0
      SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1642
0
    else
1643
0
      SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1644
1645
0
    MarkBufferDirty(saveCurrent.buffer);
1646
1647
0
    if (RelationNeedsWAL(index))
1648
0
    {
1649
0
      XLogRecPtr  recptr;
1650
0
      int     flags;
1651
1652
0
      XLogBeginInsert();
1653
1654
      /* orig page */
1655
0
      XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1656
      /* new page */
1657
0
      flags = REGBUF_STANDARD;
1658
0
      if (xlrec.newPage)
1659
0
        flags |= REGBUF_WILL_INIT;
1660
0
      XLogRegisterBuffer(1, current->buffer, flags);
1661
      /* parent page (if different from orig and new) */
1662
0
      if (xlrec.parentBlk == 2)
1663
0
        XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1664
1665
0
      XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1666
0
      XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1667
1668
0
      recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1669
1670
      /* we don't bother to check if any of these are redundant */
1671
0
      PageSetLSN(current->page, recptr);
1672
0
      PageSetLSN(parent->page, recptr);
1673
0
      PageSetLSN(saveCurrent.page, recptr);
1674
0
    }
1675
1676
0
    END_CRIT_SECTION();
1677
1678
    /* Release saveCurrent if it's not same as current or parent */
1679
0
    if (saveCurrent.buffer != current->buffer &&
1680
0
      saveCurrent.buffer != parent->buffer)
1681
0
    {
1682
0
      SpGistSetLastUsedPage(index, saveCurrent.buffer);
1683
0
      UnlockReleaseBuffer(saveCurrent.buffer);
1684
0
    }
1685
0
  }
1686
0
}
1687
1688
/*
1689
 * spgSplitNode action: split inner tuple at current into prefix and postfix
1690
 */
1691
static void
1692
spgSplitNodeAction(Relation index, SpGistState *state,
1693
           SpGistInnerTuple innerTuple,
1694
           SPPageDesc *current, spgChooseOut *out)
1695
0
{
1696
0
  SpGistInnerTuple prefixTuple,
1697
0
        postfixTuple;
1698
0
  SpGistNodeTuple node,
1699
0
         *nodes;
1700
0
  BlockNumber postfixBlkno;
1701
0
  OffsetNumber postfixOffset;
1702
0
  int     i;
1703
0
  spgxlogSplitTuple xlrec;
1704
0
  Buffer    newBuffer = InvalidBuffer;
1705
1706
  /* Should not be applied to nulls */
1707
0
  Assert(!SpGistPageStoresNulls(current->page));
1708
1709
  /* Check opclass gave us sane values */
1710
0
  if (out->result.splitTuple.prefixNNodes <= 0 ||
1711
0
    out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1712
0
    elog(ERROR, "invalid number of prefix nodes: %d",
1713
0
       out->result.splitTuple.prefixNNodes);
1714
0
  if (out->result.splitTuple.childNodeN < 0 ||
1715
0
    out->result.splitTuple.childNodeN >=
1716
0
    out->result.splitTuple.prefixNNodes)
1717
0
    elog(ERROR, "invalid child node number: %d",
1718
0
       out->result.splitTuple.childNodeN);
1719
1720
  /*
1721
   * Construct new prefix tuple with requested number of nodes.  We'll fill
1722
   * in the childNodeN'th node's downlink below.
1723
   */
1724
0
  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1725
0
                     out->result.splitTuple.prefixNNodes);
1726
1727
0
  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1728
0
  {
1729
0
    Datum   label = (Datum) 0;
1730
0
    bool    labelisnull;
1731
1732
0
    labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1733
0
    if (!labelisnull)
1734
0
      label = out->result.splitTuple.prefixNodeLabels[i];
1735
0
    nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1736
0
  }
1737
1738
0
  prefixTuple = spgFormInnerTuple(state,
1739
0
                  out->result.splitTuple.prefixHasPrefix,
1740
0
                  out->result.splitTuple.prefixPrefixDatum,
1741
0
                  out->result.splitTuple.prefixNNodes,
1742
0
                  nodes);
1743
1744
  /* it must fit in the space that innerTuple now occupies */
1745
0
  if (prefixTuple->size > innerTuple->size)
1746
0
    elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1747
1748
  /*
1749
   * Construct new postfix tuple, containing all nodes of innerTuple with
1750
   * same node datums, but with the prefix specified by the picksplit
1751
   * function.
1752
   */
1753
0
  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1754
0
  SGITITERATE(innerTuple, i, node)
1755
0
  {
1756
0
    nodes[i] = node;
1757
0
  }
1758
1759
0
  postfixTuple = spgFormInnerTuple(state,
1760
0
                   out->result.splitTuple.postfixHasPrefix,
1761
0
                   out->result.splitTuple.postfixPrefixDatum,
1762
0
                   innerTuple->nNodes, nodes);
1763
1764
  /* Postfix tuple is allTheSame if original tuple was */
1765
0
  postfixTuple->allTheSame = innerTuple->allTheSame;
1766
1767
  /* prep data for WAL record */
1768
0
  xlrec.newPage = false;
1769
1770
  /*
1771
   * If we can't fit both tuples on the current page, get a new page for the
1772
   * postfix tuple.  In particular, can't split to the root page.
1773
   *
1774
   * For the space calculation, note that prefixTuple replaces innerTuple
1775
   * but postfixTuple will be a new entry.
1776
   */
1777
0
  if (SpGistBlockIsRoot(current->blkno) ||
1778
0
    SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1779
0
    prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1780
0
  {
1781
    /*
1782
     * Choose page with next triple parity, because postfix tuple is a
1783
     * child of prefix one
1784
     */
1785
0
    newBuffer = SpGistGetBuffer(index,
1786
0
                  GBUF_INNER_PARITY(current->blkno + 1),
1787
0
                  postfixTuple->size + sizeof(ItemIdData),
1788
0
                  &xlrec.newPage);
1789
0
  }
1790
1791
0
  START_CRIT_SECTION();
1792
1793
  /*
1794
   * Replace old tuple by prefix tuple
1795
   */
1796
0
  PageIndexTupleDelete(current->page, current->offnum);
1797
0
  xlrec.offnumPrefix = PageAddItem(current->page,
1798
0
                   (Item) prefixTuple, prefixTuple->size,
1799
0
                   current->offnum, false, false);
1800
0
  if (xlrec.offnumPrefix != current->offnum)
1801
0
    elog(ERROR, "failed to add item of size %u to SPGiST index page",
1802
0
       prefixTuple->size);
1803
1804
  /*
1805
   * put postfix tuple into appropriate page
1806
   */
1807
0
  if (newBuffer == InvalidBuffer)
1808
0
  {
1809
0
    postfixBlkno = current->blkno;
1810
0
    xlrec.offnumPostfix = postfixOffset =
1811
0
      SpGistPageAddNewItem(state, current->page,
1812
0
                 (Item) postfixTuple, postfixTuple->size,
1813
0
                 NULL, false);
1814
0
    xlrec.postfixBlkSame = true;
1815
0
  }
1816
0
  else
1817
0
  {
1818
0
    postfixBlkno = BufferGetBlockNumber(newBuffer);
1819
0
    xlrec.offnumPostfix = postfixOffset =
1820
0
      SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1821
0
                 (Item) postfixTuple, postfixTuple->size,
1822
0
                 NULL, false);
1823
0
    MarkBufferDirty(newBuffer);
1824
0
    xlrec.postfixBlkSame = false;
1825
0
  }
1826
1827
  /*
1828
   * And set downlink pointer in the prefix tuple to point to postfix tuple.
1829
   * (We can't avoid this step by doing the above two steps in opposite
1830
   * order, because there might not be enough space on the page to insert
1831
   * the postfix tuple first.)  We have to update the local copy of the
1832
   * prefixTuple too, because that's what will be written to WAL.
1833
   */
1834
0
  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1835
0
            postfixBlkno, postfixOffset);
1836
0
  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1837
0
                         PageGetItemId(current->page, current->offnum));
1838
0
  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1839
0
            postfixBlkno, postfixOffset);
1840
1841
0
  MarkBufferDirty(current->buffer);
1842
1843
0
  if (RelationNeedsWAL(index))
1844
0
  {
1845
0
    XLogRecPtr  recptr;
1846
1847
0
    XLogBeginInsert();
1848
0
    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1849
0
    XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1850
0
    XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1851
1852
0
    XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1853
0
    if (newBuffer != InvalidBuffer)
1854
0
    {
1855
0
      int     flags;
1856
1857
0
      flags = REGBUF_STANDARD;
1858
0
      if (xlrec.newPage)
1859
0
        flags |= REGBUF_WILL_INIT;
1860
0
      XLogRegisterBuffer(1, newBuffer, flags);
1861
0
    }
1862
1863
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1864
1865
0
    PageSetLSN(current->page, recptr);
1866
1867
0
    if (newBuffer != InvalidBuffer)
1868
0
    {
1869
0
      PageSetLSN(BufferGetPage(newBuffer), recptr);
1870
0
    }
1871
0
  }
1872
1873
0
  END_CRIT_SECTION();
1874
1875
  /* Update local free-space cache and release buffer */
1876
0
  if (newBuffer != InvalidBuffer)
1877
0
  {
1878
0
    SpGistSetLastUsedPage(index, newBuffer);
1879
0
    UnlockReleaseBuffer(newBuffer);
1880
0
  }
1881
0
}
1882
1883
/*
1884
 * Insert one item into the index.
1885
 *
1886
 * Returns true on success, false if we failed to complete the insertion
1887
 * because of conflict with a concurrent insert.  In the latter case,
1888
 * caller should re-call spgdoinsert() with the same args.
1889
 */
1890
bool
1891
spgdoinsert(Relation index, SpGistState *state,
1892
      ItemPointer heapPtr, Datum datum, bool isnull)
1893
0
{
1894
0
  int     level = 0;
1895
0
  Datum   leafDatum;
1896
0
  int     leafSize;
1897
0
  SPPageDesc  current,
1898
0
        parent;
1899
0
  FmgrInfo   *procinfo = NULL;
1900
1901
  /*
1902
   * Look up FmgrInfo of the user-defined choose function once, to save
1903
   * cycles in the loop below.
1904
   */
1905
0
  if (!isnull)
1906
0
    procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1907
1908
  /*
1909
   * Prepare the leaf datum to insert.
1910
   *
1911
   * If an optional "compress" method is provided, then call it to form the
1912
   * leaf datum from the input datum.  Otherwise store the input datum as
1913
   * is.  Since we don't use index_form_tuple in this AM, we have to make
1914
   * sure value to be inserted is not toasted; FormIndexDatum doesn't
1915
   * guarantee that.  But we assume the "compress" method to return an
1916
   * untoasted value.
1917
   */
1918
0
  if (!isnull)
1919
0
  {
1920
0
    if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1921
0
    {
1922
0
      FmgrInfo   *compressProcinfo = NULL;
1923
1924
0
      compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1925
0
      leafDatum = FunctionCall1Coll(compressProcinfo,
1926
0
                      index->rd_indcollation[0],
1927
0
                      datum);
1928
0
    }
1929
0
    else
1930
0
    {
1931
0
      Assert(state->attLeafType.type == state->attType.type);
1932
1933
0
      if (state->attType.attlen == -1)
1934
0
        leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
1935
0
      else
1936
0
        leafDatum = datum;
1937
0
    }
1938
0
  }
1939
0
  else
1940
0
    leafDatum = (Datum) 0;
1941
1942
  /*
1943
   * Compute space needed for a leaf tuple containing the given datum.
1944
   *
1945
   * If it isn't gonna fit, and the opclass can't reduce the datum size by
1946
   * suffixing, bail out now rather than getting into an endless loop.
1947
   */
1948
0
  if (!isnull)
1949
0
    leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
1950
0
      SpGistGetTypeSize(&state->attLeafType, leafDatum);
1951
0
  else
1952
0
    leafSize = SGDTSIZE + sizeof(ItemIdData);
1953
1954
0
  if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
1955
0
    ereport(ERROR,
1956
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1957
0
         errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1958
0
            leafSize - sizeof(ItemIdData),
1959
0
            SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
1960
0
            RelationGetRelationName(index)),
1961
0
         errhint("Values larger than a buffer page cannot be indexed.")));
1962
1963
  /* Initialize "current" to the appropriate root page */
1964
0
  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
1965
0
  current.buffer = InvalidBuffer;
1966
0
  current.page = NULL;
1967
0
  current.offnum = FirstOffsetNumber;
1968
0
  current.node = -1;
1969
1970
  /* "parent" is invalid for the moment */
1971
0
  parent.blkno = InvalidBlockNumber;
1972
0
  parent.buffer = InvalidBuffer;
1973
0
  parent.page = NULL;
1974
0
  parent.offnum = InvalidOffsetNumber;
1975
0
  parent.node = -1;
1976
1977
0
  for (;;)
1978
0
  {
1979
0
    bool    isNew = false;
1980
1981
    /*
1982
     * Bail out if query cancel is pending.  We must have this somewhere
1983
     * in the loop since a broken opclass could produce an infinite
1984
     * picksplit loop.
1985
     */
1986
0
    CHECK_FOR_INTERRUPTS();
1987
1988
0
    if (current.blkno == InvalidBlockNumber)
1989
0
    {
1990
      /*
1991
       * Create a leaf page.  If leafSize is too large to fit on a page,
1992
       * we won't actually use the page yet, but it simplifies the API
1993
       * for doPickSplit to always have a leaf page at hand; so just
1994
       * quietly limit our request to a page size.
1995
       */
1996
0
      current.buffer =
1997
0
        SpGistGetBuffer(index,
1998
0
                GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
1999
0
                Min(leafSize, SPGIST_PAGE_CAPACITY),
2000
0
                &isNew);
2001
0
      current.blkno = BufferGetBlockNumber(current.buffer);
2002
0
    }
2003
0
    else if (parent.buffer == InvalidBuffer)
2004
0
    {
2005
      /* we hold no parent-page lock, so no deadlock is possible */
2006
0
      current.buffer = ReadBuffer(index, current.blkno);
2007
0
      LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2008
0
    }
2009
0
    else if (current.blkno != parent.blkno)
2010
0
    {
2011
      /* descend to a new child page */
2012
0
      current.buffer = ReadBuffer(index, current.blkno);
2013
2014
      /*
2015
       * Attempt to acquire lock on child page.  We must beware of
2016
       * deadlock against another insertion process descending from that
2017
       * page to our parent page (see README).  If we fail to get lock,
2018
       * abandon the insertion and tell our caller to start over.
2019
       *
2020
       * XXX this could be improved, because failing to get lock on a
2021
       * buffer is not proof of a deadlock situation; the lock might be
2022
       * held by a reader, or even just background writer/checkpointer
2023
       * process.  Perhaps it'd be worth retrying after sleeping a bit?
2024
       */
2025
0
      if (!ConditionalLockBuffer(current.buffer))
2026
0
      {
2027
0
        ReleaseBuffer(current.buffer);
2028
0
        UnlockReleaseBuffer(parent.buffer);
2029
0
        return false;
2030
0
      }
2031
0
    }
2032
0
    else
2033
0
    {
2034
      /* inner tuple can be stored on the same page as parent one */
2035
0
      current.buffer = parent.buffer;
2036
0
    }
2037
0
    current.page = BufferGetPage(current.buffer);
2038
2039
    /* should not arrive at a page of the wrong type */
2040
0
    if (isnull ? !SpGistPageStoresNulls(current.page) :
2041
0
      SpGistPageStoresNulls(current.page))
2042
0
      elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2043
0
         current.blkno);
2044
2045
0
    if (SpGistPageIsLeaf(current.page))
2046
0
    {
2047
0
      SpGistLeafTuple leafTuple;
2048
0
      int     nToSplit,
2049
0
            sizeToSplit;
2050
2051
0
      leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
2052
0
      if (leafTuple->size + sizeof(ItemIdData) <=
2053
0
        SpGistPageGetFreeSpace(current.page, 1))
2054
0
      {
2055
        /* it fits on page, so insert it and we're done */
2056
0
        addLeafTuple(index, state, leafTuple,
2057
0
               &current, &parent, isnull, isNew);
2058
0
        break;
2059
0
      }
2060
0
      else if ((sizeToSplit =
2061
0
            checkSplitConditions(index, state, &current,
2062
0
                       &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2063
0
           nToSplit < 64 &&
2064
0
           leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2065
0
      {
2066
        /*
2067
         * the amount of data is pretty small, so just move the whole
2068
         * chain to another leaf page rather than splitting it.
2069
         */
2070
0
        Assert(!isNew);
2071
0
        moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2072
0
        break;     /* we're done */
2073
0
      }
2074
0
      else
2075
0
      {
2076
        /* picksplit */
2077
0
        if (doPickSplit(index, state, &current, &parent,
2078
0
                leafTuple, level, isnull, isNew))
2079
0
          break;   /* doPickSplit installed new tuples */
2080
2081
        /* leaf tuple will not be inserted yet */
2082
0
        pfree(leafTuple);
2083
2084
        /*
2085
         * current now describes new inner tuple, go insert into it
2086
         */
2087
0
        Assert(!SpGistPageIsLeaf(current.page));
2088
0
        goto process_inner_tuple;
2089
0
      }
2090
0
    }
2091
0
    else          /* non-leaf page */
2092
0
    {
2093
      /*
2094
       * Apply the opclass choose function to figure out how to insert
2095
       * the given datum into the current inner tuple.
2096
       */
2097
0
      SpGistInnerTuple innerTuple;
2098
0
      spgChooseIn in;
2099
0
      spgChooseOut out;
2100
2101
      /*
2102
       * spgAddNode and spgSplitTuple cases will loop back to here to
2103
       * complete the insertion operation.  Just in case the choose
2104
       * function is broken and produces add or split requests
2105
       * repeatedly, check for query cancel.
2106
       */
2107
0
  process_inner_tuple:
2108
0
      CHECK_FOR_INTERRUPTS();
2109
2110
0
      innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2111
0
                            PageGetItemId(current.page, current.offnum));
2112
2113
0
      in.datum = datum;
2114
0
      in.leafDatum = leafDatum;
2115
0
      in.level = level;
2116
0
      in.allTheSame = innerTuple->allTheSame;
2117
0
      in.hasPrefix = (innerTuple->prefixSize > 0);
2118
0
      in.prefixDatum = SGITDATUM(innerTuple, state);
2119
0
      in.nNodes = innerTuple->nNodes;
2120
0
      in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2121
2122
0
      memset(&out, 0, sizeof(out));
2123
2124
0
      if (!isnull)
2125
0
      {
2126
        /* use user-defined choose method */
2127
0
        FunctionCall2Coll(procinfo,
2128
0
                  index->rd_indcollation[0],
2129
0
                  PointerGetDatum(&in),
2130
0
                  PointerGetDatum(&out));
2131
0
      }
2132
0
      else
2133
0
      {
2134
        /* force "match" action (to insert to random subnode) */
2135
0
        out.resultType = spgMatchNode;
2136
0
      }
2137
2138
0
      if (innerTuple->allTheSame)
2139
0
      {
2140
        /*
2141
         * It's not allowed to do an AddNode at an allTheSame tuple.
2142
         * Opclass must say "match", in which case we choose a random
2143
         * one of the nodes to descend into, or "split".
2144
         */
2145
0
        if (out.resultType == spgAddNode)
2146
0
          elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2147
0
        else if (out.resultType == spgMatchNode)
2148
0
          out.result.matchNode.nodeN = random() % innerTuple->nNodes;
2149
0
      }
2150
2151
0
      switch (out.resultType)
2152
0
      {
2153
0
        case spgMatchNode:
2154
          /* Descend to N'th child node */
2155
0
          spgMatchNodeAction(index, state, innerTuple,
2156
0
                     &current, &parent,
2157
0
                     out.result.matchNode.nodeN);
2158
          /* Adjust level as per opclass request */
2159
0
          level += out.result.matchNode.levelAdd;
2160
          /* Replace leafDatum and recompute leafSize */
2161
0
          if (!isnull)
2162
0
          {
2163
0
            leafDatum = out.result.matchNode.restDatum;
2164
0
            leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
2165
0
              SpGistGetTypeSize(&state->attLeafType, leafDatum);
2166
0
          }
2167
2168
          /*
2169
           * Loop around and attempt to insert the new leafDatum at
2170
           * "current" (which might reference an existing child
2171
           * tuple, or might be invalid to force us to find a new
2172
           * page for the tuple).
2173
           *
2174
           * Note: if the opclass sets longValuesOK, we rely on the
2175
           * choose function to eventually shorten the leafDatum
2176
           * enough to fit on a page.  We could add a test here to
2177
           * complain if the datum doesn't get visibly shorter each
2178
           * time, but that could get in the way of opclasses that
2179
           * "simplify" datums in a way that doesn't necessarily
2180
           * lead to physical shortening on every cycle.
2181
           */
2182
0
          break;
2183
0
        case spgAddNode:
2184
          /* AddNode is not sensible if nodes don't have labels */
2185
0
          if (in.nodeLabels == NULL)
2186
0
            elog(ERROR, "cannot add a node to an inner tuple without node labels");
2187
          /* Add node to inner tuple, per request */
2188
0
          spgAddNodeAction(index, state, innerTuple,
2189
0
                   &current, &parent,
2190
0
                   out.result.addNode.nodeN,
2191
0
                   out.result.addNode.nodeLabel);
2192
2193
          /*
2194
           * Retry insertion into the enlarged node.  We assume that
2195
           * we'll get a MatchNode result this time.
2196
           */
2197
0
          goto process_inner_tuple;
2198
0
          break;
2199
0
        case spgSplitTuple:
2200
          /* Split inner tuple, per request */
2201
0
          spgSplitNodeAction(index, state, innerTuple,
2202
0
                     &current, &out);
2203
2204
          /* Retry insertion into the split node */
2205
0
          goto process_inner_tuple;
2206
0
          break;
2207
0
        default:
2208
0
          elog(ERROR, "unrecognized SPGiST choose result: %d",
2209
0
             (int) out.resultType);
2210
0
          break;
2211
0
      }
2212
0
    }
2213
0
  }             /* end loop */
2214
2215
  /*
2216
   * Release any buffers we're still holding.  Beware of possibility that
2217
   * current and parent reference same buffer.
2218
   */
2219
0
  if (current.buffer != InvalidBuffer)
2220
0
  {
2221
0
    SpGistSetLastUsedPage(index, current.buffer);
2222
0
    UnlockReleaseBuffer(current.buffer);
2223
0
  }
2224
0
  if (parent.buffer != InvalidBuffer &&
2225
0
    parent.buffer != current.buffer)
2226
0
  {
2227
0
    SpGistSetLastUsedPage(index, parent.buffer);
2228
0
    UnlockReleaseBuffer(parent.buffer);
2229
0
  }
2230
2231
0
  return true;
2232
0
}