YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/access/spgist/spgutils.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgutils.c
4
 *    various support functions for SP-GiST
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *      src/backend/access/spgist/spgutils.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
16
#include "postgres.h"
17
18
#include "access/reloptions.h"
19
#include "access/spgist_private.h"
20
#include "access/transam.h"
21
#include "access/xact.h"
22
#include "storage/bufmgr.h"
23
#include "storage/indexfsm.h"
24
#include "storage/lmgr.h"
25
#include "utils/builtins.h"
26
#include "utils/index_selfuncs.h"
27
#include "utils/lsyscache.h"
28
29
30
/*
31
 * SP-GiST handler function: return IndexAmRoutine with access method parameters
32
 * and callbacks.
33
 */
34
Datum
35
spghandler(PG_FUNCTION_ARGS)
36
0
{
37
0
  IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
38
39
0
  amroutine->amstrategies = 0;
40
0
  amroutine->amsupport = SPGISTNProc;
41
0
  amroutine->amcanorder = false;
42
0
  amroutine->amcanorderbyop = false;
43
0
  amroutine->amcanbackward = false;
44
0
  amroutine->amcanunique = false;
45
0
  amroutine->amcanmulticol = false;
46
0
  amroutine->amoptionalkey = true;
47
0
  amroutine->amsearcharray = false;
48
0
  amroutine->amsearchnulls = true;
49
0
  amroutine->amstorage = false;
50
0
  amroutine->amclusterable = false;
51
0
  amroutine->ampredlocks = false;
52
0
  amroutine->amcanparallel = false;
53
0
  amroutine->amcaninclude = false;
54
0
  amroutine->amkeytype = InvalidOid;
55
56
0
  amroutine->ambuild = spgbuild;
57
0
  amroutine->ambuildempty = spgbuildempty;
58
0
  amroutine->aminsert = spginsert;
59
0
  amroutine->ambulkdelete = spgbulkdelete;
60
0
  amroutine->amvacuumcleanup = spgvacuumcleanup;
61
0
  amroutine->amcanreturn = spgcanreturn;
62
0
  amroutine->amcostestimate = spgcostestimate;
63
0
  amroutine->amoptions = spgoptions;
64
0
  amroutine->amproperty = NULL;
65
0
  amroutine->amvalidate = spgvalidate;
66
0
  amroutine->ambeginscan = spgbeginscan;
67
0
  amroutine->amrescan = spgrescan;
68
0
  amroutine->amgettuple = spggettuple;
69
0
  amroutine->amgetbitmap = spggetbitmap;
70
0
  amroutine->amendscan = spgendscan;
71
0
  amroutine->ammarkpos = NULL;
72
0
  amroutine->amrestrpos = NULL;
73
0
  amroutine->amestimateparallelscan = NULL;
74
0
  amroutine->aminitparallelscan = NULL;
75
0
  amroutine->amparallelrescan = NULL;
76
77
0
  PG_RETURN_POINTER(amroutine);
78
0
}
79
80
/* Fill in a SpGistTypeDesc struct with info about the specified data type */
81
static void
82
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
83
0
{
84
0
  desc->type = type;
85
0
  get_typlenbyval(type, &desc->attlen, &desc->attbyval);
86
0
}
87
88
/*
89
 * Fetch local cache of AM-specific info about the index, initializing it
90
 * if necessary
91
 */
