YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

File: /Users/deen/code/yugabyte-db/src/postgres/src/bin/pg_waldump/xlogreader.c

Line | Count | Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * xlogreader.c
4
 *    Generic XLog reading facility
5
 *
6
 * Portions Copyright (c) 2013-2018, PostgreSQL Global Development Group
7
 *
8
 * IDENTIFICATION
9
 *    src/backend/access/transam/xlogreader.c
10
 *
11
 * NOTES
12
 *    See xlogreader.h for more notes on this facility.
13
 *
14
 *    This file is compiled as both front-end and backend code, so it
15
 *    may not use ereport, server-defined static variables, etc.
16
 *-------------------------------------------------------------------------
17
 */
18
#include "postgres.h"
19
20
#include "access/transam.h"
21
#include "access/xlogrecord.h"
22
#include "access/xlog_internal.h"
23
#include "access/xlogreader.h"
24
#include "catalog/pg_control.h"
25
#include "common/pg_lzcompress.h"
26
#include "replication/origin.h"
27
28
#ifndef FRONTEND
29
#include "utils/memutils.h"
30
#endif
31
32
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
33
34
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
35
            XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
36
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
37
        XLogRecPtr recptr);
38
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
39
         int reqLen);
40
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);
41
42
static void ResetDecoder(XLogReaderState *state);
43
44
/* size of the buffer allocated for error message. */
45
3.99k
#define MAX_ERRORMSG_LEN 1000
46
47
/*
48
 * Construct a string in state->errormsg_buf explaining what's wrong with
49
 * the current record being read.
50
 */
51
static void
52
report_invalid_record(XLogReaderState *state, const char *fmt,...)
53
5
{
54
5
  va_list   args;
55
56
5
  fmt = _(fmt);
57
58
5
  va_start(args, fmt);
59
5
  vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60
5
  va_end(args);
61
5
}
62
63
/*
64
 * Allocate and initialize a new XLogReader.
65
 *
66
 * Returns NULL if the xlogreader couldn't be allocated.
67
 */
68
XLogReaderState *
69
XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
70
           void *private_data)
71
3.99k
{
72
3.99k
  XLogReaderState *state;
73
74
3.99k
  state = (XLogReaderState *)
75
3.99k
    palloc_extended(sizeof(XLogReaderState),
76
3.99k
            MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
77
3.99k
  if (!state)
78
0
    return NULL;
79
80
3.99k
  state->max_block_id = -1;
81
82
  /*
83
   * Permanently allocate readBuf.  We do it this way, rather than just
84
   * making a static array, for two reasons: (1) no need to waste the
85
   * storage in most instantiations of the backend; (2) a static char array
86
   * isn't guaranteed to have any particular alignment, whereas
87
   * palloc_extended() will provide MAXALIGN'd storage.
88
   */
89
3.99k
  state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
90
3.99k
                        MCXT_ALLOC_NO_OOM);
91
3.99k
  if (!state->readBuf)
92
0
  {
93
0
    pfree(state);
94
0
    return NULL;
95
0
  }
96
97
3.99k
  state->wal_segment_size = wal_segment_size;
98
3.99k
  state->read_page = pagereadfunc;
99
  /* system_identifier initialized to zeroes above */
100
3.99k
  state->private_data = private_data;
101
  /* ReadRecPtr and EndRecPtr initialized to zeroes above */
102
  /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
103
3.99k
  state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
104
3.99k
                      MCXT_ALLOC_NO_OOM);
105
3.99k
  if (!state->errormsg_buf)
106
0
  {
107
0
    pfree(state->readBuf);
108
0
    pfree(state);
109
0
    return NULL;
110
0
  }
111
3.99k
  state->errormsg_buf[0] = '\0';
112
113
  /*
114
   * Allocate an initial readRecordBuf of minimal size, which can later be
115
   * enlarged if necessary.
116
   */
117
3.99k
  if (!allocate_recordbuf(state, 0))
118
0
  {
119
0
    pfree(state->errormsg_buf);
120
0
    pfree(state->readBuf);
121
0
    pfree(state);
122
0
    return NULL;
123
0
  }
124
125
3.99k
  return state;
126
3.99k
}
127
128
void
129
XLogReaderFree(XLogReaderState *state)
130
3.99k
{
131
3.99k
  int     block_id;
132
133
135k
  for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
134
131k
  {
135
131k
    if (state->blocks[block_id].data)
136
0
      pfree(state->blocks[block_id].data);
137
131k
  }
138
3.99k
  if (state->main_data)
139
3.99k
    pfree(state->main_data);
140
141
3.99k
  pfree(state->errormsg_buf);
142
3.99k
  if (state->readRecordBuf)
143
3.99k
    pfree(state->readRecordBuf);
144
3.99k
  pfree(state->readBuf);
145
3.99k
  pfree(state);
146
3.99k
}
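/*
 * A minimal, hypothetical usage sketch (not part of the instrumented
 * xlogreader.c) illustrating the allocate/read/free lifecycle measured by
 * the counts above.  The names "my_read_page" and "walk_wal" are
 * illustrative; the callback signature follows the read_page usage in this
 * file (XLogPageReadCB).
 */
#ifdef XLOGREADER_USAGE_SKETCH
static int
my_read_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
             XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI)
{
  /*
   * Fill readBuf with at least reqLen bytes of WAL starting at
   * targetPagePtr, set *pageTLI, and return the number of bytes read,
   * or -1 on failure.
   */
  return -1;      /* placeholder */
}

static void
walk_wal(XLogRecPtr startPtr, int wal_segment_size)
{
  XLogReaderState *xlogreader;
  XLogRecord *record;
  char     *errormsg = NULL;

  xlogreader = XLogReaderAllocate(wal_segment_size, my_read_page, NULL);
  if (xlogreader == NULL)
    return;       /* out of memory */

  /* the first call passes an explicit start LSN, later calls pass Invalid */
  record = XLogReadRecord(xlogreader, startPtr, &errormsg);
  while (record != NULL)
  {
    /* inspect the decoded record, e.g. via XLogRecGetData() */
    record = XLogReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg);
  }
  if (errormsg != NULL)
    fprintf(stderr, "%s\n", errormsg);  /* points into errormsg_buf */

  XLogReaderFree(xlogreader);
}
#endif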
147
148
/*
149
 * Allocate readRecordBuf to fit a record of at least the given length.
150
 * Returns true if successful, false if out of memory.
151
 *
152
 * readRecordBufSize is set to the new buffer size.
153
 *
154
 * To avoid useless small increases, round its size to a multiple of
155
 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
156
 * with.  (That is enough for all "normal" records, but very large commit or
157
 * abort records might need more space.)
158
 */
159
static bool
160
allocate_recordbuf(XLogReaderState *state, uint32 reclength)
161
{
162
  uint32    newSize = reclength;
163
164
  newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
165
  newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
166
167
#ifndef FRONTEND
168
169
  /*
170
   * Note that in unlucky circumstances, the random data read from a
171
   * recycled segment can cause this routine to be called with a size
172
   * causing a hard failure at allocation.  For a standby, this would cause
173
   * the instance to stop suddenly with a hard failure, preventing it from
174
   * retrying WAL fetch from one of its sources, which could allow it to move
175
   * on with replay without a manual restart. If the data comes from a past
176
   * recycled segment and is still valid, then the allocation may succeed
177
   * but record checks are going to fail so this would be short-lived.  If
178
   * the allocation fails because of a memory shortage, then this is not a
179
   * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
180
   */
181
  if (!AllocSizeIsValid(newSize))
182
    return false;
183
184
#endif
185
186
  if (state->readRecordBuf)
187
    pfree(state->readRecordBuf);
188
  state->readRecordBuf =
189
    (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
190
  if (state->readRecordBuf == NULL)
191
  {
192
    state->readRecordBufSize = 0;
193
    return false;
194
  }
195
  state->readRecordBufSize = newSize;
196
  return true;
197
}
198
199
/*
200
 * Attempt to read an XLOG record.
201
 *
202
 * If RecPtr is valid, try to read a record at that position.  Otherwise
203
 * try to read a record just after the last one previously read.
204
 *
205
 * If the read_page callback fails to read the requested data, NULL is
206
 * returned.  The callback is expected to have reported the error; errormsg
207
 * is set to NULL.
208
 *
209
 * If the reading fails for some other reason, NULL is also returned, and
210
 * *errormsg is set to a string with details of the failure.
211
 *
212
 * The returned pointer (or *errormsg) points to an internal buffer that's
213
 * valid until the next call to XLogReadRecord.
214
 */
215
XLogRecord *
216
XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
217
{
218
  XLogRecord *record;
219
  XLogRecPtr  targetPagePtr;
220
  bool    randAccess;
221
  uint32    len,
222
        total_len;
223
  uint32    targetRecOff;
224
  uint32    pageHeaderSize;
225
  bool    gotheader;
226
  int     readOff;
227
228
  /*
229
   * randAccess indicates whether to verify the previous-record pointer of
230
   * the record we're reading.  We only do this if we're reading
231
   * sequentially, which is what we initially assume.
232
   */
233
  randAccess = false;
234
235
  /* reset error state */
236
  *errormsg = NULL;
237
  state->errormsg_buf[0] = '\0';
238
239
  ResetDecoder(state);
240
241
  if (RecPtr == InvalidXLogRecPtr)
242
  {
243
    /* No explicit start point; read the record after the one we just read */
244
    RecPtr = state->EndRecPtr;
245
246
    if (state->ReadRecPtr == InvalidXLogRecPtr)
247
      randAccess = true;
248
249
    /*
250
     * RecPtr is pointing to end+1 of the previous WAL record.  If we're
251
     * at a page boundary, no more records can fit on the current page. We
252
     * must skip over the page header, but we can't do that until we've
253
     * read in the page, since the header size is variable.
254
     */
255
  }
256
  else
257
  {
258
    /*
259
     * Caller supplied a position to start at.
260
     *
261
     * In this case, the passed-in record pointer should already be
262
     * pointing to a valid record starting position.
263
     */
264
    Assert(XRecOffIsValid(RecPtr));
265
    randAccess = true;
266
  }
267
268
  state->currRecPtr = RecPtr;
269
270
  targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
271
  targetRecOff = RecPtr % XLOG_BLCKSZ;
272
273
  /*
274
   * Read the page containing the record into state->readBuf. Request enough
275
   * bytes to cover the whole record header, or at least the part of it that
276
   * fits on the same page.
277
   */
278
  readOff = ReadPageInternal(state,
279
                 targetPagePtr,
280
                 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
281
  if (readOff < 0)
282
    goto err;
283
284
  /*
285
   * ReadPageInternal always returns at least the page header, so we can
286
   * examine it now.
287
   */
288
  pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
289
  if (targetRecOff == 0)
290
  {
291
    /*
292
     * At page start, so skip over page header.
293
     */
294
    RecPtr += pageHeaderSize;
295
    targetRecOff = pageHeaderSize;
296
  }
297
  else if (targetRecOff < pageHeaderSize)
298
  {
299
    report_invalid_record(state, "invalid record offset at %X/%X",
300
                (uint32) (RecPtr >> 32), (uint32) RecPtr);
301
    goto err;
302
  }
303
304
  if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
305
    targetRecOff == pageHeaderSize)
306
  {
307
    report_invalid_record(state, "contrecord is requested by %X/%X",
308
                (uint32) (RecPtr >> 32), (uint32) RecPtr);
309
    goto err;
310
  }
311
312
  /* ReadPageInternal has verified the page header */
313
  Assert(pageHeaderSize <= readOff);
314
315
  /*
316
   * Read the record length.
317
   *
318
   * NB: Even though we use an XLogRecord pointer here, the whole record
319
   * header might not fit on this page. xl_tot_len is the first field of the
320
   * struct, so it must be on this page (the records are MAXALIGNed), but we
321
   * cannot access any other fields until we've verified that we got the
322
   * whole header.
323
   */
324
  record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
325
  total_len = record->xl_tot_len;
326
327
  /*
328
   * If the whole record header is on this page, validate it immediately.
329
   * Otherwise do just a basic sanity check on xl_tot_len, and validate the
330
   * rest of the header after reading it from the next page.  The xl_tot_len
331
   * check is necessary here to ensure that we enter the "Need to reassemble
332
   * record" code path below; otherwise we might fail to apply
333
   * ValidXLogRecordHeader at all.
334
   */
335
  if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
336
  {
337
    if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
338
                   randAccess))
339
      goto err;
340
    gotheader = true;
341
  }
342
  else
343
  {
344
    /* XXX: more validation should be done here */
345
    if (total_len < SizeOfXLogRecord)
346
    {
347
      report_invalid_record(state,
348
                  "invalid record length at %X/%X: wanted %u, got %u",
349
                  (uint32) (RecPtr >> 32), (uint32) RecPtr,
350
                  (uint32) SizeOfXLogRecord, total_len);
351
      goto err;
352
    }
353
    gotheader = false;
354
  }
355
356
  /*
357
   * Enlarge readRecordBuf as needed.
358
   */
359
  if (total_len > state->readRecordBufSize &&
360
    !allocate_recordbuf(state, total_len))
361
  {
362
    /* We treat this as a "bogus data" condition */
363
    report_invalid_record(state, "record length %u at %X/%X too long",
364
                total_len,
365
                (uint32) (RecPtr >> 32), (uint32) RecPtr);
366
    goto err;
367
  }
368
369
  len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
370
  if (total_len > len)
371
  {
372
    /* Need to reassemble record */
373
    char     *contdata;
374
    XLogPageHeader pageHeader;
375
    char     *buffer;
376
    uint32    gotlen;
377
378
    /* Copy the first fragment of the record from the first page. */
379
    memcpy(state->readRecordBuf,
380
         state->readBuf + RecPtr % XLOG_BLCKSZ, len);
381
    buffer = state->readRecordBuf + len;
382
    gotlen = len;
383
384
    do
385
    {
386
      /* Calculate pointer to beginning of next page */
387
      targetPagePtr += XLOG_BLCKSZ;
388
389
      /* Wait for the next page to become available */
390
      readOff = ReadPageInternal(state, targetPagePtr,
391
                     Min(total_len - gotlen + SizeOfXLogShortPHD,
392
                       XLOG_BLCKSZ));
393
394
      if (readOff < 0)
395
        goto err;
396
397
      Assert(SizeOfXLogShortPHD <= readOff);
398
399
      /* Check that the continuation on next page looks valid */
400
      pageHeader = (XLogPageHeader) state->readBuf;
401
      if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
402
      {
403
        report_invalid_record(state,
404
                    "there is no contrecord flag at %X/%X",
405
                    (uint32) (RecPtr >> 32), (uint32) RecPtr);
406
        goto err;
407
      }
408
409
      /*
410
       * Cross-check that xlp_rem_len agrees with how much of the record
411
       * we expect there to be left.
412
       */
413
      if (pageHeader->xlp_rem_len == 0 ||
414
        total_len != (pageHeader->xlp_rem_len + gotlen))
415
      {
416
        report_invalid_record(state,
417
                    "invalid contrecord length %u at %X/%X",
418
                    pageHeader->xlp_rem_len,
419
                    (uint32) (RecPtr >> 32), (uint32) RecPtr);
420
        goto err;
421
      }
422
423
      /* Append the continuation from this page to the buffer */
424
      pageHeaderSize = XLogPageHeaderSize(pageHeader);
425
426
      if (readOff < pageHeaderSize)
427
        readOff = ReadPageInternal(state, targetPagePtr,
428
                       pageHeaderSize);
429
430
      Assert(pageHeaderSize <= readOff);
431
432
      contdata = (char *) state->readBuf + pageHeaderSize;
433
      len = XLOG_BLCKSZ - pageHeaderSize;
434
      if (pageHeader->xlp_rem_len < len)
435
        len = pageHeader->xlp_rem_len;
436
437
      if (readOff < pageHeaderSize + len)
438
        readOff = ReadPageInternal(state, targetPagePtr,
439
                       pageHeaderSize + len);
440
441
      memcpy(buffer, (char *) contdata, len);
442
      buffer += len;
443
      gotlen += len;
444
445
      /* If we just reassembled the record header, validate it. */
446
      if (!gotheader)
447
      {
448
        record = (XLogRecord *) state->readRecordBuf;
449
        if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
450
                       record, randAccess))
451
          goto err;
452
        gotheader = true;
453
      }
454
    } while (gotlen < total_len);
455
456
    Assert(gotheader);
457
458
    record = (XLogRecord *) state->readRecordBuf;
459
    if (!ValidXLogRecord(state, record, RecPtr))
460
      goto err;
461
462
    pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
463
    state->ReadRecPtr = RecPtr;
464
    state->EndRecPtr = targetPagePtr + pageHeaderSize
465
      + MAXALIGN(pageHeader->xlp_rem_len);
466
  }
467
  else
468
  {
469
    /* Wait for the record data to become available */
470
    readOff = ReadPageInternal(state, targetPagePtr,
471
                   Min(targetRecOff + total_len, XLOG_BLCKSZ));
472
    if (readOff < 0)
473
      goto err;
474
475
    /* Record does not cross a page boundary */
476
    if (!ValidXLogRecord(state, record, RecPtr))
477
      goto err;
478
479
    state->EndRecPtr = RecPtr + MAXALIGN(total_len);
480
481
    state->ReadRecPtr = RecPtr;
482
    memcpy(state->readRecordBuf, record, total_len);
483
  }
484
485
  /*
486
   * Special processing if it's an XLOG SWITCH record
487
   */
488
  if (record->xl_rmid == RM_XLOG_ID &&
489
    (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
490
  {
491
    /* Pretend it extends to end of segment */
492
    state->EndRecPtr += state->wal_segment_size - 1;
493
    state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
494
  }
495
496
  if (DecodeXLogRecord(state, record, errormsg))
497
    return record;
498
  else
499
    return NULL;
500
501
err:
502
503
  /*
504
   * Invalidate the read state. We might read from a different source after
505
   * failure.
506
   */
507
  XLogReaderInvalReadState(state);
508
509
  if (state->errormsg_buf[0] != '\0')
510
    *errormsg = state->errormsg_buf;
511
512
  return NULL;
513
}
514
515
/*
516
 * Read a single xlog page including at least [pageptr, reqLen] of valid data
517
 * via the read_page() callback.
518
 *
519
 * Returns -1 if the required page cannot be read for some reason; errormsg_buf
520
 * is set in that case (unless the error occurs in the read_page callback).
521
 *
522
 * We fetch the page from a reader-local cache if we know we have the required
523
 * data and if there hasn't been any error since caching the data.
524
 */
525
static int
526
ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
527
{
528
  int     readLen;
529
  uint32    targetPageOff;
530
  XLogSegNo targetSegNo;
531
  XLogPageHeader hdr;
532
533
  Assert((pageptr % XLOG_BLCKSZ) == 0);
534
535
  XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
536
  targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
537
538
  /* check whether we have all the requested data already */
539
  if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
540
    reqLen < state->readLen)
541
    return state->readLen;
542
543
  /*
544
   * Data is not in our buffer.
545
   *
546
   * Every time we actually read the page, even if we looked at parts of it
547
   * before, we need to do verification as the read_page callback might now
548
   * be rereading data from a different source.
549
   *
550
   * Whenever switching to a new WAL segment, we read the first page of the
551
   * file and validate its header, even if that's not where the target
552
   * record is.  This is so that we can check the additional identification
553
   * info that is present in the first page's "long" header.
554
   */
555
  if (targetSegNo != state->readSegNo && targetPageOff != 0)
556
  {
557
    XLogRecPtr  targetSegmentPtr = pageptr - targetPageOff;
558
559
    readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
560
                   state->currRecPtr,
561
                   state->readBuf, &state->readPageTLI);
562
    if (readLen < 0)
563
      goto err;
564
565
    /* we can be sure to have enough WAL available, we scrolled back */
566
    Assert(readLen == XLOG_BLCKSZ);
567
568
    if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
569
                      state->readBuf))
570
      goto err;
571
  }
572
573
  /*
574
   * First, read the requested data length, but at least a short page header
575
   * so that we can validate it.
576
   */
577
  readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
578
                 state->currRecPtr,
579
                 state->readBuf, &state->readPageTLI);
580
  if (readLen < 0)
581
    goto err;
582
583
  Assert(readLen <= XLOG_BLCKSZ);
584
585
  /* Do we have enough data to check the header length? */
586
  if (readLen <= SizeOfXLogShortPHD)
587
    goto err;
588
589
  Assert(readLen >= reqLen);
590
591
  hdr = (XLogPageHeader) state->readBuf;
592
593
  /* still not enough */
594
  if (readLen < XLogPageHeaderSize(hdr))
595
  {
596
    readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
597
                   state->currRecPtr,
598
                   state->readBuf, &state->readPageTLI);
599
    if (readLen < 0)
600
      goto err;
601
  }
602
603
  /*
604
   * Now that we know we have the full header, validate it.
605
   */
606
  if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
607
    goto err;
608
609
  /* update read state information */
610
  state->readSegNo = targetSegNo;
611
  state->readOff = targetPageOff;
612
  state->readLen = readLen;
613
614
  return readLen;
615
616
err:
617
  XLogReaderInvalReadState(state);
618
  return -1;
619
}
620
621
/*
622
 * Invalidate the xlogreader's read state to force a re-read.
623
 */
624
void
625
XLogReaderInvalReadState(XLogReaderState *state)
626
5
{
627
5
  state->readSegNo = 0;
628
5
  state->readOff = 0;
629
5
  state->readLen = 0;
630
5
}
631
632
/*
633
 * Validate an XLOG record header.
634
 *
635
 * This is just a convenience subroutine to avoid duplicated code in
636
 * XLogReadRecord.  It's not intended for use from anywhere else.
637
 */
638
static bool
639
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
640
            XLogRecPtr PrevRecPtr, XLogRecord *record,
641
            bool randAccess)
642
8.00k
{
643
8.00k
  if (record->xl_tot_len < SizeOfXLogRecord)
644
5
  {
645
5
    report_invalid_record(state,
646
5
                "invalid record length at %X/%X: wanted %u, got %u",
647
5
                (uint32) (RecPtr >> 32), (uint32) RecPtr,
648
5
                (uint32) SizeOfXLogRecord, record->xl_tot_len);
649
5
    return false;
650
5
  }
651
8.00k
  if (record->xl_rmid > RM_MAX_ID)
652
0
  {
653
0
    report_invalid_record(state,
654
0
                "invalid resource manager ID %u at %X/%X",
655
0
                record->xl_rmid, (uint32) (RecPtr >> 32),
656
0
                (uint32) RecPtr);
657
0
    return false;
658
0
  }
659
8.00k
  if (randAccess)
660
7.99k
  {
661
    /*
662
     * We can't exactly verify the prev-link, but surely it should be less
663
     * than the record's own address.
664
     */
665
7.99k
    if (!(record->xl_prev < RecPtr))
666
0
    {
667
0
      report_invalid_record(state,
668
0
                  "record with incorrect prev-link %X/%X at %X/%X",
669
0
                  (uint32) (record->xl_prev >> 32),
670
0
                  (uint32) record->xl_prev,
671
0
                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
672
0
      return false;
673
0
    }
674
7.99k
  }
675
1
  else
676
1
  {
677
    /*
678
     * Record's prev-link should exactly match our previous location. This
679
     * check guards against torn WAL pages where a stale but valid-looking
680
     * WAL record starts on a sector boundary.
681
     */
682
1
    if (record->xl_prev != PrevRecPtr)
683
0
    {
684
0
      report_invalid_record(state,
685
0
                  "record with incorrect prev-link %X/%X at %X/%X",
686
0
                  (uint32) (record->xl_prev >> 32),
687
0
                  (uint32) record->xl_prev,
688
0
                  (uint32) (RecPtr >> 32), (uint32) RecPtr);
689
0
      return false;
690
0
    }
691
1
  }
692
693
8.00k
  return true;
694
8.00k
}
695
696
697
/*
698
 * CRC-check an XLOG record.  We do not believe the contents of an XLOG
699
 * record (other than to the minimal extent of computing the amount of
700
 * data to read in) until we've checked the CRCs.
701
 *
702
 * We assume all of the record (that is, xl_tot_len bytes) has been read
703
 * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
704
 * record's header, which means in particular that xl_tot_len is at least
705
 * SizeOfXlogRecord.
706
 */
707
static bool
708
ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
709
8.00k
{
710
8.00k
  pg_crc32c crc;
711
712
  /* Calculate the CRC */
713
8.00k
  INIT_CRC32C(crc);
714
8.00k
  COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
715
  /* include the record header last */
716
8.00k
  COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
717
8.00k
  FIN_CRC32C(crc);
718
719
8.00k
  if (!EQ_CRC32C(record->xl_crc, crc))
720
0
  {
721
0
    report_invalid_record(state,
722
0
                "incorrect resource manager data checksum in record at %X/%X",
723
0
                (uint32) (recptr >> 32), (uint32) recptr);
724
0
    return false;
725
0
  }
726
727
8.00k
  return true;
728
8.00k
}
729
730
/*
731
 * Validate a page header.
732
 *
733
 * Check if 'phdr' is valid as the header of the XLog page at position
734
 * 'recptr'.
735
 */
736
bool
737
XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
738
               char *phdr)
739
{
740
  XLogRecPtr  recaddr;
741
  XLogSegNo segno;
742
  int32   offset;
743
  XLogPageHeader hdr = (XLogPageHeader) phdr;
744
745
  Assert((recptr % XLOG_BLCKSZ) == 0);
746
747
  XLByteToSeg(recptr, segno, state->wal_segment_size);
748
  offset = XLogSegmentOffset(recptr, state->wal_segment_size);
749
750
  XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
751
752
  if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
753
  {
754
    char    fname[MAXFNAMELEN];
755
756
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
757
758
    report_invalid_record(state,
759
                "invalid magic number %04X in log segment %s, offset %u",
760
                hdr->xlp_magic,
761
                fname,
762
                offset);
763
    return false;
764
  }
765
766
  if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
767
  {
768
    char    fname[MAXFNAMELEN];
769
770
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
771
772
    report_invalid_record(state,
773
                "invalid info bits %04X in log segment %s, offset %u",
774
                hdr->xlp_info,
775
                fname,
776
                offset);
777
    return false;
778
  }
779
780
  if (hdr->xlp_info & XLP_LONG_HEADER)
781
  {
782
    XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
783
784
    if (state->system_identifier &&
785
      longhdr->xlp_sysid != state->system_identifier)
786
    {
787
      char    fhdrident_str[32];
788
      char    sysident_str[32];
789
790
      /*
791
       * Format sysids separately to keep platform-dependent format code
792
       * out of the translatable message string.
793
       */
794
      snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
795
           longhdr->xlp_sysid);
796
      snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
797
           state->system_identifier);
798
      report_invalid_record(state,
799
                  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
800
                  fhdrident_str, sysident_str);
801
      return false;
802
    }
803
    else if (longhdr->xlp_seg_size != state->wal_segment_size)
804
    {
805
      report_invalid_record(state,
806
                  "WAL file is from different database system: incorrect segment size in page header");
807
      return false;
808
    }
809
    else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
810
    {
811
      report_invalid_record(state,
812
                  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
813
      return false;
814
    }
815
  }
816
  else if (offset == 0)
817
  {
818
    char    fname[MAXFNAMELEN];
819
820
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
821
822
    /* hmm, first page of file doesn't have a long header? */
823
    report_invalid_record(state,
824
                "invalid info bits %04X in log segment %s, offset %u",
825
                hdr->xlp_info,
826
                fname,
827
                offset);
828
    return false;
829
  }
830
831
  /*
832
   * Check that the address on the page agrees with what we expected. This
833
   * check typically fails when an old WAL segment is recycled, and hasn't
834
   * yet been overwritten with new data.
835
   */
836
  if (hdr->xlp_pageaddr != recaddr)
837
  {
838
    char    fname[MAXFNAMELEN];
839
840
    XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
841
842
    report_invalid_record(state,
843
                "unexpected pageaddr %X/%X in log segment %s, offset %u",
844
                (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
845
                fname,
846
                offset);
847
    return false;
848
  }
849
850
  /*
851
   * Since child timelines are always assigned a TLI greater than their
852
   * immediate parent's TLI, we should never see TLI go backwards across
853
   * successive pages of a consistent WAL sequence.
854
   *
855
   * Sometimes we re-read a segment that's already been (partially) read. So
856
   * we only verify TLIs for pages that are later than the last remembered
857
   * LSN.
858
   */
859
  if (recptr > state->latestPagePtr)
860
  {
861
    if (hdr->xlp_tli < state->latestPageTLI)
862
    {
863
      char    fname[MAXFNAMELEN];
864
865
      XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
866
867
      report_invalid_record(state,
868
                  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
869
                  hdr->xlp_tli,
870
                  state->latestPageTLI,
871
                  fname,
872
                  offset);
873
      return false;
874
    }
875
  }
876
  state->latestPagePtr = recptr;
877
  state->latestPageTLI = hdr->xlp_tli;
878
879
  return true;
880
}
881
882
#ifdef FRONTEND
883
/*
884
 * Functions that are currently not needed in the backend, but are better
885
 * implemented inside xlogreader.c because of the internal facilities available
886
 * here.
887
 */
888
889
/*
890
 * Find the first record with an lsn >= RecPtr.
891
 *
892
 * Useful for checking whether RecPtr is a valid xlog address for reading, and
893
 * to find the first valid address after some address when dumping records for
894
 * debugging purposes.
895
 */
896
XLogRecPtr
897
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
898
0
{
899
0
  XLogReaderState saved_state = *state;
900
0
  XLogRecPtr  tmpRecPtr;
901
0
  XLogRecPtr  found = InvalidXLogRecPtr;
902
0
  XLogPageHeader header;
903
0
  char     *errormsg;
904
905
0
  Assert(!XLogRecPtrIsInvalid(RecPtr));
906
907
  /*
908
   * skip over potential continuation data, keeping in mind that it may span
909
   * multiple pages
910
   */
911
0
  tmpRecPtr = RecPtr;
912
0
  while (true)
913
0
  {
914
0
    XLogRecPtr  targetPagePtr;
915
0
    int     targetRecOff;
916
0
    uint32    pageHeaderSize;
917
0
    int     readLen;
918
919
    /*
920
     * Compute targetRecOff. It should typically be equal to or greater than
921
     * short page-header since a valid record can't start anywhere before
922
     * that, except when caller has explicitly specified the offset that
923
     * falls somewhere there or when we are skipping multi-page
924
     * continuation record. It doesn't matter though because
925
     * ReadPageInternal() is prepared to handle that and will read at
926
     * least short page-header worth of data
927
     */
928
0
    targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
929
930
    /* scroll back to page boundary */
931
0
    targetPagePtr = tmpRecPtr - targetRecOff;
932
933
    /* Read the page containing the record */
934
0
    readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
935
0
    if (readLen < 0)
936
0
      goto err;
937
938
0
    header = (XLogPageHeader) state->readBuf;
939
940
0
    pageHeaderSize = XLogPageHeaderSize(header);
941
942
    /* make sure we have enough data for the page header */
943
0
    readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
944
0
    if (readLen < 0)
945
0
      goto err;
946
947
    /* skip over potential continuation data */
948
0
    if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
949
0
    {
950
      /*
951
       * If the length of the remaining continuation data is more than
952
       * what can fit in this page, the continuation record crosses over
953
       * this page. Read the next page and try again. xlp_rem_len in the
954
       * next page header will contain the remaining length of the
955
       * continuation data
956
       *
957
       * Note that record headers are MAXALIGN'ed
958
       */
959
0
      if (MAXALIGN(header->xlp_rem_len) > (XLOG_BLCKSZ - pageHeaderSize))
960
0
        tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
961
0
      else
962
0
      {
963
        /*
964
         * The previous continuation record ends in this page. Set
965
         * tmpRecPtr to point to the first valid record
966
         */
967
0
        tmpRecPtr = targetPagePtr + pageHeaderSize
968
0
          + MAXALIGN(header->xlp_rem_len);
969
0
        break;
970
0
      }
971
0
    }
972
0
    else
973
0
    {
974
0
      tmpRecPtr = targetPagePtr + pageHeaderSize;
975
0
      break;
976
0
    }
977
0
  }
978
979
  /*
980
   * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
981
   * because either we're at the first record after the beginning of a page
982
   * or we just jumped over the remaining data of a continuation.
983
   */
984
0
  while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
985
0
  {
986
    /* continue after the record */
987
0
    tmpRecPtr = InvalidXLogRecPtr;
988
989
    /* past the record we've found, break out */
990
0
    if (RecPtr <= state->ReadRecPtr)
991
0
    {
992
0
      found = state->ReadRecPtr;
993
0
      goto out;
994
0
    }
995
0
  }
996
997
0
err:
998
0
out:
999
  /* Reset state to what we had before finding the record */
1000
0
  state->ReadRecPtr = saved_state.ReadRecPtr;
1001
0
  state->EndRecPtr = saved_state.EndRecPtr;
1002
0
  XLogReaderInvalReadState(state);
1003
1004
0
  return found;
1005
0
}
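/*
 * A hypothetical frontend-only sketch (not part of the instrumented
 * xlogreader.c) of how a tool such as pg_waldump can use
 * XLogFindNextRecord() to turn an arbitrary LSN into the first valid
 * record pointer at or after it before reading records.
 */
#ifdef XLOGREADER_USAGE_SKETCH
static XLogRecPtr
align_start_lsn(XLogReaderState *xlogreader, XLogRecPtr requested)
{
  XLogRecPtr  first;

  first = XLogFindNextRecord(xlogreader, requested);
  if (XLogRecPtrIsInvalid(first))
    return InvalidXLogRecPtr; /* no valid record at or after the LSN */

  /* "first" can now be passed to XLogReadRecord() as a start position */
  return first;
}
#endif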
1006
1007
#endif              /* FRONTEND */
1008
1009
1010
/* ----------------------------------------
1011
 * Functions for decoding the data and block references in a record.
1012
 * ----------------------------------------
1013
 */
1014
1015
/* private function to reset the state between records */
1016
static void
1017
ResetDecoder(XLogReaderState *state)
1018
16.0k
{
1019
16.0k
  int     block_id;
1020
1021
16.0k
  state->decoded_record = NULL;
1022
1023
16.0k
  state->main_data_len = 0;
1024
1025
16.0k
  for (block_id = 0; block_id <= state->max_block_id; block_id++)
1026
0
  {
1027
0
    state->blocks[block_id].in_use = false;
1028
0
    state->blocks[block_id].has_image = false;
1029
0
    state->blocks[block_id].has_data = false;
1030
0
    state->blocks[block_id].apply_image = false;
1031
0
  }
1032
16.0k
  state->max_block_id = -1;
1033
16.0k
}
1034
1035
/*
1036
 * Decode the previously read record.
1037
 *
1038
 * On error, a human-readable error message is returned in *errormsg, and
1039
 * the return value is false.
1040
 */
1041
bool
1042
DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1043
{
1044
  /*
1045
   * read next _size bytes from record buffer, but check for overrun first.
1046
   */
1047
#define COPY_HEADER_FIELD(_dst, _size)      \
1048
  do {                    \
1049
    if (remaining < _size)          \
1050
      goto shortdata_err;         \
1051
    memcpy(_dst, ptr, _size);       \
1052
    ptr += _size;             \
1053
    remaining -= _size;           \
1054
  } while(0)
1055
1056
  char     *ptr;
1057
  uint32    remaining;
1058
  uint32    datatotal;
1059
  RelFileNode *rnode = NULL;
1060
  uint8   block_id;
1061
1062
  ResetDecoder(state);
1063
1064
  state->decoded_record = record;
1065
  state->record_origin = InvalidRepOriginId;
1066
1067
  ptr = (char *) record;
1068
  ptr += SizeOfXLogRecord;
1069
  remaining = record->xl_tot_len - SizeOfXLogRecord;
1070
1071
  /* Decode the headers */
1072
  datatotal = 0;
1073
  while (remaining > datatotal)
1074
  {
1075
    COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1076
1077
    if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1078
    {
1079
      /* XLogRecordDataHeaderShort */
1080
      uint8   main_data_len;
1081
1082
      COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1083
1084
      state->main_data_len = main_data_len;
1085
      datatotal += main_data_len;
1086
      break;        /* by convention, the main data fragment is
1087
                 * always last */
1088
    }
1089
    else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1090
    {
1091
      /* XLogRecordDataHeaderLong */
1092
      uint32    main_data_len;
1093
1094
      COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1095
      state->main_data_len = main_data_len;
1096
      datatotal += main_data_len;
1097
      break;        /* by convention, the main data fragment is
1098
                 * always last */
1099
    }
1100
    else if (block_id == XLR_BLOCK_ID_ORIGIN)
1101
    {
1102
      COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1103
    }
1104
    else if (block_id <= XLR_MAX_BLOCK_ID)
1105
    {
1106
      /* XLogRecordBlockHeader */
1107
      DecodedBkpBlock *blk;
1108
      uint8   fork_flags;
1109
1110
      if (block_id <= state->max_block_id)
1111
      {
1112
        report_invalid_record(state,
1113
                    "out-of-order block_id %u at %X/%X",
1114
                    block_id,
1115
                    (uint32) (state->ReadRecPtr >> 32),
1116
                    (uint32) state->ReadRecPtr);
1117
        goto err;
1118
      }
1119
      state->max_block_id = block_id;
1120
1121
      blk = &state->blocks[block_id];
1122
      blk->in_use = true;
1123
      blk->apply_image = false;
1124
1125
      COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1126
      blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1127
      blk->flags = fork_flags;
1128
      blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1129
      blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1130
1131
      COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1132
      /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1133
      if (blk->has_data && blk->data_len == 0)
1134
      {
1135
        report_invalid_record(state,
1136
                    "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1137
                    (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1138
        goto err;
1139
      }
1140
      if (!blk->has_data && blk->data_len != 0)
1141
      {
1142
        report_invalid_record(state,
1143
                    "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1144
                    (unsigned int) blk->data_len,
1145
                    (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1146
        goto err;
1147
      }
1148
      datatotal += blk->data_len;
1149
1150
      if (blk->has_image)
1151
      {
1152
        COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1153
        COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1154
        COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1155
1156
        blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1157
1158
        if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1159
        {
1160
          if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1161
            COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1162
          else
1163
            blk->hole_length = 0;
1164
        }
1165
        else
1166
          blk->hole_length = BLCKSZ - blk->bimg_len;
1167
        datatotal += blk->bimg_len;
1168
1169
        /*
1170
         * cross-check that hole_offset > 0, hole_length > 0 and
1171
         * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1172
         */
1173
        if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1174
          (blk->hole_offset == 0 ||
1175
           blk->hole_length == 0 ||
1176
           blk->bimg_len == BLCKSZ))
1177
        {
1178
          report_invalid_record(state,
1179
                      "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1180
                      (unsigned int) blk->hole_offset,
1181
                      (unsigned int) blk->hole_length,
1182
                      (unsigned int) blk->bimg_len,
1183
                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1184
          goto err;
1185
        }
1186
1187
        /*
1188
         * cross-check that hole_offset == 0 and hole_length == 0 if
1189
         * the HAS_HOLE flag is not set.
1190
         */
1191
        if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1192
          (blk->hole_offset != 0 || blk->hole_length != 0))
1193
        {
1194
          report_invalid_record(state,
1195
                      "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1196
                      (unsigned int) blk->hole_offset,
1197
                      (unsigned int) blk->hole_length,
1198
                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1199
          goto err;
1200
        }
1201
1202
        /*
1203
         * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1204
         * flag is set.
1205
         */
1206
        if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1207
          blk->bimg_len == BLCKSZ)
1208
        {
1209
          report_invalid_record(state,
1210
                      "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1211
                      (unsigned int) blk->bimg_len,
1212
                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1213
          goto err;
1214
        }
1215
1216
        /*
1217
         * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1218
         * IS_COMPRESSED flag is set.
1219
         */
1220
        if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1221
          !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1222
          blk->bimg_len != BLCKSZ)
1223
        {
1224
          report_invalid_record(state,
1225
                      "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1226
                      (unsigned int) blk->data_len,
1227
                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1228
          goto err;
1229
        }
1230
      }
1231
      if (!(fork_flags & BKPBLOCK_SAME_REL))
1232
      {
1233
        COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1234
        rnode = &blk->rnode;
1235
      }
1236
      else
1237
      {
1238
        if (rnode == NULL)
1239
        {
1240
          report_invalid_record(state,
1241
                      "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1242
                      (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1243
          goto err;
1244
        }
1245
1246
        blk->rnode = *rnode;
1247
      }
1248
      COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1249
    }
1250
    else
1251
    {
1252
      report_invalid_record(state,
1253
                  "invalid block_id %u at %X/%X",
1254
                  block_id,
1255
                  (uint32) (state->ReadRecPtr >> 32),
1256
                  (uint32) state->ReadRecPtr);
1257
      goto err;
1258
    }
1259
  }
1260
1261
  if (remaining != datatotal)
1262
    goto shortdata_err;
1263
1264
  /*
1265
   * Ok, we've parsed the fragment headers, and verified that the total
1266
   * length of the payload in the fragments is equal to the amount of data
1267
   * left. Copy the data of each fragment to a separate buffer.
1268
   *
1269
   * We could just set up pointers into readRecordBuf, but we want to align
1270
   * the data for the convenience of the callers. Backup images are not
1271
   * copied, however; they don't need alignment.
1272
   */
1273
1274
  /* block data first */
1275
  for (block_id = 0; block_id <= state->max_block_id; block_id++)
1276
  {
1277
    DecodedBkpBlock *blk = &state->blocks[block_id];
1278
1279
    if (!blk->in_use)
1280
      continue;
1281
1282
    Assert(blk->has_image || !blk->apply_image);
1283
1284
    if (blk->has_image)
1285
    {
1286
      blk->bkp_image = ptr;
1287
      ptr += blk->bimg_len;
1288
    }
1289
    if (blk->has_data)
1290
    {
1291
      if (!blk->data || blk->data_len > blk->data_bufsz)
1292
      {
1293
        if (blk->data)
1294
          pfree(blk->data);
1295
1296
        /*
1297
         * Force the initial request to be BLCKSZ so that we don't
1298
         * waste time with lots of trips through this stanza as a
1299
         * result of WAL compression.
1300
         */
1301
        blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1302
        blk->data = palloc(blk->data_bufsz);
1303
      }
1304
      memcpy(blk->data, ptr, blk->data_len);
1305
      ptr += blk->data_len;
1306
    }
1307
  }
1308
1309
  /* and finally, the main data */
1310
  if (state->main_data_len > 0)
1311
  {
1312
    if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1313
    {
1314
      if (state->main_data)
1315
        pfree(state->main_data);
1316
1317
      /*
1318
       * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1319
       * types, we omit trailing struct padding on-disk to save a few
1320
       * bytes; but compilers may generate accesses to the xlog struct
1321
       * that assume that padding bytes are present.  If the palloc
1322
       * request is not large enough to include such padding bytes then
1323
       * we'll get valgrind complaints due to otherwise-harmless fetches
1324
       * of the padding bytes.
1325
       *
1326
       * In addition, force the initial request to be reasonably large
1327
       * so that we don't waste time with lots of trips through this
1328
       * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1329
       */
1330
      state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1331
                          BLCKSZ / 2));
1332
      state->main_data = palloc(state->main_data_bufsz);
1333
    }
1334
    memcpy(state->main_data, ptr, state->main_data_len);
1335
    ptr += state->main_data_len;
1336
  }
1337
1338
  return true;
1339
1340
shortdata_err:
1341
  report_invalid_record(state,
1342
              "record with invalid length at %X/%X",
1343
              (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1344
err:
1345
  *errormsg = state->errormsg_buf;
1346
1347
  return false;
1348
}
1349
1350
/*
1351
 * Returns information about the block that a block reference refers to.
1352
 *
1353
 * If the WAL record contains a block reference with the given ID, *rnode,
1354
 * *forknum, and *blknum are filled in (if not NULL), and returns true.
1355
 * Otherwise returns false.
1356
 */
1357
bool
1358
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1359
           RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1360
0
{
1361
0
  DecodedBkpBlock *bkpb;
1362
1363
0
  if (!record->blocks[block_id].in_use)
1364
0
    return false;
1365
1366
0
  bkpb = &record->blocks[block_id];
1367
0
  if (rnode)
1368
0
    *rnode = bkpb->rnode;
1369
0
  if (forknum)
1370
0
    *forknum = bkpb->forknum;
1371
0
  if (blknum)
1372
0
    *blknum = bkpb->blkno;
1373
0
  return true;
1374
0
}
1375
1376
/*
1377
 * Returns the data associated with a block reference, or NULL if there is
1378
 * no data (e.g. because a full-page image was taken instead). The returned
1379
 * pointer points to a MAXALIGNed buffer.
1380
 */
1381
char *
1382
XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1383
0
{
1384
0
  DecodedBkpBlock *bkpb;
1385
1386
0
  if (!record->blocks[block_id].in_use)
1387
0
    return NULL;
1388
1389
0
  bkpb = &record->blocks[block_id];
1390
1391
0
  if (!bkpb->has_data)
1392
0
  {
1393
0
    if (len)
1394
0
      *len = 0;
1395
0
    return NULL;
1396
0
  }
1397
0
  else
1398
0
  {
1399
0
    if (len)
1400
0
      *len = bkpb->data_len;
1401
0
    return bkpb->data;
1402
0
  }
1403
0
}
1404
1405
/*
1406
 * Restore a full-page image from a backup block attached to an XLOG record.
1407
 *
1408
 * Returns true if the page image could be restored into *page, false otherwise.
1409
 */
1410
bool
1411
RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1412
0
{
1413
0
  DecodedBkpBlock *bkpb;
1414
0
  char     *ptr;
1415
0
  PGAlignedBlock tmp;
1416
1417
0
  if (!record->blocks[block_id].in_use)
1418
0
    return false;
1419
0
  if (!record->blocks[block_id].has_image)
1420
0
    return false;
1421
1422
0
  bkpb = &record->blocks[block_id];
1423
0
  ptr = bkpb->bkp_image;
1424
1425
0
  if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1426
0
  {
1427
    /* If a backup block image is compressed, decompress it */
1428
0
    if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1429
0
              BLCKSZ - bkpb->hole_length) < 0)
1430
0
    {
1431
0
      report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1432
0
                  (uint32) (record->ReadRecPtr >> 32),
1433
0
                  (uint32) record->ReadRecPtr,
1434
0
                  block_id);
1435
0
      return false;
1436
0
    }
1437
0
    ptr = tmp.data;
1438
0
  }
1439
1440
  /* generate page, taking into account hole if necessary */
1441
0
  if (bkpb->hole_length == 0)
1442
0
  {
1443
0
    memcpy(page, ptr, BLCKSZ);
1444
0
  }
1445
0
  else
1446
0
  {
1447
0
    memcpy(page, ptr, bkpb->hole_offset);
1448
    /* must zero-fill the hole */
1449
0
    MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1450
0
    memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1451
0
         ptr + bkpb->hole_offset,
1452
0
         BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1453
0
  }
1454
1455
0
  return true;
1456
0
}
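/*
 * A hypothetical sketch (not part of the instrumented xlogreader.c) showing
 * how a consumer can walk the block references of the record most recently
 * returned by XLogReadRecord(), using the block-reference accessors above.
 */
#ifdef XLOGREADER_USAGE_SKETCH
static void
process_block_refs(XLogReaderState *xlogreader)
{
  int     block_id;

  for (block_id = 0; block_id <= xlogreader->max_block_id; block_id++)
  {
    RelFileNode rnode;
    ForkNumber  forknum;
    BlockNumber blkno;
    PGAlignedBlock page;

    if (!XLogRecGetBlockTag(xlogreader, block_id, &rnode, &forknum, &blkno))
      continue;     /* this block_id is not in use */

    if (XLogRecHasBlockImage(xlogreader, block_id))
    {
      /* reconstruct the full-page image, decompressing and hole-filling */
      if (!RestoreBlockImage(xlogreader, block_id, page.data))
        continue;
    }
    else
    {
      Size    len;
      char     *data = XLogRecGetBlockData(xlogreader, block_id, &len);

      /* data is MAXALIGN'd, or NULL when the block carries no payload */
      (void) data;
    }
  }
}
#endif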