YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/access/gist/gist.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * gist.c
4
 *    interface routines for the postgres GiST index access method.
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/access/gist/gist.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
#include "postgres.h"
16
17
#include "access/gist_private.h"
18
#include "access/gistscan.h"
19
#include "catalog/pg_collation.h"
20
#include "miscadmin.h"
21
#include "storage/lmgr.h"
22
#include "storage/predicate.h"
23
#include "nodes/execnodes.h"
24
#include "utils/builtins.h"
25
#include "utils/index_selfuncs.h"
26
#include "utils/memutils.h"
27
#include "utils/rel.h"
28
29
30
/* non-export function prototypes */
31
static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
32
static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
33
        GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
34
static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
35
         GISTSTATE *giststate,
36
         IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
37
         Buffer leftchild, Buffer rightchild,
38
         bool unlockbuf, bool unlockleftchild);
39
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
40
        GISTSTATE *giststate, List *splitinfo, bool releasebuf);
41
static void gistvacuumpage(Relation rel, Page page, Buffer buffer,
42
         Relation heapRel);
43
44
45
638
#define ROTATEDIST(d) do { \
46
638
  SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
47
638
  memset(tmp,0,sizeof(SplitedPageLayout)); \
48
638
  tmp->block.blkno = InvalidBlockNumber; \
49
638
  tmp->buffer = InvalidBuffer; \
50
638
  tmp->next = (d); \
51
638
  (d)=tmp; \
52
638
} while(0)
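
ROTATEDIST is a push-front onto the singly linked list 'd'. Because gistSplit() below pushes the right half before the left half, the finished chain reads left-to-right from the head. A minimal self-contained model of the idiom, with Node and PUSH_FRONT as hypothetical stand-ins for SplitedPageLayout and ROTATEDIST:

#include <stdlib.h>
#include <string.h>

typedef struct Node { struct Node *next; } Node;  /* stand-in for SplitedPageLayout */

#define PUSH_FRONT(d) do { \
  Node *tmp = (Node *) malloc(sizeof(Node)); \
  memset(tmp, 0, sizeof(Node)); \
  tmp->next = (d); \
  (d) = tmp; \
} while (0)

int main(void)
{
  Node *list = NULL;
  PUSH_FRONT(list);  /* pushed first, ends up last ("right" half) */
  PUSH_FRONT(list);  /* pushed second, becomes the head ("left" half) */
  return list != NULL ? 0 : 1;
}
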
53
54
55
/*
56
 * GiST handler function: return IndexAmRoutine with access method parameters
57
 * and callbacks.
58
 */
59
Datum
60
gisthandler(PG_FUNCTION_ARGS)
61
39
{
62
39
  IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
63
64
39
  amroutine->amstrategies = 0;
65
39
  amroutine->amsupport = GISTNProcs;
66
39
  amroutine->amcanorder = false;
67
39
  amroutine->amcanorderbyop = true;
68
39
  amroutine->amcanbackward = false;
69
39
  amroutine->amcanunique = false;
70
39
  amroutine->amcanmulticol = true;
71
39
  amroutine->amoptionalkey = true;
72
39
  amroutine->amsearcharray = false;
73
39
  amroutine->amsearchnulls = true;
74
39
  amroutine->amstorage = true;
75
39
  amroutine->amclusterable = true;
76
39
  amroutine->ampredlocks = true;
77
39
  amroutine->amcanparallel = false;
78
39
  amroutine->amcaninclude = false;
79
39
  amroutine->amkeytype = InvalidOid;
80
81
39
  amroutine->ambuild = gistbuild;
82
39
  amroutine->ambuildempty = gistbuildempty;
83
39
  amroutine->aminsert = gistinsert;
84
39
  amroutine->ambulkdelete = gistbulkdelete;
85
39
  amroutine->amvacuumcleanup = gistvacuumcleanup;
86
39
  amroutine->amcanreturn = gistcanreturn;
87
39
  amroutine->amcostestimate = gistcostestimate;
88
39
  amroutine->amoptions = gistoptions;
89
39
  amroutine->amproperty = gistproperty;
90
39
  amroutine->amvalidate = gistvalidate;
91
39
  amroutine->ambeginscan = gistbeginscan;
92
39
  amroutine->amrescan = gistrescan;
93
39
  amroutine->amgettuple = gistgettuple;
94
39
  amroutine->amgetbitmap = gistgetbitmap;
95
39
  amroutine->amendscan = gistendscan;
96
39
  amroutine->ammarkpos = NULL;
97
39
  amroutine->amrestrpos = NULL;
98
39
  amroutine->amestimateparallelscan = NULL;
99
39
  amroutine->aminitparallelscan = NULL;
100
39
  amroutine->amparallelrescan = NULL;
101
102
39
  PG_RETURN_POINTER(amroutine);
103
39
}
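
Everything the core server knows about GiST flows through this struct; normally it is obtained via the pg_am catalog and GetIndexAmRoutine(). Purely as an illustrative sketch (not the normal path), the handler can also be invoked directly through fmgr and its capability flags inspected:

/* Sketch: call the handler directly; gisthandler ignores its argument. */
IndexAmRoutine *am = (IndexAmRoutine *)
  DatumGetPointer(DirectFunctionCall1(gisthandler, (Datum) 0));

Assert(am->amcanmulticol);  /* multicolumn GiST indexes are supported */
Assert(!am->amcanunique);   /* unique GiST indexes are not */
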
104
105
/*
106
 * Create and return a temporary memory context for use by GiST. We
107
 * _always_ invoke user-provided methods in a temporary memory
108
 * context, so that memory leaks in those functions cannot cause
109
 * problems. Also, we use some additional temporary contexts in the
110
 * GiST code itself, to avoid the need to do some awkward manual
111
 * memory management.
112
 */
113
MemoryContext
114
createTempGistContext(void)
115
3
{
116
3
  return AllocSetContextCreate(GetCurrentMemoryContext(),
117
3
                 "GiST temporary context",
118
3
                 ALLOCSET_DEFAULT_SIZES);
119
3
}
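
The intended pattern, condensed from gistinsert() below: do all per-tuple work, including calls into user-provided support functions, inside tempCxt, then reset the whole context rather than freeing individual allocations. A sketch, assuming a giststate whose tempCxt was created with this function:

MemoryContext oldCxt = MemoryContextSwitchTo(giststate->tempCxt);

/* ... call compress/penalty/union methods, form and insert the tuple;
 * anything they leak stays confined to tempCxt ... */

MemoryContextSwitchTo(oldCxt);
MemoryContextReset(giststate->tempCxt);  /* reclaim everything at once */
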
120
121
/*
122
 *  gistbuildempty() -- build an empty gist index in the initialization fork
123
 */
124
void
125
gistbuildempty(Relation index)
126
0
{
127
0
  Buffer    buffer;
128
129
  /* Initialize the root page */
130
0
  buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
131
0
  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
132
133
  /* Initialize and WAL-log the buffer */
134
0
  START_CRIT_SECTION();
135
0
  GISTInitBuffer(buffer, F_LEAF);
136
0
  MarkBufferDirty(buffer);
137
0
  log_newpage_buffer(buffer, true);
138
0
  END_CRIT_SECTION();
139
140
  /* Unlock and release the buffer */
141
0
  UnlockReleaseBuffer(buffer);
142
0
}
143
144
/*
145
 *  gistinsert -- wrapper for GiST tuple insertion.
146
 *
147
 *    This is the public interface routine for tuple insertion in GiSTs.
148
 *    It doesn't do any work; just locks the relation and passes the buck.
149
 */
150
bool
151
gistinsert(Relation r, Datum *values, bool *isnull,
152
       ItemPointer ht_ctid, Relation heapRel,
153
       IndexUniqueCheck checkUnique,
154
       IndexInfo *indexInfo)
155
0
{
156
0
  GISTSTATE  *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
157
0
  IndexTuple  itup;
158
0
  MemoryContext oldCxt;
159
160
  /* Initialize GISTSTATE cache if first call in this statement */
161
0
  if (giststate == NULL)
162
0
  {
163
0
    oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
164
0
    giststate = initGISTstate(r);
165
0
    giststate->tempCxt = createTempGistContext();
166
0
    indexInfo->ii_AmCache = (void *) giststate;
167
0
    MemoryContextSwitchTo(oldCxt);
168
0
  }
169
170
0
  oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
171
172
0
  itup = gistFormTuple(giststate, r,
173
0
             values, isnull, true /* size is currently bogus */ );
174
0
  itup->t_tid = *ht_ctid;
175
176
0
  gistdoinsert(r, itup, 0, giststate, heapRel);
177
178
  /* cleanup */
179
0
  MemoryContextSwitchTo(oldCxt);
180
0
  MemoryContextReset(giststate->tempCxt);
181
182
0
  return false;
183
0
}
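
gistinsert() is never called directly; the executor reaches it through the IndexAmRoutine installed by gisthandler(). A simplified sketch of that dispatch (the real entry point is index_insert() in indexam.c; rel is assumed to be an opened GiST index relation):

/* Simplified dispatch; in core this happens inside index_insert(). */
bool result = rel->rd_amroutine->aminsert(rel, values, isnull,
                                          ht_ctid, heapRel,
                                          UNIQUE_CHECK_NO, indexInfo);
/* For GiST the result is always false: no uniqueness checking is done,
 * as advertised by amcanunique = false above. */
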
184
185
186
/*
187
 * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
188
 * at that offset is atomically removed along with inserting the new tuples.
189
 * This is used to replace a tuple with a new one.
190
 *
191
 * If 'leftchildbuf' is valid, we're inserting the downlink for the page
192
 * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
193
 * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
194
 *
195
 * If 'markfollowright' is true and the page is split, the left child is
196
 * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
197
 * index build, however, there is no concurrent access and the page splitting
198
 * is done in a slightly simpler fashion, and false is passed.
199
 *
200
 * If there is not enough room on the page, it is split. All the split
201
 * pages are kept pinned and locked and returned in *splitinfo; the caller
202
 * is responsible for inserting the downlinks for them. However, if
203
 * 'buffer' is the root page and it needs to be split, gistplacetopage()
204
 * performs the split as one atomic operation, and *splitinfo is set to NIL.
205
 * In that case, we continue to hold the root page locked, and the child
206
 * pages are released; note that new tuple(s) are *not* on the root page
207
 * but in one of the new child pages.
208
 *
209
 * If 'newblkno' is not NULL, returns the block number of the page the first
210
 * new/updated tuple was inserted to. Usually it's the given page, but could
211
 * be its right sibling if the page was split.
212
 *
213
 * Returns 'true' if the page was split, 'false' otherwise.
214
 */
215
bool
216
gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
217
        Buffer buffer,
218
        IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
219
        BlockNumber *newblkno,
220
        Buffer leftchildbuf,
221
        List **splitinfo,
222
        bool markfollowright,
223
        Relation heapRel)
224
20.3k
{
225
20.3k
  BlockNumber blkno = BufferGetBlockNumber(buffer);
226
20.3k
  Page    page = BufferGetPage(buffer);
227
20.3k
  bool    is_leaf = (GistPageIsLeaf(page)) ? true : false;
228
20.3k
  XLogRecPtr  recptr;
229
20.3k
  int     i;
230
20.3k
  bool    is_split;
231
232
  /*
233
   * Refuse to modify a page that's incompletely split. This should not
234
   * happen because we finish any incomplete splits while we walk down the
235
   * tree. However, it's remotely possible that another concurrent inserter
236
   * splits a parent page, and errors out before completing the split. We
237
   * will just throw an error in that case, and leave any split we had in
238
   * progress unfinished too. The next insert that comes along will clean up
239
   * the mess.
240
   */
241
20.3k
  if (GistFollowRight(page))
242
0
    elog(ERROR, "concurrent GiST page split was incomplete");
243
244
20.3k
  *splitinfo = NIL;
245
246
  /*
247
   * if isupdate, remove old key: This node's key has been modified, either
248
   * because a child split occurred or because we needed to adjust our key
249
   * for an insert in a child node. Therefore, remove the old version of
250
   * this node's key.
251
   *
252
   * for WAL replay, in the non-split case we handle this by setting up a
253
   * one-element todelete array; in the split case, it's handled implicitly
254
   * because the tuple vector passed to gistSplit won't include this tuple.
255
   */
256
20.3k
  is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
257
258
  /*
259
   * If the leaf page is full, first try to delete any dead tuples, then
260
   * check again.
261
   */
262
20.3k
  if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
263
0
  {
264
0
    gistvacuumpage(rel, page, buffer, heapRel);
265
0
    is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
266
0
  }
267
268
20.3k
  if (is_split)
269
319
  {
270
    /* no space for insertion */
271
319
    IndexTuple *itvec;
272
319
    int     tlen;
273
319
    SplitedPageLayout *dist = NULL,
274
319
           *ptr;
275
319
    BlockNumber oldrlink = InvalidBlockNumber;
276
319
    GistNSN   oldnsn = 0;
277
319
    SplitedPageLayout rootpg;
278
319
    bool    is_rootsplit;
279
319
    int     npage;
280
281
319
    is_rootsplit = (blkno == GIST_ROOT_BLKNO);
282
283
    /*
284
     * Form the vector of index tuples to split. If we're replacing an old tuple,
285
     * remove the old version from the vector.
286
     */
287
319
    itvec = gistextractpage(page, &tlen);
288
319
    if (OffsetNumberIsValid(oldoffnum))
289
2
    {
290
      /* on inner page we should remove old tuple */
291
2
      int     pos = oldoffnum - FirstOffsetNumber;
292
293
2
      tlen--;
294
2
      if (pos != tlen)
295
2
        memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
296
2
    }
297
319
    itvec = gistjoinvector(itvec, &tlen, itup, ntup);
298
319
    dist = gistSplit(rel, page, itvec, tlen, giststate);
299
300
    /*
301
     * Check that split didn't produce too many pages.
302
     */
303
319
    npage = 0;
304
957
    for (ptr = dist; ptr; ptr = ptr->next)
305
638
      npage++;
306
    /* in a root split, we'll add one more page to the list below */
307
319
    if (is_rootsplit)
308
2
      npage++;
309
319
    if (npage > GIST_MAX_SPLIT_PAGES)
310
0
      elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
311
319
         npage, GIST_MAX_SPLIT_PAGES);
312
313
    /*
314
     * Set up pages to work with. Allocate new buffers for all but the
315
     * leftmost page. The original page becomes the new leftmost page, and
316
     * is just replaced with the new contents.
317
     *
318
     * For a root-split, allocate new buffers for all child pages, the
319
     * original page is overwritten with new root page containing
320
     * downlinks to the new child pages.
321
     */
322
319
    ptr = dist;
323
319
    if (!is_rootsplit)
324
317
    {
325
      /* save old rightlink and NSN */
326
317
      oldrlink = GistPageGetOpaque(page)->rightlink;
327
317
      oldnsn = GistPageGetNSN(page);
328
329
317
      dist->buffer = buffer;
330
317
      dist->block.blkno = BufferGetBlockNumber(buffer);
331
317
      dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
332
333
      /* clean all flags except F_LEAF */
334
317
      GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
335
336
317
      ptr = ptr->next;
337
317
    }
338
640
    for (; ptr; ptr = ptr->next)
339
321
    {
340
      /* Allocate new page */
341
321
      ptr->buffer = gistNewBuffer(rel);
342
318
      GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
343
321
      ptr->page = BufferGetPage(ptr->buffer);
344
321
      ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
345
321
      PredicateLockPageSplit(rel,
346
321
                   BufferGetBlockNumber(buffer),
347
321
                   BufferGetBlockNumber(ptr->buffer));
348
321
    }
349
350
    /*
351
     * Now that we know which blocks the new pages go to, set up downlink
352
     * tuples to point to them.
353
     */
354
957
    for (ptr = dist; ptr; ptr = ptr->next)
355
638
    {
356
638
      ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
357
638
      GistTupleSetValid(ptr->itup);
358
638
    }
359
360
    /*
361
     * If this is a root split, we construct the new root page with the
362
     * downlinks here directly, instead of requiring the caller to insert
363
     * them. Add the new root page to the list along with the child pages.
364
     */
365
319
    if (is_rootsplit)
366
2
    {
367
2
      IndexTuple *downlinks;
368
2
      int     ndownlinks = 0;
369
2
      int     i;
370
371
2
      rootpg.buffer = buffer;
372
2
      rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
373
2
      GistPageGetOpaque(rootpg.page)->flags = 0;
374
375
      /* Prepare a vector of all the downlinks */
376
6
      for (ptr = dist; ptr; ptr = ptr->next)
377
4
        ndownlinks++;
378
2
      downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
379
6
      for (i = 0, ptr = dist; ptr; ptr = ptr->next)
380
4
        downlinks[i++] = ptr->itup;
381
382
2
      rootpg.block.blkno = GIST_ROOT_BLKNO;
383
2
      rootpg.block.num = ndownlinks;
384
2
      rootpg.list = gistfillitupvec(downlinks, ndownlinks,
385
2
                      &(rootpg.lenlist));
386
2
      rootpg.itup = NULL;
387
388
2
      rootpg.next = dist;
389
2
      dist = &rootpg;
390
2
    }
391
317
    else
392
317
    {
393
      /* Prepare split-info to be returned to caller */
394
951
      for (ptr = dist; ptr; ptr = ptr->next)
395
634
      {
396
634
        GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
397
398
634
        si->buf = ptr->buffer;
399
634
        si->downlink = ptr->itup;
400
634
        *splitinfo = lappend(*splitinfo, si);
401
634
      }
402
317
    }
403
404
    /*
405
     * Fill all pages. All the pages are new, i.e. freshly allocated empty
406
     * pages, or a temporary copy of the old page.
407
     */
408
959
    for (ptr = dist; ptr; ptr = ptr->next)
409
640
    {
410
640
      char     *data = (char *) (ptr->list);
411
412
39.9k
      for (i = 0; i < ptr->block.num; i++)
413
39.2k
      {
414
39.2k
        IndexTuple  thistup = (IndexTuple) data;
415
416
39.2k
        if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
417
0
          elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
418
419
        /*
420
         * If this is the first inserted/updated tuple, let the caller
421
         * know which page it landed on.
422
         */
423
39.2k
        if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
424
0
          *newblkno = ptr->block.blkno;
425
426
39.2k
        data += IndexTupleSize(thistup);
427
39.2k
      }
428
429
      /* Set up rightlinks */
430
640
      if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
431
319
        GistPageGetOpaque(ptr->page)->rightlink =
432
319
          ptr->next->block.blkno;
433
640
      else
434
321
        GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
435
436
      /*
437
       * Mark all but the right-most page with the follow-right
438
       * flag. It will be cleared as soon as the downlink is inserted
439
       * into the parent, but this ensures that if we error out before
440
       * that, the index is still consistent. (in buffering build mode,
441
       * any error will abort the index build anyway, so this is not
442
       * needed.)
443
       */
444
640
      if (ptr->next && !is_rootsplit && markfollowright)
445
317
        GistMarkFollowRight(ptr->page);
446
640
      else
447
323
        GistClearFollowRight(ptr->page);
448
449
      /*
450
       * Copy the NSN of the original page to all pages. The
451
       * F_FOLLOW_RIGHT flags ensure that scans will follow the
452
       * rightlinks until the downlinks are inserted.
453
       */
454
640
      GistPageSetNSN(ptr->page, oldnsn);
455
640
    }
456
457
    /*
458
     * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
459
     * insertion for that. NB: The number of pages and data segments
460
     * specified here must match the calculations in gistXLogSplit()!
461
     */
462
319
    if (RelationNeedsWAL(rel))
463
0
      XLogEnsureRecordSpace(npage, 1 + npage * 2);
464
465
319
    START_CRIT_SECTION();
466
467
    /*
468
     * Must mark buffers dirty before XLogInsert, even though we'll still
469
     * be changing their opaque fields below.
470
     */
471
959
    for (ptr = dist; ptr; ptr = ptr->next)
472
640
      MarkBufferDirty(ptr->buffer);
473
319
    if (BufferIsValid(leftchildbuf))
474
2
      MarkBufferDirty(leftchildbuf);
475
476
    /*
477
     * The first page in the chain was a temporary working copy meant to
478
     * replace the old page. Copy it over the old page.
479
     */
480
319
    PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
481
319
    dist->page = BufferGetPage(dist->buffer);
482
483
    /* Write the WAL record */
484
319
    if (RelationNeedsWAL(rel))
485
0
      recptr = gistXLogSplit(is_leaf,
486
0
                   dist, oldrlink, oldnsn, leftchildbuf,
487
0
                   markfollowright);
488
319
    else
489
319
      recptr = gistGetFakeLSN(rel);
490
491
959
    for (ptr = dist; ptr; ptr = ptr->next)
492
640
    {
493
640
      PageSetLSN(ptr->page, recptr);
494
640
    }
495
496
    /*
497
     * Return the new child buffers to the caller.
498
     *
499
     * If this was a root split, we've already inserted the downlink
500
     * pointers, in the form of a new root page. Therefore we can release
501
     * all the new buffers, and keep just the root page locked.
502
     */
503
319
    if (is_rootsplit)
504
2
    {
505
6
      for (ptr = dist->next; ptr; ptr = ptr->next)
506
4
        UnlockReleaseBuffer(ptr->buffer);
507
2
    }
508
319
  }
509
20.0k
  else
510
20.0k
  {
511
    /*
512
     * Enough space.  We always get here if ntup==0.
513
     */
514
20.0k
    START_CRIT_SECTION();
515
516
    /*
517
     * Delete old tuple if any, then insert new tuple(s) if any.  If
518
     * possible, use the fast path of PageIndexTupleOverwrite.
519
     */
520
20.0k
    if (OffsetNumberIsValid(oldoffnum))
521
317
    {
522
317
      if (ntup == 1)
523
2
      {
524
        /* One-for-one replacement, so use PageIndexTupleOverwrite */
525
2
        if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
526
2
                       IndexTupleSize(*itup)))
527
0
          elog(ERROR, "failed to add item to index page in \"%s\"",
528
2
             RelationGetRelationName(rel));
529
2
      }
530
315
      else
531
315
      {
532
        /* Delete old, then append new tuple(s) to page */
533
315
        PageIndexTupleDelete(page, oldoffnum);
534
315
        gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
535
315
      }
536
317
    }
537
19.6k
    else
538
19.6k
    {
539
      /* Just append new tuples at the end of the page */
540
19.6k
      gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
541
19.6k
    }
542
543
20.0k
    MarkBufferDirty(buffer);
544
545
20.0k
    if (BufferIsValid(leftchildbuf))
546
315
      MarkBufferDirty(leftchildbuf);
547
548
20.0k
    if (RelationNeedsWAL(rel))
549
0
    {
550
0
      OffsetNumber ndeloffs = 0,
551
0
            deloffs[1];
552
553
0
      if (OffsetNumberIsValid(oldoffnum))
554
0
      {
555
0
        deloffs[0] = oldoffnum;
556
0
        ndeloffs = 1;
557
0
      }
558
559
0
      recptr = gistXLogUpdate(buffer,
560
0
                  deloffs, ndeloffs, itup, ntup,
561
0
                  leftchildbuf, NULL);
562
563
0
      PageSetLSN(page, recptr);
564
0
    }
565
20.0k
    else
566
20.0k
    {
567
20.0k
      recptr = gistGetFakeLSN(rel);
568
20.0k
      PageSetLSN(page, recptr);
569
20.0k
    }
570
571
20.0k
    if (newblkno)
572
0
      *newblkno = blkno;
573
20.0k
  }
574
575
  /*
576
   * If we inserted the downlink for a child page, set NSN and clear
577
   * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
578
   * follow the rightlink if and only if they looked at the parent page
579
   * before we inserted the downlink.
580
   *
581
   * Note that we do this *after* writing the WAL record. That means that
582
   * the possible full page image in the WAL record does not include these
583
   * changes, and they must be replayed even if the page is restored from
584
   * the full page image. There's a chicken-and-egg problem: if we updated
585
   * the child pages first, we wouldn't know the recptr of the WAL record
586
   * we're about to write.
587
   */
588
20.3k
  if (BufferIsValid(leftchildbuf))
589
317
  {
590
317
    Page    leftpg = BufferGetPage(leftchildbuf);
591
592
317
    GistPageSetNSN(leftpg, recptr);
593
317
    GistClearFollowRight(leftpg);
594
595
317
    PageSetLSN(leftpg, recptr);
596
317
  }
597
598
20.3k
  END_CRIT_SECTION();
599
600
20.3k
  return is_split;
601
20.3k
}
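
The contract above leaves downlink insertion to the caller in the non-root case. A condensed caller-side sketch, mirroring gistinserttuples() below (state, stack, tuples and the other locals are assumed to be set up as they are there):

List *splitinfo;
bool  is_split;

is_split = gistplacetopage(state->r, state->freespace, giststate,
                           stack->buffer, tuples, ntup, oldoffnum,
                           NULL,             /* newblkno not needed */
                           leftchild, &splitinfo,
                           true, state->heapRel);

if (splitinfo != NIL)
{
  /* Non-root split: the halves are still pinned and locked, and it is
   * on us to insert/update their downlinks in the parent. */
  gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
}
/* A root split instead returns is_split == true with splitinfo == NIL,
 * because gistplacetopage() installed the downlinks itself. */
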
602
603
/*
604
 * Workhorse routine for doing insertion into a GiST index. Note that
605
 * this routine assumes it is invoked in a short-lived memory context,
606
 * so it does not bother releasing palloc'd allocations.
607
 */
608
void
609
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
610
       GISTSTATE *giststate, Relation heapRel)
611
20.0k
{
612
20.0k
  ItemId    iid;
613
20.0k
  IndexTuple  idxtuple;
614
20.0k
  GISTInsertStack firststack;
615
20.0k
  GISTInsertStack *stack;
616
20.0k
  GISTInsertState state;
617
20.0k
  bool    xlocked = false;
618
619
20.0k
  memset(&state, 0, sizeof(GISTInsertState));
620
20.0k
  state.freespace = freespace;
621
20.0k
  state.r = r;
622
20.0k
  state.heapRel = heapRel;
623
624
  /* Start from the root */
625
20.0k
  firststack.blkno = GIST_ROOT_BLKNO;
626
20.0k
  firststack.lsn = 0;
627
20.0k
  firststack.parent = NULL;
628
20.0k
  firststack.downlinkoffnum = InvalidOffsetNumber;
629
20.0k
  state.stack = stack = &firststack;
630
631
  /*
632
   * Walk down along the path of smallest penalty, updating the parent
633
   * pointers with the key we're inserting as we go. If we crash in the
634
   * middle, the tree is consistent, although the possible parent updates
635
   * were a waste.
636
   */
637
20.0k
  for (;;)
638
51.1k
  {
639
51.1k
    if (XLogRecPtrIsInvalid(stack->lsn))
640
51.1k
      stack->buffer = ReadBuffer(state.r, stack->blkno);
641
642
    /*
643
     * Be optimistic and grab shared lock first. Swap it for an exclusive
644
     * lock later if we need to update the page.
645
     */
646
51.1k
    if (!xlocked)
647
51.1k
    {
648
51.1k
      LockBuffer(stack->buffer, GIST_SHARE);
649
51.1k
      gistcheckpage(state.r, stack->buffer);
650
51.1k
    }
651
652
51.1k
    stack->page = (Page) BufferGetPage(stack->buffer);
653
51.1k
    stack->lsn = xlocked ?
654
51.1k
      PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
655
51.1k
    Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
656
657
    /*
658
     * If this page was split but the downlink was never inserted to the
659
     * parent because the inserting backend crashed before doing that, fix
660
     * that now.
661
     */
662
51.1k
    if (GistFollowRight(stack->page))
663
0
    {
664
0
      if (!xlocked)
665
0
      {
666
0
        LockBuffer(stack->buffer, GIST_UNLOCK);
667
0
        LockBuffer(stack->buffer, GIST_EXCLUSIVE);
668
0
        xlocked = true;
669
        /* someone might've completed the split when we unlocked */
670
0
        if (!GistFollowRight(stack->page))
671
0
          continue;
672
0
      }
673
0
      gistfixsplit(&state, giststate);
674
675
0
      UnlockReleaseBuffer(stack->buffer);
676
0
      xlocked = false;
677
0
      state.stack = stack = stack->parent;
678
0
      continue;
679
0
    }
680
681
51.1k
    if (stack->blkno != GIST_ROOT_BLKNO &&
682
31.1k
      stack->parent->lsn < GistPageGetNSN(stack->page))
683
0
    {
684
      /*
685
       * Concurrent split detected. There's no guarantee that the
686
       * downlink for this page is consistent with the tuple we're
687
       * inserting anymore, so go back to parent and rechoose the best
688
       * child.
689
       */
690
0
      UnlockReleaseBuffer(stack->buffer);
691
0
      xlocked = false;
692
0
      state.stack = stack = stack->parent;
693
0
      continue;
694
0
    }
695
696
51.1k
    if (!GistPageIsLeaf(stack->page))
697
31.1k
    {
698
      /*
699
       * This is an internal page so continue to walk down the tree.
700
       * Find the child node that has the minimum insertion penalty.
701
       */
702
31.1k
      BlockNumber childblkno;
703
31.1k
      IndexTuple  newtup;
704
31.1k
      GISTInsertStack *item;
705
31.1k
      OffsetNumber downlinkoffnum;
706
707
31.1k
      downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
708
31.1k
      iid = PageGetItemId(stack->page, downlinkoffnum);
709
31.1k
      idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
710
31.1k
      childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
711
712
      /*
713
       * Check that it's not a leftover invalid tuple from pre-9.1
714
       */
715
31.1k
      if (GistTupleIsInvalid(idxtuple))
716
31.1k
        ereport(ERROR,
717
31.1k
            (errmsg("index \"%s\" contains an inner tuple marked as invalid",
718
31.1k
                RelationGetRelationName(r)),
719
31.1k
             errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
720
31.1k
             errhint("Please REINDEX it.")));
721
722
      /*
723
       * Check that the key representing the target child node is
724
       * consistent with the key we're inserting. Update it if it's not.
725
       */
726
31.1k
      newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
727
31.1k
      if (newtup)
728
2
      {
729
        /*
730
         * Swap shared lock for an exclusive one. Beware, the page may
731
         * change while we unlock/lock the page...
732
         */
733
2
        if (!xlocked)
734
2
        {
735
2
          LockBuffer(stack->buffer, GIST_UNLOCK);
736
2
          LockBuffer(stack->buffer, GIST_EXCLUSIVE);
737
2
          xlocked = true;
738
2
          stack->page = (Page) BufferGetPage(stack->buffer);
739
740
2
          if (PageGetLSN(stack->page) != stack->lsn)
741
0
          {
742
            /* the page was changed while we unlocked it, retry */
743
0
            continue;
744
0
          }
745
2
        }
746
747
        /*
748
         * Update the tuple.
749
         *
750
         * We still hold the lock after gistinserttuple(), but it
751
         * might have to split the page to make the updated tuple fit.
752
         * In that case the updated tuple might migrate to the other
753
         * half of the split, so we have to go back to the parent and
754
         * descend back to the half that's a better fit for the new
755
         * tuple.
756
         */
757
2
        if (gistinserttuple(&state, stack, giststate, newtup,
758
2
                  downlinkoffnum))
759
0
        {
760
          /*
761
           * If this was a root split, the root page continues to be
762
           * the parent and the updated tuple went to one of the
763
           * child pages, so we just need to retry from the root
764
           * page.
765
           */
766
0
          if (stack->blkno != GIST_ROOT_BLKNO)
767
0
          {
768
0
            UnlockReleaseBuffer(stack->buffer);
769
0
            xlocked = false;
770
0
            state.stack = stack = stack->parent;
771
0
          }
772
0
          continue;
773
0
        }
774
31.1k
      }
775
31.1k
      LockBuffer(stack->buffer, GIST_UNLOCK);
776
31.1k
      xlocked = false;
777
778
      /* descend to the chosen child */
779
31.1k
      item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
780
31.1k
      item->blkno = childblkno;
781
31.1k
      item->parent = stack;
782
31.1k
      item->downlinkoffnum = downlinkoffnum;
783
31.1k
      state.stack = stack = item;
784
31.1k
    }
785
20.0k
    else
786
20.0k
    {
787
      /*
788
       * Leaf page. Insert the new key. We've already updated all the
789
       * parents on the way down, but we might have to split the page if
790
       * it doesn't fit. gistinserttuple() will take care of that.
791
       */
792
793
      /*
794
       * Swap shared lock for an exclusive one. Be careful, the page may
795
       * change while we unlock/lock the page...
796
       */
797
20.0k
      if (!xlocked)
798
20.0k
      {
799
20.0k
        LockBuffer(stack->buffer, GIST_UNLOCK);
800
20.0k
        LockBuffer(stack->buffer, GIST_EXCLUSIVE);
801
20.0k
        xlocked = true;
802
20.0k
        stack->page = (Page) BufferGetPage(stack->buffer);
803
20.0k
        stack->lsn = PageGetLSN(stack->page);
804
805
20.0k
        if (stack->blkno == GIST_ROOT_BLKNO)
806
123
        {
807
          /*
808
           * the only page that can become inner instead of leaf is
809
           * the root page, so for the root we should recheck it
810
           */
811
123
          if (!GistPageIsLeaf(stack->page))
812
0
          {
813
            /*
814
             * very rare situation: while the page was unlocked, the index
815
             * grew from a single (root) page to more than one page
816
             */
817
0
            LockBuffer(stack->buffer, GIST_UNLOCK);
818
0
            xlocked = false;
819
0
            continue;
820
0
          }
821
822
          /*
823
           * we don't need to check root split, because checking
824
           * leaf/inner is enough to recognize split for root
825
           */
826
19.8k
        }
827
19.8k
        else if (GistFollowRight(stack->page) ||
828
19.8k
             stack->parent->lsn < GistPageGetNSN(stack->page))
829
0
        {
830
          /*
831
           * The page was split while we momentarily unlocked the
832
           * page. Go back to parent.
833
           */
834
0
          UnlockReleaseBuffer(stack->buffer);
835
0
          xlocked = false;
836
0
          state.stack = stack = stack->parent;
837
0
          continue;
838
0
        }
839
20.0k
      }
840
841
      /* now state.stack->(page, buffer and blkno) points to leaf page */
842
843
20.0k
      gistinserttuple(&state, stack, giststate, itup,
844
20.0k
              InvalidOffsetNumber);
845
20.0k
      LockBuffer(stack->buffer, GIST_UNLOCK);
846
847
      /* Release any pins we might still hold before exiting */
848
71.1k
      for (; stack; stack = stack->parent)
849
51.1k
        ReleaseBuffer(stack->buffer);
850
20.0k
      break;
851
20.0k
    }
852
51.1k
  }
853
20.0k
}
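
Stripped of the lock-coupling and retry logic, the descent itself is just "follow the minimum-penalty child until a leaf". A self-contained toy model of that choice; every name here is hypothetical, and the real code delegates to gistchoose() and the opclass penalty method:

#include <stdio.h>

/* Toy 1-D "bounding interval" node; a stand-in for a GiST page. */
typedef struct ToyNode
{
  int nchildren;              /* 0 => leaf */
  struct ToyNode *child[4];
  int lo[4], hi[4];           /* interval key of each downlink */
} ToyNode;

/* Enlargement needed for [lo,hi] to cover key -- the "penalty". */
static int penalty(int lo, int hi, int key)
{
  int grow = 0;
  if (key < lo) grow += lo - key;
  if (key > hi) grow += key - hi;
  return grow;
}

static ToyNode *descend(ToyNode *node, int key)
{
  while (node->nchildren > 0)
  {
    int best = 0;
    for (int i = 1; i < node->nchildren; i++)
      if (penalty(node->lo[i], node->hi[i], key) <
          penalty(node->lo[best], node->hi[best], key))
        best = i;
    node = node->child[best];   /* walk down the min-penalty path */
  }
  return node;
}

int main(void)
{
  ToyNode leafA = {0}, leafB = {0};
  ToyNode root = {2, {&leafA, &leafB}, {0, 50}, {40, 90}};

  printf("key 60 descends to leaf %s\n",
         descend(&root, 60) == &leafB ? "B" : "A");
  return 0;
}
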
854
855
/*
856
 * Traverse the tree to find path from root page to specified "child" block.
857
 *
858
 * returns a new insertion stack, starting from the parent of "child", up
859
 * to the root. *downlinkoffnum is set to the offset of the downlink in the
860
 * direct parent of child.
861
 *
862
 * To prevent deadlocks, this should lock only one page at a time.
863
 */
864
static GISTInsertStack *
865
gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
866
0
{
867
0
  Page    page;
868
0
  Buffer    buffer;
869
0
  OffsetNumber i,
870
0
        maxoff;
871
0
  ItemId    iid;
872
0
  IndexTuple  idxtuple;
873
0
  List     *fifo;
874
0
  GISTInsertStack *top,
875
0
         *ptr;
876
0
  BlockNumber blkno;
877
878
0
  top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
879
0
  top->blkno = GIST_ROOT_BLKNO;
880
0
  top->downlinkoffnum = InvalidOffsetNumber;
881
882
0
  fifo = list_make1(top);
883
0
  while (fifo != NIL)
884
0
  {
885
    /* Get next page to visit */
886
0
    top = linitial(fifo);
887
0
    fifo = list_delete_first(fifo);
888
889
0
    buffer = ReadBuffer(r, top->blkno);
890
0
    LockBuffer(buffer, GIST_SHARE);
891
0
    gistcheckpage(r, buffer);
892
0
    page = (Page) BufferGetPage(buffer);
893
894
0
    if (GistPageIsLeaf(page))
895
0
    {
896
      /*
897
       * Because we scan the index top-down, all the rest of the pages
898
       * in the queue must be leaf pages as well.
899
       */
900
0
      UnlockReleaseBuffer(buffer);
901
0
      break;
902
0
    }
903
904
0
    top->lsn = BufferGetLSNAtomic(buffer);
905
906
    /*
907
     * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
908
 * downlink. This should not normally happen.
909
     */
910
0
    if (GistFollowRight(page))
911
0
      elog(ERROR, "concurrent GiST page split was incomplete");
912
913
0
    if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
914
0
      GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
915
0
    {
916
      /*
917
       * Page was split while we looked elsewhere. We didn't see the
918
       * downlink to the right page when we scanned the parent, so add
919
       * it to the queue now.
920
       *
921
       * Put the right page ahead of the queue, so that we visit it
922
       * next. That's important, because if this is the lowest internal
923
       * level, just above leaves, we might already have queued up some
924
       * leaf pages, and we assume that there can't be any non-leaf
925
       * pages behind leaf pages.
926
       */
927
0
      ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
928
0
      ptr->blkno = GistPageGetOpaque(page)->rightlink;
929
0
      ptr->downlinkoffnum = InvalidOffsetNumber;
930
0
      ptr->parent = top->parent;
931
932
0
      fifo = lcons(ptr, fifo);
933
0
    }
934
935
0
    maxoff = PageGetMaxOffsetNumber(page);
936
937
0
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
938
0
    {
939
0
      iid = PageGetItemId(page, i);
940
0
      idxtuple = (IndexTuple) PageGetItem(page, iid);
941
0
      blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
942
0
      if (blkno == child)
943
0
      {
944
        /* Found it! */
945
0
        UnlockReleaseBuffer(buffer);
946
0
        *downlinkoffnum = i;
947
0
        return top;
948
0
      }
949
0
      else
950
0
      {
951
        /* Append this child to the list of pages to visit later */
952
0
        ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
953
0
        ptr->blkno = blkno;
954
0
        ptr->downlinkoffnum = i;
955
0
        ptr->parent = top;
956
957
0
        fifo = lappend(fifo, ptr);
958
0
      }
959
0
    }
960
961
0
    UnlockReleaseBuffer(buffer);
962
0
  }
963
964
0
  elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
965
0
     RelationGetRelationName(r), child);
966
0
  return NULL;       /* keep compiler quiet */
967
0
}
968
969
/*
970
 * Updates the stack so that child->parent is the correct parent of the
971
 * child. child->parent must be exclusively locked on entry, and will
972
 * remain so at exit, but it might not be the same page anymore.
973
 */
974
static void
975
gistFindCorrectParent(Relation r, GISTInsertStack *child)
976
317
{
977
317
  GISTInsertStack *parent = child->parent;
978
979
317
  gistcheckpage(r, parent->buffer);
980
317
  parent->page = (Page) BufferGetPage(parent->buffer);
981
982
  /* here we don't need to distinguish between split and page update */
983
317
  if (child->downlinkoffnum == InvalidOffsetNumber ||
984
317
    parent->lsn != PageGetLSN(parent->page))
985
0
  {
986
    /* parent has changed; follow the rightlinks until the child is found */
987
0
    OffsetNumber i,
988
0
          maxoff;
989
0
    ItemId    iid;
990
0
    IndexTuple  idxtuple;
991
0
    GISTInsertStack *ptr;
992
993
0
    while (true)
994
0
    {
995
0
      maxoff = PageGetMaxOffsetNumber(parent->page);
996
0
      for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
997
0
      {
998
0
        iid = PageGetItemId(parent->page, i);
999
0
        idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
1000
0
        if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
1001
0
        {
1002
          /* yes, found it */
1003
0
          child->downlinkoffnum = i;
1004
0
          return;
1005
0
        }
1006
0
      }
1007
1008
0
      parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
1009
0
      UnlockReleaseBuffer(parent->buffer);
1010
0
      if (parent->blkno == InvalidBlockNumber)
1011
0
      {
1012
        /*
1013
         * End of chain and still didn't find parent. It's a very-very
1014
         * rare situation when root splited.
1015
         */
1016
0
        break;
1017
0
      }
1018
0
      parent->buffer = ReadBuffer(r, parent->blkno);
1019
0
      LockBuffer(parent->buffer, GIST_EXCLUSIVE);
1020
0
      gistcheckpage(r, parent->buffer);
1021
0
      parent->page = (Page) BufferGetPage(parent->buffer);
1022
0
    }
1023
1024
    /*
1025
     * awful!!, we need search tree to find parent ... , but before we
1026
     * should release all old parent
1027
     */
1028
1029
0
    ptr = child->parent->parent;  /* child->parent already released
1030
                     * above */
1031
0
    while (ptr)
1032
0
    {
1033
0
      ReleaseBuffer(ptr->buffer);
1034
0
      ptr = ptr->parent;
1035
0
    }
1036
1037
    /* ok, find new path */
1038
0
    ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
1039
1040
    /* read all buffers as expected by caller */
1041
    /* note we don't lock them or gistcheckpage them here! */
1042
0
    while (ptr)
1043
0
    {
1044
0
      ptr->buffer = ReadBuffer(r, ptr->blkno);
1045
0
      ptr->page = (Page) BufferGetPage(ptr->buffer);
1046
0
      ptr = ptr->parent;
1047
0
    }
1048
1049
    /* install new chain of parents to stack */
1050
0
    child->parent = parent;
1051
1052
    /* make recursive call to normal processing */
1053
0
    LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
1054
0
    gistFindCorrectParent(r, child);
1055
0
  }
1056
1057
317
  return;
1058
317
}
1059
1060
/*
1061
 * Form a downlink pointer for the page in 'buf'.
1062
 */
1063
static IndexTuple
1064
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
1065
         GISTInsertStack *stack)
1066
0
{
1067
0
  Page    page = BufferGetPage(buf);
1068
0
  OffsetNumber maxoff;
1069
0
  OffsetNumber offset;
1070
0
  IndexTuple  downlink = NULL;
1071
1072
0
  maxoff = PageGetMaxOffsetNumber(page);
1073
0
  for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
1074
0
  {
1075
0
    IndexTuple  ituple = (IndexTuple)
1076
0
    PageGetItem(page, PageGetItemId(page, offset));
1077
1078
0
    if (downlink == NULL)
1079
0
      downlink = CopyIndexTuple(ituple);
1080
0
    else
1081
0
    {
1082
0
      IndexTuple  newdownlink;
1083
1084
0
      newdownlink = gistgetadjusted(rel, downlink, ituple,
1085
0
                      giststate);
1086
0
      if (newdownlink)
1087
0
        downlink = newdownlink;
1088
0
    }
1089
0
  }
1090
1091
  /*
1092
   * If the page is completely empty, we can't form a meaningful downlink
1093
   * for it. But we have to insert a downlink for the page. Any key will do,
1094
   * as long as it's consistent with the downlink of the parent page, so that we
1095
   * can legally insert it to the parent. A minimal one that matches as few
1096
   * scans as possible would be best, to keep scans from doing useless work,
1097
   * but we don't know how to construct that. So we just use the downlink of
1098
   * the original page that was split - that's as far from optimal as it can
1099
   * get but will do..
1100
   */
1101
0
  if (!downlink)
1102
0
  {
1103
0
    ItemId    iid;
1104
1105
0
    LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1106
0
    gistFindCorrectParent(rel, stack);
1107
0
    iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
1108
0
    downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
1109
0
    downlink = CopyIndexTuple(downlink);
1110
0
    LockBuffer(stack->parent->buffer, GIST_UNLOCK);
1111
0
  }
1112
1113
0
  ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
1114
0
  GistTupleSetValid(downlink);
1115
1116
0
  return downlink;
1117
0
}
1118
1119
1120
/*
1121
 * Complete the incomplete split of state->stack->page.
1122
 */
1123
static void
1124
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1125
0
{
1126
0
  GISTInsertStack *stack = state->stack;
1127
0
  Buffer    buf;
1128
0
  Page    page;
1129
0
  List     *splitinfo = NIL;
1130
1131
0
  elog(LOG, "fixing incomplete split in index \"%s\", block %u",
1132
0
     RelationGetRelationName(state->r), stack->blkno);
1133
1134
0
  Assert(GistFollowRight(stack->page));
1135
0
  Assert(OffsetNumberIsValid(stack->downlinkoffnum));
1136
1137
0
  buf = stack->buffer;
1138
1139
  /*
1140
   * Read the chain of split pages, following the rightlinks. Construct a
1141
   * downlink tuple for each page.
1142
   */
1143
0
  for (;;)
1144
0
  {
1145
0
    GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
1146
0
    IndexTuple  downlink;
1147
1148
0
    page = BufferGetPage(buf);
1149
1150
    /* Form the new downlink tuples to insert to parent */
1151
0
    downlink = gistformdownlink(state->r, buf, giststate, stack);
1152
1153
0
    si->buf = buf;
1154
0
    si->downlink = downlink;
1155
1156
0
    splitinfo = lappend(splitinfo, si);
1157
1158
0
    if (GistFollowRight(page))
1159
0
    {
1160
      /* lock next page */
1161
0
      buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
1162
0
      LockBuffer(buf, GIST_EXCLUSIVE);
1163
0
    }
1164
0
    else
1165
0
      break;
1166
0
  }
1167
1168
  /* Insert the downlinks */
1169
0
  gistfinishsplit(state, stack, giststate, splitinfo, false);
1170
0
}
1171
1172
/*
1173
 * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1174
 * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
1175
 * 'stack' represents the path from the root to the page being updated.
1176
 *
1177
 * The caller must hold an exclusive lock on stack->buffer.  The lock is still
1178
 * held on return, but the page might not contain the inserted tuple if the
1179
 * page was split. The function returns true if the page was split, false
1180
 * otherwise.
1181
 */
1182
static bool
1183
gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
1184
        GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
1185
20.0k
{
1186
20.0k
  return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
1187
20.0k
              InvalidBuffer, InvalidBuffer, false, false);
1188
20.0k
}
1189
1190
/* ----------------
1191
 * An extended workhorse version of gistinserttuple(). This version allows
1192
 * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1193
 * This is used to recursively update the downlinks in the parent when a page
1194
 * is split.
1195
 *
1196
 * If leftchild and rightchild are valid, we're inserting/replacing the
1197
 * downlink for rightchild, and leftchild is its left sibling. We clear the
1198
 * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1199
 * insertion of the downlink.
1200
 *
1201
 * To avoid holding locks for longer than necessary, when recursing up the
1202
 * tree to update the parents, the locking is a bit peculiar here. On entry,
1203
 * the caller must hold an exclusive lock on stack->buffer, as well as
1204
 * leftchild and rightchild if given. On return:
1205
 *
1206
 *  - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1207
 *    always kept pinned, however.
1208
 *  - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1209
 *    is kept pinned.
1210
 *  - Lock and pin on 'rightchild' are always released.
1211
 *
1212
 * Returns 'true' if the page had to be split. Note that if the page was
1213
 * split, the inserted/updated tuples might've been inserted to a right
1214
 * sibling of stack->buffer instead of stack->buffer itself.
1215
 */
1216
static bool
1217
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
1218
         GISTSTATE *giststate,
1219
         IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
1220
         Buffer leftchild, Buffer rightchild,
1221
         bool unlockbuf, bool unlockleftchild)
1222
20.3k
{
1223
20.3k
  List     *splitinfo;
1224
20.3k
  bool    is_split;
1225
1226
  /*
1227
   * Check for any rw conflicts (in serializable isolation level) just
1228
   * before we intend to modify the page
1229
   */
1230
20.3k
  CheckForSerializableConflictIn(state->r, NULL, stack->buffer);
1231
1232
  /* Insert the tuple(s) to the page, splitting the page if necessary */
1233
20.3k
  is_split = gistplacetopage(state->r, state->freespace, giststate,
1234
20.3k
                 stack->buffer,
1235
20.3k
                 tuples, ntup,
1236
20.3k
                 oldoffnum, NULL,
1237
20.3k
                 leftchild,
1238
20.3k
                 &splitinfo,
1239
20.3k
                 true,
1240
20.3k
                 state->heapRel);
1241
1242
  /*
1243
   * Before recursing up in case the page was split, release locks on the
1244
   * child pages. We don't need to keep them locked when updating the
1245
   * parent.
1246
   */
1247
20.3k
  if (BufferIsValid(rightchild))
1248
317
    UnlockReleaseBuffer(rightchild);
1249
20.3k
  if (BufferIsValid(leftchild) && unlockleftchild)
1250
1
    LockBuffer(leftchild, GIST_UNLOCK);
1251
1252
  /*
1253
   * If we had to split, insert/update the downlinks in the parent. If the
1254
   * caller requested us to release the lock on stack->buffer, tell
1255
   * gistfinishsplit() to do that as soon as it's safe to do so. If we
1256
   * didn't have to split, release it ourselves.
1257
   */
1258
20.3k
  if (splitinfo)
1259
317
    gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
1260
20.0k
  else if (unlockbuf)
1261
316
    LockBuffer(stack->buffer, GIST_UNLOCK);
1262
1263
20.3k
  return is_split;
1264
20.3k
}
1265
1266
/*
1267
 * Finish an incomplete split by inserting/updating the downlinks in parent
1268
 * page. 'splitinfo' contains all the child pages involved in the split,
1269
 * from left-to-right.
1270
 *
1271
 * On entry, the caller must hold a lock on stack->buffer and all the child
1272
 * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1273
 * released on return. The child pages are always unlocked and unpinned.
1274
 */
1275
static void
1276
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1277
        GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
1278
317
{
1279
317
  ListCell   *lc;
1280
317
  List     *reversed;
1281
317
  GISTPageSplitInfo *right;
1282
317
  GISTPageSplitInfo *left;
1283
317
  IndexTuple  tuples[2];
1284
1285
  /* A split always contains at least two halves */
1286
317
  Assert(list_length(splitinfo) >= 2);
1287
1288
  /*
1289
   * We need to insert downlinks for each new page, and update the downlink
1290
   * for the original (leftmost) page in the split. Begin at the rightmost
1291
   * page, inserting one downlink at a time until there's only two pages
1292
   * left. Finally insert the downlink for the last new page and update the
1293
   * downlink for the original page as one operation.
1294
   */
1295
1296
  /* for convenience, create a copy of the list in reverse order */
1297
317
  reversed = NIL;
1298
317
  foreach(lc, splitinfo)
1299
634
  {
1300
634
    reversed = lcons(lfirst(lc), reversed);
1301
634
  }
1302
1303
317
  LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
1304
317
  gistFindCorrectParent(state->r, stack);
1305
1306
  /*
1307
   * insert downlinks for the siblings from right to left, until there are
1308
   * only two siblings left.
1309
   */
1310
317
  while (list_length(reversed) > 2)
1311
0
  {
1312
0
    right = (GISTPageSplitInfo *) linitial(reversed);
1313
0
    left = (GISTPageSplitInfo *) lsecond(reversed);
1314
1315
0
    if (gistinserttuples(state, stack->parent, giststate,
1316
0
               &right->downlink, 1,
1317
0
               InvalidOffsetNumber,
1318
0
               left->buf, right->buf, false, false))
1319
0
    {
1320
      /*
1321
       * If the parent page was split, need to relocate the original
1322
       * parent pointer.
1323
       */
1324
0
      gistFindCorrectParent(state->r, stack);
1325
0
    }
1326
    /* gistinserttuples() released the lock on right->buf. */
1327
0
    reversed = list_delete_first(reversed);
1328
0
  }
1329
1330
317
  right = (GISTPageSplitInfo *) linitial(reversed);
1331
317
  left = (GISTPageSplitInfo *) lsecond(reversed);
1332
1333
  /*
1334
   * Finally insert downlink for the remaining right page and update the
1335
   * downlink for the original page to not contain the tuples that were
1336
   * moved to the new pages.
1337
   */
1338
317
  tuples[0] = left->downlink;
1339
317
  tuples[1] = right->downlink;
1340
317
  gistinserttuples(state, stack->parent, giststate,
1341
317
           tuples, 2,
1342
317
           stack->downlinkoffnum,
1343
317
           left->buf, right->buf,
1344
317
           true,    /* Unlock parent */
1345
317
           unlockbuf  /* Unlock stack->buffer if caller wants that */
1346
317
    );
1347
317
  Assert(left->buf == stack->buffer);
1348
317
}
1349
1350
/*
1351
 * gistSplit -- split a page in the tree and fill struct
1352
 * used for XLOG and real writes buffers. Function is recursive, ie
1353
 * it will split page until keys will fit in every page.
1354
 */
1355
SplitedPageLayout *
1356
gistSplit(Relation r,
1357
      Page page,
1358
      IndexTuple *itup,   /* contains compressed entry */
1359
      int len,
1360
      GISTSTATE *giststate)
1361
319
{
1362
319
  IndexTuple *lvectup,
1363
319
         *rvectup;
1364
319
  GistSplitVector v;
1365
319
  int     i;
1366
319
  SplitedPageLayout *res = NULL;
1367
1368
  /* this should never recurse very deeply, but better safe than sorry */
1369
319
  check_stack_depth();
1370
1371
  /* there's no point in splitting an empty page */
1372
319
  Assert(len > 0);
1373
1374
  /*
1375
   * If a single tuple doesn't fit on a page, no amount of splitting will
1376
   * help.
1377
   */
1378
319
  if (len == 1)
1379
319
    ereport(ERROR,
1380
319
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1381
319
         errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1382
319
            IndexTupleSize(itup[0]), GiSTPageSize,
1383
319
            RelationGetRelationName(r))));
1384
1385
319
  memset(v.spl_lisnull, true, sizeof(bool) * giststate->tupdesc->natts);
1386
319
  memset(v.spl_risnull, true, sizeof(bool) * giststate->tupdesc->natts);
1387
319
  gistSplitByKey(r, page, itup, len, giststate, &v, 0);
1388
1389
  /* form left and right vector */
1390
319
  lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1391
319
  rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
1392
1393
19.7k
  for (i = 0; i < v.splitVector.spl_nleft; i++)
1394
19.4k
    lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
1395
1396
20.1k
  for (i = 0; i < v.splitVector.spl_nright; i++)
1397
19.7k
    rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
1398
1399
  /* finalize splitting (may need another split) */
1400
319
  if (!gistfitpage(rvectup, v.splitVector.spl_nright))
1401
0
  {
1402
0
    res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
1403
0
  }
1404
319
  else
1405
319
  {
1406
319
    ROTATEDIST(res);
1407
319
    res->block.num = v.splitVector.spl_nright;
1408
319
    res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
1409
319
    res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
1410
319
  }
1411
1412
319
  if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
1413
0
  {
1414
0
    SplitedPageLayout *resptr,
1415
0
           *subres;
1416
1417
0
    resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
1418
1419
    /* install on list's tail */
1420
0
    while (resptr->next)
1421
0
      resptr = resptr->next;
1422
1423
0
    resptr->next = res;
1424
0
    res = subres;
1425
0
  }
1426
319
  else
1427
319
  {
1428
319
    ROTATEDIST(res);
1429
319
    res->block.num = v.splitVector.spl_nleft;
1430
319
    res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
1431
319
    res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
1432
319
  }
1433
1434
319
  return res;
1435
319
}
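
The recursion bottoms out when each half fits on a page, and len == 1 means a single tuple is too large to fit at all, hence the "index row size exceeds maximum" error above. A self-contained model of the split-until-it-fits recursion, with a middle split standing in for the opclass picksplit method:

#include <stdio.h>

#define PAGE_CAPACITY 100

static int total(const int *sizes, int n)
{
  int s = 0;
  for (int i = 0; i < n; i++) s += sizes[i];
  return s;
}

static void split_until_fits(const int *sizes, int n)
{
  if (n == 1 || total(sizes, n) <= PAGE_CAPACITY)
  {
    printf("page with %d entries (total %d)\n", n, total(sizes, n));
    return;                                    /* group becomes one page */
  }
  split_until_fits(sizes, n / 2);              /* "left" half */
  split_until_fits(sizes + n / 2, n - n / 2);  /* "right" half */
}

int main(void)
{
  int sizes[] = {40, 35, 30, 25, 20, 15, 10, 5};
  split_until_fits(sizes, 8);
  return 0;
}
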
1436
1437
/*
1438
 * Create a GISTSTATE and fill it with information about the index
1439
 */
1440
GISTSTATE *
1441
initGISTstate(Relation index)
1442
3
{
1443
3
  GISTSTATE  *giststate;
1444
3
  MemoryContext scanCxt;
1445
3
  MemoryContext oldCxt;
1446
3
  int     i;
1447
1448
  /* safety check to protect fixed-size arrays in GISTSTATE */
1449
3
  if (index->rd_att->natts > INDEX_MAX_KEYS)
1450
0
    elog(ERROR, "numberOfAttributes %d > %d",
1451
3
       index->rd_att->natts, INDEX_MAX_KEYS);
1452
1453
  /* Create the memory context that will hold the GISTSTATE */
1454
3
  scanCxt = AllocSetContextCreate(GetCurrentMemoryContext(),
1455
3
                  "GiST scan context",
1456
3
                  ALLOCSET_DEFAULT_SIZES);
1457
3
  oldCxt = MemoryContextSwitchTo(scanCxt);
1458
1459
  /* Create and fill in the GISTSTATE */
1460
3
  giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
1461
1462
3
  giststate->scanCxt = scanCxt;
1463
3
  giststate->tempCxt = scanCxt; /* caller must change this if needed */
1464
3
  giststate->tupdesc = index->rd_att;
1465
1466
6
  for (i = 0; i < index->rd_att->natts; i++)
1467
3
  {
1468
3
    fmgr_info_copy(&(giststate->consistentFn[i]),
1469
3
             index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
1470
3
             scanCxt);
1471
3
    fmgr_info_copy(&(giststate->unionFn[i]),
1472
3
             index_getprocinfo(index, i + 1, GIST_UNION_PROC),
1473
3
             scanCxt);
1474
1475
    /* opclasses are not required to provide a Compress method */
1476
3
    if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
1477
3
      fmgr_info_copy(&(giststate->compressFn[i]),
1478
3
               index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
1479
3
               scanCxt);
1480
0
    else
1481
0
      giststate->compressFn[i].fn_oid = InvalidOid;
1482
1483
    /* opclasses are not required to provide a Decompress method */
1484
3
    if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
1485
3
      fmgr_info_copy(&(giststate->decompressFn[i]),
1486
3
               index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
1487
3
               scanCxt);
1488
0
    else
1489
0
      giststate->decompressFn[i].fn_oid = InvalidOid;
1490
1491
3
    fmgr_info_copy(&(giststate->penaltyFn[i]),
1492
3
             index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
1493
3
             scanCxt);
1494
3
    fmgr_info_copy(&(giststate->picksplitFn[i]),
1495
3
             index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
1496
3
             scanCxt);
1497
3
    fmgr_info_copy(&(giststate->equalFn[i]),
1498
3
             index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
1499
3
             scanCxt);
1500
1501
    /* opclasses are not required to provide a Distance method */
1502
3
    if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
1503
3
      fmgr_info_copy(&(giststate->distanceFn[i]),
1504
3
               index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
1505
3
               scanCxt);
1506
0
    else
1507
0
      giststate->distanceFn[i].fn_oid = InvalidOid;
1508
1509
    /* opclasses are not required to provide a Fetch method */
1510
3
    if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
1511
0
      fmgr_info_copy(&(giststate->fetchFn[i]),
1512
0
               index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
1513
0
               scanCxt);
1514
3
    else
1515
3
      giststate->fetchFn[i].fn_oid = InvalidOid;
1516
1517
    /*
1518
     * If the index column has a specified collation, we should honor that
1519
     * while doing comparisons.  However, we may have a collatable storage
1520
     * type for a noncollatable indexed data type.  If there's no index
1521
     * collation then specify default collation in case the support
1522
     * functions need collation.  This is harmless if the support
1523
     * functions don't care about collation, so we just do it
1524
     * unconditionally.  (We could alternatively call get_typcollation,
1525
     * but that seems like expensive overkill --- there aren't going to be
1526
     * any cases where a GiST storage type has a nondefault collation.)
1527
     */
1528
3
    if (OidIsValid(index->rd_indcollation[i]))
1529
3
      giststate->supportCollation[i] = index->rd_indcollation[i];
1530
0
    else
1531
0
      giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
1532
3
  }
1533
1534
3
  MemoryContextSwitchTo(oldCxt);
1535
1536
3
  return giststate;
1537
3
}
1538
1539
void
1540
freeGISTstate(GISTSTATE *giststate)
1541
3
{
1542
  /* It's sufficient to delete the scanCxt */
1543
3
  MemoryContextDelete(giststate->scanCxt);
1544
3
}
1545
1546
/*
1547
 * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
1548
 * Function assumes that buffer is exclusively locked.
1549
 */
1550
static void
1551
gistvacuumpage(Relation rel, Page page, Buffer buffer, Relation heapRel)
1552
0
{
1553
0
  OffsetNumber deletable[MaxIndexTuplesPerPage];
1554
0
  int     ndeletable = 0;
1555
0
  OffsetNumber offnum,
1556
0
        maxoff;
1557
1558
0
  Assert(GistPageIsLeaf(page));
1559
1560
  /*
1561
   * Scan over all items to see which ones need to be deleted according to
1562
   * LP_DEAD flags.
1563
   */
1564
0
  maxoff = PageGetMaxOffsetNumber(page);
1565
0
  for (offnum = FirstOffsetNumber;
1566
0
     offnum <= maxoff;
1567
0
     offnum = OffsetNumberNext(offnum))
1568
0
  {
1569
0
    ItemId    itemId = PageGetItemId(page, offnum);
1570
1571
0
    if (ItemIdIsDead(itemId))
1572
0
      deletable[ndeletable++] = offnum;
1573
0
  }
1574
1575
0
  if (ndeletable > 0)
1576
0
  {
1577
0
    START_CRIT_SECTION();
1578
1579
0
    PageIndexMultiDelete(page, deletable, ndeletable);
1580
1581
    /*
1582
     * Mark the page as not containing any LP_DEAD items.  This is not
1583
     * certainly true (there might be some that have recently been marked,
1584
     * but weren't included in our target-item list), but it will almost
1585
     * always be true and it doesn't seem worth an additional page scan to
1586
     * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
1587
     */
1588
0
    GistClearPageHasGarbage(page);
1589
1590
0
    MarkBufferDirty(buffer);
1591
1592
    /* XLOG stuff */
1593
0
    if (RelationNeedsWAL(rel))
1594
0
    {
1595
0
      XLogRecPtr  recptr;
1596
1597
0
      recptr = gistXLogUpdate(buffer,
1598
0
                  deletable, ndeletable,
1599
0
                  NULL, 0, InvalidBuffer,
1600
0
                  &heapRel->rd_node);
1601
1602
0
      PageSetLSN(page, recptr);
1603
0
    }
1604
0
    else
1605
0
      PageSetLSN(page, gistGetFakeLSN(rel));
1606
1607
0
    END_CRIT_SECTION();
1608
0
  }
1609
1610
  /*
1611
   * Note: if we didn't find any LP_DEAD items, then the page's
1612
   * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
1613
   * separate write to clear it, however.  We will clear it when we split
1614
   * the page.
1615
   */
1616
0
}
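
The shape of the routine, one scan to collect dead offsets followed by one batched deletion, is easy to model standalone; the compaction loop below stands in for PageIndexMultiDelete():

#include <stdbool.h>
#include <stdio.h>

#define NITEMS 8

int main(void)
{
  int  items[NITEMS] = {10, 11, 12, 13, 14, 15, 16, 17};
  bool dead[NITEMS]  = {false, true, false, true, false, false, true, false};
  int  deletable[NITEMS];
  int  ndeletable = 0;

  /* Pass 1: collect the offsets of dead items, like the LP_DEAD scan. */
  for (int i = 0; i < NITEMS; i++)
    if (dead[i])
      deletable[ndeletable++] = i;

  /* Pass 2: one batched deletion, the moral equivalent of
   * PageIndexMultiDelete(page, deletable, ndeletable). */
  if (ndeletable > 0)
  {
    int j = 0, k = 0;
    for (int i = 0; i < NITEMS; i++)
    {
      if (k < ndeletable && deletable[k] == i)
        k++;                    /* scheduled for deletion: skip it */
      else
        items[j++] = items[i];  /* survivor: compact leftwards */
    }
    printf("deleted %d, kept %d items\n", ndeletable, j);
  }
  return 0;
}
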