92
SpGistCache *
93
spgGetCache(Relation index)
94
0
{
95
0
  SpGistCache *cache;
96
97
0
  if (index->rd_amcache == NULL)
98
0
  {
99
0
    Oid     atttype;
100
0
    spgConfigIn in;
101
0
    FmgrInfo   *procinfo;
102
0
    Buffer    metabuffer;
103
0
    SpGistMetaPageData *metadata;
104
105
0
    cache = MemoryContextAllocZero(index->rd_indexcxt,
106
0
                     sizeof(SpGistCache));
107
108
    /* SPGiST doesn't support multi-column indexes */
109
0
    Assert(index->rd_att->natts == 1);
110
111
    /*
112
     * Get the actual data type of the indexed column from the index
113
     * tupdesc.  We pass this to the opclass config function so that
114
     * polymorphic opclasses are possible.
115
     */
116
0
    atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
117
118
    /* Call the config function to get config info for the opclass */
119
0
    in.attType = atttype;
120
121
0
    procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
122
0
    FunctionCall2Coll(procinfo,
123
0
              index->rd_indcollation[0],
124
0
              PointerGetDatum(&in),
125
0
              PointerGetDatum(&cache->config));
126
127
    /* Get the information we need about each relevant datatype */
128
0
    fillTypeDesc(&cache->attType, atttype);
129
130
0
    if (OidIsValid(cache->config.leafType) &&
131
0
      cache->config.leafType != atttype)
132
0
    {
133
0
      if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
134
0
        ereport(ERROR,
135
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
136
0
             errmsg("compress method must be defined when leaf type is different from input type")));
137
138
0
      fillTypeDesc(&cache->attLeafType, cache->config.leafType);
139
0
    }
140
0
    else
141
0
    {
142
0
      cache->attLeafType = cache->attType;
143
0
    }
144
145
0
    fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
146
0
    fillTypeDesc(&cache->attLabelType, cache->config.labelType);
147
148
    /* Last, get the lastUsedPages data from the metapage */
149
0
    metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
150
0
    LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
151
152
0
    metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
153
154
0
    if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
155
0
      elog(ERROR, "index \"%s\" is not an SP-GiST index",
156
0
         RelationGetRelationName(index));
157
158
0
    cache->lastUsedPages = metadata->lastUsedPages;
159
160
0
    UnlockReleaseBuffer(metabuffer);
161
162
0
    index->rd_amcache = (void *) cache;
163
0
  }
164
0
  else
165
0
  {
166
    /* assume it's up to date */
167
0
    cache = (SpGistCache *) index->rd_amcache;
168
0
  }
169
170
0
  return cache;
171
0
}
172
173
/* Initialize SpGistState for working with the given index */
174
void
175
initSpGistState(SpGistState *state, Relation index)
176
0
{
177
0
  SpGistCache *cache;
178
179
  /* Get cached static information about index */
180
0
  cache = spgGetCache(index);
181
182
0
  state->config = cache->config;
183
0
  state->attType = cache->attType;
184
0
  state->attLeafType = cache->attLeafType;
185
0
  state->attPrefixType = cache->attPrefixType;
186
0
  state->attLabelType = cache->attLabelType;
187
188
  /* Make workspace for constructing dead tuples */
189
0
  state->deadTupleStorage = palloc0(SGDTSIZE);
190
191
  /* Set XID to use in redirection tuples */
192
0
  state->myXid = GetTopTransactionIdIfAny();
193
194
  /* Assume we're not in an index build (spgbuild will override) */
195
0
  state->isBuild = false;
196
0
}
197
198
/*
199
 * Allocate a new page (either by recycling, or by extending the index file).
200
 *
201
 * The returned buffer is already pinned and exclusive-locked.
202
 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
203
 */
204
Buffer
205
SpGistNewBuffer(Relation index)
206
0
{
207
0
  Buffer    buffer;
208
0
  bool    needLock;
209
210
  /* First, try to get a page from FSM */
211
0
  for (;;)
212
0
  {
213
0
    BlockNumber blkno = GetFreeIndexPage(index);
214
215
0
    if (blkno == InvalidBlockNumber)
216
0
      break;       /* nothing known to FSM */
217
218
    /*
219
     * The fixed pages shouldn't ever be listed in FSM, but just in case
220
     * one is, ignore it.
221
     */
222
0
    if (SpGistBlockIsFixed(blkno))
223
0
      continue;
224
225
0
    buffer = ReadBuffer(index, blkno);
226
227
    /*
228
     * We have to guard against the possibility that someone else already
229
     * recycled this page; the buffer may be locked if so.
230
     */
231
0
    if (ConditionalLockBuffer(buffer))
232
0
    {
233
0
      Page    page = BufferGetPage(buffer);
234
235
0
      if (PageIsNew(page))
236
0
        return buffer; /* OK to use, if never initialized */
237
238
0
      if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
239
0
        return buffer; /* OK to use */
240
241
0
      LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
242
0
    }
243
244
    /* Can't use it, so release buffer and try again */
245
0
    ReleaseBuffer(buffer);
246
0
  }
247
248
  /* Must extend the file */
249
0
  needLock = !RELATION_IS_LOCAL(index);
250
0
  if (needLock)
251
0
    LockRelationForExtension(index, ExclusiveLock);
252
253
0
  buffer = ReadBuffer(index, P_NEW);
254
0
  LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
255
256
0
  if (needLock)
257
0
    UnlockRelationForExtension(index, ExclusiveLock);
258
259
0
  return buffer;
260
0
}
261
262
/*
263
 * Update index metapage's lastUsedPages info from local cache, if possible
264
 *
265
 * Updating meta page isn't critical for index working, so
266
 * 1 use ConditionalLockBuffer to improve concurrency
267
 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
268
 */
269
void
270
SpGistUpdateMetaPage(Relation index)
271
0
{
272
0
  SpGistCache *cache = (SpGistCache *) index->rd_amcache;
273
274
0
  if (cache != NULL)
275
0
  {
276
0
    Buffer    metabuffer;
277
278
0
    metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
279
280
0
    if (ConditionalLockBuffer(metabuffer))
281
0
    {
282
0
      Page    metapage = BufferGetPage(metabuffer);
283
0
      SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
284
285
0
      metadata->lastUsedPages = cache->lastUsedPages;
286
287
      /*
288
       * Set pd_lower just past the end of the metadata.  This is
289
       * essential, because without doing so, metadata will be lost if
290
       * xlog.c compresses the page.  (We must do this here because
291
       * pre-v11 versions of PG did not set the metapage's pd_lower
292
       * correctly, so a pg_upgraded index might contain the wrong
293
       * value.)
294
       */
295
0
      ((PageHeader) metapage)->pd_lower =
296
0
        ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
297
298
0
      MarkBufferDirty(metabuffer);
299
0
      UnlockReleaseBuffer(metabuffer);
300
0
    }
301
0
    else
302
0
    {
303
0
      ReleaseBuffer(metabuffer);
304
0
    }
305
0
  }
306
0
}
307
308
/*
 * GET_LUP(c, f)
 *    Address of the lastUsedPages cache entry of cache "c" selected by the
 *    flags value "f".  Reducing the flags modulo SPGIST_CACHED_PAGES is
 *    only there for paranoia's sake.
 */
#define GET_LUP(c, f) \
  (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
311
312
/*
313
 * Allocate and initialize a new buffer of the type and parity specified by
314
 * flags.  The returned buffer is already pinned and exclusive-locked.
315
 *
316
 * When requesting an inner page, if we get one with the wrong parity,
317
 * we just release the buffer and try again.  We will get a different page
318
 * because GetFreeIndexPage will have marked the page used in FSM.  The page
319
 * is entered in our local lastUsedPages cache, so there's some hope of
320
 * making use of it later in this session, but otherwise we rely on VACUUM
321
 * to eventually re-enter the page in FSM, making it available for recycling.
322
 * Note that such a page does not get marked dirty here, so unless it's used
323
 * fairly soon, the buffer will just get discarded and the page will remain
324
 * as it was on disk.
325
 *
326
 * When we return a buffer to the caller, the page is *not* entered into
327
 * the lastUsedPages cache; we expect the caller will do so after it's taken
328
 * whatever space it will use.  This is because after the caller has used up
329
 * some space, the page might have less space than whatever was cached already
330
 * so we'd rather not trash the old cache entry.
331
 */
332
static Buffer
333
allocNewBuffer(Relation index, int flags)
334
0
{
335
0
  SpGistCache *cache = spgGetCache(index);
336
0
  uint16    pageflags = 0;
337
338
0
  if (GBUF_REQ_LEAF(flags))
339
0
    pageflags |= SPGIST_LEAF;
340
0
  if (GBUF_REQ_NULLS(flags))
341
0
    pageflags |= SPGIST_NULLS;
342
343
0
  for (;;)
344
0
  {
345
0
    Buffer    buffer;
346
347
0
    buffer = SpGistNewBuffer(index);
348
0
    SpGistInitBuffer(buffer, pageflags);
349
350
0
    if (pageflags & SPGIST_LEAF)
351
0
    {
352
      /* Leaf pages have no parity concerns, so just use it */
353
0
      return buffer;
354
0
    }
355
0
    else
356
0
    {
357
0
      BlockNumber blkno = BufferGetBlockNumber(buffer);
358
0
      int     blkFlags = GBUF_INNER_PARITY(blkno);
359
360
0
      if ((flags & GBUF_PARITY_MASK) == blkFlags)
361
0
      {
362
        /* Page has right parity, use it */
363
0
        return buffer;
364
0
      }
365
0
      else
366
0
      {
367
        /* Page has wrong parity, record it in cache and try again */
368
0
        if (pageflags & SPGIST_NULLS)
369
0
          blkFlags |= GBUF_NULLS;
370
0
        cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
371
0
        cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
372
0
          PageGetExactFreeSpace(BufferGetPage(buffer));
373
0
        UnlockReleaseBuffer(buffer);
374
0
      }
375
0
    }
376
0
  }
377
0
}
378
379
/*
380
 * Get a buffer of the type and parity specified by flags, having at least
381
 * as much free space as indicated by needSpace.  We use the lastUsedPages
382
 * cache to assign the same buffer previously requested when possible.
383
 * The returned buffer is already pinned and exclusive-locked.
384
 *
385
 * *isNew is set true if the page was initialized here, false if it was
386
 * already valid.
387
 */
388
Buffer
389
SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
390
0
{
391
0
  SpGistCache *cache = spgGetCache(index);
392
0
  SpGistLastUsedPage *lup;
393
394
  /* Bail out if even an empty page wouldn't meet the demand */
395
0
  if (needSpace > SPGIST_PAGE_CAPACITY)
396
0
    elog(ERROR, "desired SPGiST tuple size is too big");
397
398
  /*
399
   * If possible, increase the space request to include relation's
400
   * fillfactor.  This ensures that when we add unrelated tuples to a page,
401
   * we try to keep 100-fillfactor% available for adding tuples that are
402
   * related to the ones already on it.  But fillfactor mustn't cause an
403
   * error for requests that would otherwise be legal.
404
   */
405
0
  needSpace += RelationGetTargetPageFreeSpace(index,
406
0
                        SPGIST_DEFAULT_FILLFACTOR);
407
0
  needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
408
409
  /* Get the cache entry for this flags setting */
410
0
  lup = GET_LUP(cache, flags);
411
412
  /* If we have nothing cached, just turn it over to allocNewBuffer */
413
0
  if (lup->blkno == InvalidBlockNumber)
414
0
  {
415
0
    *isNew = true;
416
0
    return allocNewBuffer(index, flags);
417
0
  }
418
419
  /* fixed pages should never be in cache */
420
0
  Assert(!SpGistBlockIsFixed(lup->blkno));
421
422
  /* If cached freeSpace isn't enough, don't bother looking at the page */
423
0
  if (lup->freeSpace >= needSpace)
424
0
  {
425
0
    Buffer    buffer;
426
0
    Page    page;
427
428
0
    buffer = ReadBuffer(index, lup->blkno);
429
430
0
    if (!ConditionalLockBuffer(buffer))
431
0
    {
432
      /*
433
       * buffer is locked by another process, so return a new buffer
434
       */
435
0
      ReleaseBuffer(buffer);
436
0
      *isNew = true;
437
0
      return allocNewBuffer(index, flags);
438
0
    }
439
440
0
    page = BufferGetPage(buffer);
441
442
0
    if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
443
0
    {
444
      /* OK to initialize the page */
445
0
      uint16    pageflags = 0;
446
447
0
      if (GBUF_REQ_LEAF(flags))
448
0
        pageflags |= SPGIST_LEAF;
449
0
      if (GBUF_REQ_NULLS(flags))
450
0
        pageflags |= SPGIST_NULLS;
451
0
      SpGistInitBuffer(buffer, pageflags);
452
0
      lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
453
0
      *isNew = true;
454
0
      return buffer;
455
0
    }
456
457
    /*
458
     * Check that page is of right type and has enough space.  We must
459
     * recheck this since our cache isn't necessarily up to date.
460
     */
461
0
    if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
462
0
      (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
463
0
    {
464
0
      int     freeSpace = PageGetExactFreeSpace(page);
465
466
0
      if (freeSpace >= needSpace)
467
0
      {
468
        /* Success, update freespace info and return the buffer */
469
0
        lup->freeSpace = freeSpace - needSpace;
470
0
        *isNew = false;
471
0
        return buffer;
472
0
      }
473
0
    }
474
475
    /*
476
     * fallback to allocation of new buffer
477
     */
478
0
    UnlockReleaseBuffer(buffer);
479
0
  }
480
481
  /* No success with cache, so return a new buffer */
482
0
  *isNew = true;
483
0
  return allocNewBuffer(index, flags);
484
0
}
485
486
/*
487
 * Update lastUsedPages cache when done modifying a page.
488
 *
489
 * We update the appropriate cache entry if it already contained this page
490
 * (its freeSpace is likely obsolete), or if this page has more space than
491
 * whatever we had cached.
492
 */
493
void
494
SpGistSetLastUsedPage(Relation index, Buffer buffer)
495
0
{
496
0
  SpGistCache *cache = spgGetCache(index);
497
0
  SpGistLastUsedPage *lup;
498
0
  int     freeSpace;
499
0
  Page    page = BufferGetPage(buffer);
500
0
  BlockNumber blkno = BufferGetBlockNumber(buffer);
501
0
  int     flags;
502
503
  /* Never enter fixed pages (root pages) in cache, though */
504
0
  if (SpGistBlockIsFixed(blkno))
505
0
    return;
506
507
0
  if (SpGistPageIsLeaf(page))
508
0
    flags = GBUF_LEAF;
509
0
  else
510
0
    flags = GBUF_INNER_PARITY(blkno);
511
0
  if (SpGistPageStoresNulls(page))
512
0
    flags |= GBUF_NULLS;
513
514
0
  lup = GET_LUP(cache, flags);
515
516
0
  freeSpace = PageGetExactFreeSpace(page);
517
0
  if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
518
0
    lup->freeSpace < freeSpace)
519
0
  {
520
0
    lup->blkno = blkno;
521
0
    lup->freeSpace = freeSpace;
522
0
  }
523
0
}
524
525
/*
526
 * Initialize an SPGiST page to empty, with specified flags
527
 */
528
void
529
SpGistInitPage(Page page, uint16 f)
530
0
{
531
0
  SpGistPageOpaque opaque;
532
533
0
  PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
534
0
  opaque = SpGistPageGetOpaque(page);
535
0
  memset(opaque, 0, sizeof(SpGistPageOpaqueData));
536
0
  opaque->flags = f;
537
0
  opaque->spgist_page_id = SPGIST_PAGE_ID;
538
0
}
539
540
/*
541
 * Initialize a buffer's page to empty, with specified flags
542
 */
543
void
544
SpGistInitBuffer(Buffer b, uint16 f)
545
0
{
546
0
  Assert(BufferGetPageSize(b) == BLCKSZ);
547
0
  SpGistInitPage(BufferGetPage(b), f);
548
0
}
549
550
/*
551
 * Initialize metadata page
552
 */
553
void
554
SpGistInitMetapage(Page page)
555
0
{
556
0
  SpGistMetaPageData *metadata;
557
0
  int     i;
558
559
0
  SpGistInitPage(page, SPGIST_META);
560
0
  metadata = SpGistPageGetMeta(page);
561
0
  memset(metadata, 0, sizeof(SpGistMetaPageData));
562
0
  metadata->magicNumber = SPGIST_MAGIC_NUMBER;
563
564
  /* initialize last-used-page cache to empty */
565
0
  for (i = 0; i < SPGIST_CACHED_PAGES; i++)
566
0
    metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
567
568
  /*
569
   * Set pd_lower just past the end of the metadata.  This is essential,
570
   * because without doing so, metadata will be lost if xlog.c compresses
571
   * the page.
572
   */
573
0
  ((PageHeader) page)->pd_lower =
574
0
    ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
575
0
}
576
577
/*
578
 * reloptions processing for SPGiST
579
 */
580
bytea *
581
spgoptions(Datum reloptions, bool validate)
582
0
{
583
0
  return default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
584
0
}
585
586
/*
587
 * Get the space needed to store a non-null datum of the indicated type.
588
 * Note the result is already rounded up to a MAXALIGN boundary.
589
 * Also, we follow the SPGiST convention that pass-by-val types are
590
 * just stored in their Datum representation (compare memcpyDatum).
591
 */
592
unsigned int
593
SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
594
0
{
595
0
  unsigned int size;
596
597
0
  if (att->attbyval)
598
0
    size = sizeof(Datum);
599
0
  else if (att->attlen > 0)
600
0
    size = att->attlen;
601
0
  else
602
0
    size = VARSIZE_ANY(datum);
603
604
0
  return MAXALIGN(size);
605
0
}
606
607
/*
608
 * Copy the given non-null datum to *target
609
 */
610
static void
611
memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
612
0
{
613
0
  unsigned int size;
614
615
0
  if (att->attbyval)
616
0
  {
617
0
    memcpy(target, &datum, sizeof(Datum));
618
0
  }
619
0
  else
620
0
  {
621
0
    size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
622
0
    memcpy(target, DatumGetPointer(datum), size);
623
0
  }
624
0
}
625
626
/*
627
 * Construct a leaf tuple containing the given heap TID and datum value
628
 */
629
SpGistLeafTuple
630
spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
631
         Datum datum, bool isnull)
632
0
{
633
0
  SpGistLeafTuple tup;
634
0
  unsigned int size;
635
636
  /* compute space needed (note result is already maxaligned) */
637
0
  size = SGLTHDRSZ;
638
0
  if (!isnull)
639
0
    size += SpGistGetTypeSize(&state->attLeafType, datum);
640
641
  /*
642
   * Ensure that we can replace the tuple with a dead tuple later.  This
643
   * test is unnecessary when !isnull, but let's be safe.
644
   */
645
0
  if (size < SGDTSIZE)
646
0
    size = SGDTSIZE;
647
648
  /* OK, form the tuple */
649
0
  tup = (SpGistLeafTuple) palloc0(size);
650
651
0
  tup->size = size;
652
0
  tup->nextOffset = InvalidOffsetNumber;
653
0
  tup->heapPtr = *heapPtr;
654
0
  if (!isnull)
655
0
    memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
656
657
0
  return tup;
658
0
}
659
660
/*
661
 * Construct a node (to go into an inner tuple) containing the given label
662
 *
663
 * Note that the node's downlink is just set invalid here.  Caller will fill
664
 * it in later.
665
 */
666
SpGistNodeTuple
667
spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
668
0
{
669
0
  SpGistNodeTuple tup;
670
0
  unsigned int size;
671
0
  unsigned short infomask = 0;
672
673
  /* compute space needed (note result is already maxaligned) */
674
0
  size = SGNTHDRSZ;
675
0
  if (!isnull)
676
0
    size += SpGistGetTypeSize(&state->attLabelType, label);
677
678
  /*
679
   * Here we make sure that the size will fit in the field reserved for it
680
   * in t_info.
681
   */
682
0
  if ((size & INDEX_SIZE_MASK) != size)
683
0
    ereport(ERROR,
684
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
685
0
         errmsg("index row requires %zu bytes, maximum size is %zu",
686
0
            (Size) size, (Size) INDEX_SIZE_MASK)));
687
688
0
  tup = (SpGistNodeTuple) palloc0(size);
689
690
0
  if (isnull)
691
0
    infomask |= INDEX_NULL_MASK;
692
  /* we don't bother setting the INDEX_VAR_MASK bit */
693
0
  infomask |= size;
694
0
  tup->t_info = infomask;
695
696
  /* The TID field will be filled in later */
697
0
  ItemPointerSetInvalid(&tup->t_tid);
698
699
0
  if (!isnull)
700
0
    memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
701
702
0
  return tup;
703
0
}
704
705
/*
706
 * Construct an inner tuple containing the given prefix and node array
707
 */
708
SpGistInnerTuple
709
spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
710
          int nNodes, SpGistNodeTuple *nodes)
711
0
{
712
0
  SpGistInnerTuple tup;
713
0
  unsigned int size;
714
0
  unsigned int prefixSize;
715
0
  int     i;
716
0
  char     *ptr;
717
718
  /* Compute size needed */
719
0
  if (hasPrefix)
720
0
    prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
721
0
  else
722
0
    prefixSize = 0;
723
724
0
  size = SGITHDRSZ + prefixSize;
725
726
  /* Note: we rely on node tuple sizes to be maxaligned already */
727
0
  for (i = 0; i < nNodes; i++)
728
0
    size += IndexTupleSize(nodes[i]);
729
730
  /*
731
   * Ensure that we can replace the tuple with a dead tuple later.  This
732
   * test is unnecessary given current tuple layouts, but let's be safe.
733
   */
734
0
  if (size < SGDTSIZE)
735
0
    size = SGDTSIZE;
736
737
  /*
738
   * Inner tuple should be small enough to fit on a page
739
   */
740
0
  if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
741
0
    ereport(ERROR,
742
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
743
0
         errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
744
0
            (Size) size,
745
0
            SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
746
0
         errhint("Values larger than a buffer page cannot be indexed.")));
747
748
  /*
749
   * Check for overflow of header fields --- probably can't fail if the
750
   * above succeeded, but let's be paranoid
751
   */
752
0
  if (size > SGITMAXSIZE ||
753
0
    prefixSize > SGITMAXPREFIXSIZE ||
754
0
    nNodes > SGITMAXNNODES)
755
0
    elog(ERROR, "SPGiST inner tuple header field is too small");
756
757
  /* OK, form the tuple */
758
0
  tup = (SpGistInnerTuple) palloc0(size);
759
760
0
  tup->nNodes = nNodes;
761
0
  tup->prefixSize = prefixSize;
762
0
  tup->size = size;
763
764
0
  if (hasPrefix)
765
0
    memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
766
767
0
  ptr = (char *) SGITNODEPTR(tup);
768
769
0
  for (i = 0; i < nNodes; i++)
770
0
  {
771
0
    SpGistNodeTuple node = nodes[i];
772
773
0
    memcpy(ptr, node, IndexTupleSize(node));
774
0
    ptr += IndexTupleSize(node);
775
0
  }
776
777
0
  return tup;
778
0
}
779
780
/*
781
 * Construct a "dead" tuple to replace a tuple being deleted.
782
 *
783
 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
784
 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
785
 * the xid field is filled in automatically.
786
 *
787
 * This is called in critical sections, so we don't use palloc; the tuple
788
 * is built in preallocated storage.  It should be copied before another
789
 * call with different parameters can occur.
790
 */
791
SpGistDeadTuple
792
spgFormDeadTuple(SpGistState *state, int tupstate,
793
         BlockNumber blkno, OffsetNumber offnum)
794
0
{
795
0
  SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
796
797
0
  tuple->tupstate = tupstate;
798
0
  tuple->size = SGDTSIZE;
799
0
  tuple->nextOffset = InvalidOffsetNumber;
800
801
0
  if (tupstate == SPGIST_REDIRECT)
802
0
  {
803
0
    ItemPointerSet(&tuple->pointer, blkno, offnum);
804
0
    Assert(TransactionIdIsValid(state->myXid));
805
0
    tuple->xid = state->myXid;
806
0
  }
807
0
  else
808
0
  {
809
0
    ItemPointerSetInvalid(&tuple->pointer);
810
0
    tuple->xid = InvalidTransactionId;
811
0
  }
812
813
0
  return tuple;
814
0
}
815
816
/*
817
 * Extract the label datums of the nodes within innerTuple
818
 *
819
 * Returns NULL if label datums are NULLs
820
 */
821
Datum *
822
spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
823
0
{
824
0
  Datum    *nodeLabels;
825
0
  int     i;
826
0
  SpGistNodeTuple node;
827
828
  /* Either all the labels must be NULL, or none. */
829
0
  node = SGITNODEPTR(innerTuple);
830
0
  if (IndexTupleHasNulls(node))
831
0
  {
832
0
    SGITITERATE(innerTuple, i, node)
833
0
    {
834
0
      if (!IndexTupleHasNulls(node))
835
0
        elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
836
0
    }
837
    /* They're all null, so just return NULL */
838
0
    return NULL;
839
0
  }
840
0
  else
841
0
  {
842
0
    nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
843
0
    SGITITERATE(innerTuple, i, node)
844
0
    {
845
0
      if (IndexTupleHasNulls(node))
846
0
        elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
847
0
      nodeLabels[i] = SGNTDATUM(node, state);
848
0
    }
849
0
    return nodeLabels;
850
0
  }
851
0
}
852
853
/*
854
 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
855
 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
856
 *
857
 * If startOffset isn't NULL, we start searching for placeholders at
858
 * *startOffset, and update that to the next place to search.  This is just
859
 * an optimization for repeated insertions.
860
 *
861
 * If errorOK is false, we throw error when there's not enough room,
862
 * rather than returning InvalidOffsetNumber.
863
 */
864
OffsetNumber
865
SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
866
           OffsetNumber *startOffset, bool errorOK)
867
0
{
868
0
  SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
869
0
  OffsetNumber i,
870
0
        maxoff,
871
0
        offnum;
872
873
0
  if (opaque->nPlaceholder > 0 &&
874
0
    PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
875
0
  {
876
    /* Try to replace a placeholder */
877
0
    maxoff = PageGetMaxOffsetNumber(page);
878
0
    offnum = InvalidOffsetNumber;
879
880
0
    for (;;)
881
0
    {
882
0
      if (startOffset && *startOffset != InvalidOffsetNumber)
883
0
        i = *startOffset;
884
0
      else
885
0
        i = FirstOffsetNumber;
886
0
      for (; i <= maxoff; i++)
887
0
      {
888
0
        SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
889
0
                                   PageGetItemId(page, i));
890
891
0
        if (it->tupstate == SPGIST_PLACEHOLDER)
892
0
        {
893
0
          offnum = i;
894
0
          break;
895
0
        }
896
0
      }
897
898
      /* Done if we found a placeholder */
899
0
      if (offnum != InvalidOffsetNumber)
900
0
        break;
901
902
0
      if (startOffset && *startOffset != InvalidOffsetNumber)
903
0
      {
904
        /* Hint was no good, re-search from beginning */
905
0
        *startOffset = InvalidOffsetNumber;
906
0
        continue;
907
0
      }
908
909
      /* Hmm, no placeholder found? */
910
0
      opaque->nPlaceholder = 0;
911
0
      break;
912
0
    }
913
914
0
    if (offnum != InvalidOffsetNumber)
915
0
    {
916
      /* Replace the placeholder tuple */
917
0
      PageIndexTupleDelete(page, offnum);
918
919
0
      offnum = PageAddItem(page, item, size, offnum, false, false);
920
921
      /*
922
       * We should not have failed given the size check at the top of
923
       * the function, but test anyway.  If we did fail, we must PANIC
924
       * because we've already deleted the placeholder tuple, and
925
       * there's no other way to keep the damage from getting to disk.
926
       */
927
0
      if (offnum != InvalidOffsetNumber)
928
0
      {
929
0
        Assert(opaque->nPlaceholder > 0);
930
0
        opaque->nPlaceholder--;
931
0
        if (startOffset)
932
0
          *startOffset = offnum + 1;
933
0
      }
934
0
      else
935
0
        elog(PANIC, "failed to add item of size %u to SPGiST index page",
936
0
           (int) size);
937
938
0
      return offnum;
939
0
    }
940
0
  }
941
942
  /* No luck in replacing a placeholder, so just add it to the page */
943
0
  offnum = PageAddItem(page, item, size,
944
0
             InvalidOffsetNumber, false, false);
945
946
0
  if (offnum == InvalidOffsetNumber && !errorOK)
947
0
    elog(ERROR, "failed to add item of size %u to SPGiST index page",
948
0
       (int) size);
949
950
0
  return offnum;
951
0
}