YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/postgres/src/backend/access/transam/twophase.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * twophase.c
4
 *    Two-phase commit support functions.
5
 *
6
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    src/backend/access/transam/twophase.c
11
 *
12
 * NOTES
13
 *    Each global transaction is associated with a global transaction
14
 *    identifier (GID). The client assigns a GID to a postgres
15
 *    transaction with the PREPARE TRANSACTION command.
16
 *
17
 *    We keep all active global transactions in a shared memory array.
18
 *    When the PREPARE TRANSACTION command is issued, the GID is
19
 *    reserved for the transaction in the array. This is done before
20
 *    a WAL entry is made, because the reservation checks for duplicate
21
 *    GIDs and aborts the transaction if there already is a global
22
 *    transaction in prepared state with the same GID.
23
 *
24
 *    A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25
 *    what keeps the XID considered running by TransactionIdIsInProgress.
26
 *    It is also convenient as a PGPROC to hook the gxact's locks to.
27
 *
28
 *    Information to recover prepared transactions in case of crash is
29
 *    now stored in WAL for the common case. In some cases there will be
30
 *    an extended period between preparing a GXACT and commit/abort, in
31
 *    which case we need to separately record prepared transaction data
32
 *    in permanent storage. This includes locking information, pending
33
 *    notifications etc. All that state information is written to the
34
 *    per-transaction state file in the pg_twophase directory.
35
 *    All prepared transactions will be written prior to shutdown.
36
 *
37
 *    The lifecycle of the state data is as follows:
38
 *
39
 *    * On PREPARE TRANSACTION backend writes state data only to the WAL and
40
 *      stores pointer to the start of the WAL record in
41
 *      gxact->prepare_start_lsn.
42
 *    * If COMMIT occurs before checkpoint then backend reads data from WAL
43
 *      using prepare_start_lsn.
44
 *    * On checkpoint, state data is copied to files in the pg_twophase directory and
45
 *      fsynced
46
 *    * If COMMIT happens after checkpoint then backend reads state data from
47
 *      files
48
 *
49
 *    During replay and replication, TwoPhaseState also holds information
50
 *    about active prepared transactions that haven't been moved to disk yet.
51
 *
52
 *    Replay of twophase records happens by the following rules:
53
 *
54
 *    * At the beginning of recovery, pg_twophase is scanned once, filling
55
 *      TwoPhaseState with entries marked with gxact->inredo and
56
 *      gxact->ondisk.  Two-phase file data older than the XID horizon of
57
 *      the redo position are discarded.
58
 *    * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59
 *      gxact->inredo is set to true for such entries.
60
 *    * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61
 *      that have gxact->inredo set and are behind the redo_horizon. We
62
 *      save them to disk and then switch gxact->ondisk to true.
63
 *    * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64
 *      If gxact->ondisk is true, the corresponding entry from the disk
65
 *      is additionally deleted.
66
 *    * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67
 *      and PrescanPreparedTransactions() have been modified to go through
68
 *      gxact->inredo entries that have not made it to disk.
69
 *
70
 *-------------------------------------------------------------------------
71
 */
72
#include "postgres.h"
73
74
#include <fcntl.h>
75
#include <sys/stat.h>
76
#include <time.h>
77
#include <unistd.h>
78
79
#include "access/commit_ts.h"
80
#include "access/htup_details.h"
81
#include "access/subtrans.h"
82
#include "access/transam.h"
83
#include "access/twophase.h"
84
#include "access/twophase_rmgr.h"
85
#include "access/xact.h"
86
#include "access/xlog.h"
87
#include "access/xloginsert.h"
88
#include "access/xlogutils.h"
89
#include "access/xlogreader.h"
90
#include "catalog/pg_type.h"
91
#include "catalog/storage.h"
92
#include "funcapi.h"
93
#include "miscadmin.h"
94
#include "pg_trace.h"
95
#include "pgstat.h"
96
#include "replication/origin.h"
97
#include "replication/syncrep.h"
98
#include "replication/walsender.h"
99
#include "storage/fd.h"
100
#include "storage/ipc.h"
101
#include "storage/predicate.h"
102
#include "storage/proc.h"
103
#include "storage/procarray.h"
104
#include "storage/sinvaladt.h"
105
#include "storage/smgr.h"
106
#include "utils/builtins.h"
107
#include "utils/memutils.h"
108
#include "utils/timestamp.h"
109
110
111
/*
 * Location of the two-phase state files, relative to PGDATA.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC max_prepared_transactions: number of slots in the prepared-transaction
 * table.  It cannot be changed after startup; zero disables the feature.
 */
int			max_prepared_xacts = 0;
118
119
/*
120
 * This struct describes one global transaction that is in prepared state
121
 * or attempting to become prepared.
122
 *
123
 * The lifecycle of a global transaction is:
124
 *
125
 * 1. After checking that the requested GID is not in use, set up an entry in
126
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127
 * and mark it as locked by my backend.
128
 *
129
 * 2. After successfully completing prepare, set valid = true and enter the
130
 * referenced PGPROC into the global ProcArray.
131
 *
132
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133
 * valid and not locked, then mark the entry as locked by storing my current
134
 * backend ID into locking_backend.  This prevents concurrent attempts to
135
 * commit or rollback the same prepared xact.
136
 *
137
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139
 * the freelist.
140
 *
141
 * Note that if the preparing transaction fails between steps 1 and 2, the
142
 * entry must be removed so that the GID and the GlobalTransaction struct
143
 * can be reused.  See AtAbort_Twophase().
144
 *
145
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
146
 * twophase.h
147
 */
148
149
typedef struct GlobalTransactionData
150
{
151
  GlobalTransaction next;   /* list link for free list */
152
  int     pgprocno;   /* ID of associated dummy PGPROC */
153
  BackendId dummyBackendId; /* similar to backend id for backends */
154
  TimestampTz prepared_at;  /* time of preparation */
155
156
  /*
157
   * Note that we need to keep track of two LSNs for each GXACT. We keep
158
   * track of the start LSN because this is the address we must use to read
159
   * state data back from WAL when committing a prepared GXACT. We keep
160
   * track of the end LSN because that is the LSN we need to wait for prior
161
   * to commit.
162
   */
163
  XLogRecPtr  prepare_start_lsn;  /* XLOG offset of prepare record start */
164
  XLogRecPtr  prepare_end_lsn;  /* XLOG offset of prepare record end */
165
  TransactionId xid;      /* The GXACT id */
166
167
  Oid     owner;      /* ID of user that executed the xact */
168
  BackendId locking_backend;  /* backend currently working on the xact */
169
  bool    valid;      /* true if PGPROC entry is in proc array */
170
  bool    ondisk;     /* true if prepare state file is on disk */
171
  bool    inredo;     /* true if entry was added via xlog_redo */
172
  char    gid[GIDSIZE]; /* The GID assigned to the prepared xact */
173
}     GlobalTransactionData;
174
175
/*
176
 * Two Phase Commit shared state.  Access to this struct is protected
177
 * by TwoPhaseStateLock.
178
 */
179
typedef struct TwoPhaseStateData
180
{
181
  /* Head of linked list of free GlobalTransactionData structs */
182
  GlobalTransaction freeGXacts;
183
184
  /* Number of valid prepXacts entries. */
185
  int     numPrepXacts;
186
187
  /* There are max_prepared_xacts items in this array */
188
  GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
189
} TwoPhaseStateData;
190
191
static TwoPhaseStateData *TwoPhaseState;
192
193
/*
194
 * Global transaction entry currently locked by us, if any.  Note that any
195
 * access to the entry pointed to by this variable must be protected by
196
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197
 * (since it's just local memory).
198
 */
199
static GlobalTransaction MyLockedGxact = NULL;
200
201
static bool twophaseExitRegistered = false;
202
203
static void RecordTransactionCommitPrepared(TransactionId xid,
204
                int nchildren,
205
                TransactionId *children,
206
                int nrels,
207
                RelFileNode *rels,
208
                int ninvalmsgs,
209
                SharedInvalidationMessage *invalmsgs,
210
                bool initfileinval,
211
                const char *gid);
212
static void RecordTransactionAbortPrepared(TransactionId xid,
213
                 int nchildren,
214
                 TransactionId *children,
215
                 int nrels,
216
                 RelFileNode *rels,
217
                 const char *gid);
218
static void ProcessRecords(char *bufptr, TransactionId xid,
219
         const TwoPhaseCallback callbacks[]);
220
static void RemoveGXact(GlobalTransaction gxact);
221
222
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
223
static char *ProcessTwoPhaseBuffer(TransactionId xid,
224
            XLogRecPtr prepare_start_lsn,
225
            bool fromdisk, bool setParent, bool setNextXid);
226
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
227
          const char *gid, TimestampTz prepared_at, Oid owner,
228
          Oid databaseid);
229
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
230
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
231
232
/*
233
 * Initialization of shared memory
234
 */
235
Size
236
TwoPhaseShmemSize(void)
237
16.0k
{
238
16.0k
  Size    size;
239
240
  /* Need the fixed struct, the array of pointers, and the GTD structs */
241
16.0k
  size = offsetof(TwoPhaseStateData, prepXacts);
242
16.0k
  size = add_size(size, mul_size(max_prepared_xacts,
243
16.0k
                   sizeof(GlobalTransaction)));
244
16.0k
  size = MAXALIGN(size);
245
16.0k
  size = add_size(size, mul_size(max_prepared_xacts,
246
16.0k
                   sizeof(GlobalTransactionData)));
247
248
16.0k
  return size;
249
16.0k
}
250
251
void
252
TwoPhaseShmemInit(void)
253
8.01k
{
254
8.01k
  bool    found;
255
256
8.01k
  TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
257
8.01k
                  TwoPhaseShmemSize(),
258
8.01k
                  &found);
259
8.01k
  if (!IsUnderPostmaster)
260
8.01k
  {
261
8.01k
    GlobalTransaction gxacts;
262
8.01k
    int     i;
263
264
8.01k
    Assert(!found);
265
8.01k
    TwoPhaseState->freeGXacts = NULL;
266
8.01k
    TwoPhaseState->numPrepXacts = 0;
267
268
    /*
269
     * Initialize the linked list of free GlobalTransactionData structs
270
     */
271
8.01k
    gxacts = (GlobalTransaction)
272
8.01k
      ((char *) TwoPhaseState +
273
8.01k
       MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
274
8.01k
            sizeof(GlobalTransaction) * max_prepared_xacts));
275
8.01k
    for (i = 0; i < max_prepared_xacts; 
i++0
)
276
0
    {
277
      /* insert into linked list */
278
0
      gxacts[i].next = TwoPhaseState->freeGXacts;
279
0
      TwoPhaseState->freeGXacts = &gxacts[i];
280
281
      /* associate it with a PGPROC assigned by InitProcGlobal */
282
0
      gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
283
284
      /*
285
       * Assign a unique ID for each dummy proc, so that the range of
286
       * dummy backend IDs immediately follows the range of normal
287
       * backend IDs. We don't dare to assign a real backend ID to dummy
288
       * procs, because prepared transactions don't take part in cache
289
       * invalidation like a real backend ID would imply, but having a
290
       * unique ID for them is nevertheless handy. This arrangement
291
       * allows you to allocate an array of size (MaxBackends +
292
       * max_prepared_xacts + 1), and have a slot for every backend and
293
       * prepared transaction. Currently multixact.c uses that
294
       * technique.
295
       */
296
0
      gxacts[i].dummyBackendId = MaxBackends + 1 + i;
297
0
    }
298
8.01k
  }
299
0
  else
300
8.01k
    Assert(found);
301
8.01k
}
302
303
/*
304
 * Exit hook to unlock the global transaction entry we're working on.
305
 */
306
static void
307
AtProcExit_Twophase(int code, Datum arg)
308
0
{
309
  /* same logic as abort */
310
0
  AtAbort_Twophase();
311
0
}
312
313
/*
314
 * Abort hook to unlock the global transaction entry we're working on.
315
 */
316
void
317
AtAbort_Twophase(void)
318
50.6k
{
319
50.6k
  if (MyLockedGxact == NULL)
320
50.6k
    return;
321
322
  /*
323
   * What to do with the locked global transaction entry?  If we were in the
324
   * process of preparing the transaction, but haven't written the WAL
325
   * record and state file yet, the transaction must not be considered as
326
   * prepared.  Likewise, if we are in the process of finishing an
327
   * already-prepared transaction, and fail after having already written the
328
   * 2nd phase commit or rollback record to the WAL, the transaction should
329
   * not be considered as prepared anymore.  In those cases, just remove the
330
   * entry from shared memory.
331
   *
332
   * Otherwise, the entry must be left in place so that the transaction can
333
   * be finished later, so just unlock it.
334
   *
335
   * If we abort during prepare, after having written the WAL record, we
336
   * might not have transferred all locks and other state to the prepared
337
   * transaction yet.  Likewise, if we abort during commit or rollback,
338
   * after having written the WAL record, we might not have released all the
339
   * resources held by the transaction yet.  In those cases, the in-memory
340
   * state can be wrong, but it's too late to back out.
341
   */
342
2
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
343
2
  if (!MyLockedGxact->valid)
344
0
    RemoveGXact(MyLockedGxact);
345
2
  else
346
2
    MyLockedGxact->locking_backend = InvalidBackendId;
347
2
  LWLockRelease(TwoPhaseStateLock);
348
349
2
  MyLockedGxact = NULL;
350
2
}
351
352
/*
353
 * This is called after we have finished transferring state to the prepared
354
 * PGXACT entry.
355
 */
356
void
357
PostPrepare_Twophase(void)
358
0
{
359
0
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
360
0
  MyLockedGxact->locking_backend = InvalidBackendId;
361
0
  LWLockRelease(TwoPhaseStateLock);
362
363
0
  MyLockedGxact = NULL;
364
0
}
365
366
367
/*
368
 * MarkAsPreparing
369
 *    Reserve the GID for the given transaction.
370
 */
371
GlobalTransaction
372
MarkAsPreparing(TransactionId xid, const char *gid,
373
        TimestampTz prepared_at, Oid owner, Oid databaseid)
374
0
{
375
0
  GlobalTransaction gxact;
376
0
  int     i;
377
378
0
  if (strlen(gid) >= GIDSIZE)
379
0
    ereport(ERROR,
380
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
381
0
         errmsg("transaction identifier \"%s\" is too long",
382
0
            gid)));
383
384
  /* fail immediately if feature is disabled */
385
0
  if (max_prepared_xacts == 0)
386
0
    ereport(ERROR,
387
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
388
0
         errmsg("prepared transactions are disabled"),
389
0
         errhint("Set max_prepared_transactions to a nonzero value.")));
390
391
  /* on first call, register the exit hook */
392
0
  if (!twophaseExitRegistered)
393
0
  {
394
0
    before_shmem_exit(AtProcExit_Twophase, 0);
395
0
    twophaseExitRegistered = true;
396
0
  }
397
398
0
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
399
400
  /* Check for conflicting GID */
401
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
402
0
  {
403
0
    gxact = TwoPhaseState->prepXacts[i];
404
0
    if (strcmp(gxact->gid, gid) == 0)
405
0
    {
406
0
      ereport(ERROR,
407
0
          (errcode(ERRCODE_DUPLICATE_OBJECT),
408
0
           errmsg("transaction identifier \"%s\" is already in use",
409
0
              gid)));
410
0
    }
411
0
  }
412
413
  /* Get a free gxact from the freelist */
414
0
  if (TwoPhaseState->freeGXacts == NULL)
415
0
    ereport(ERROR,
416
0
        (errcode(ERRCODE_OUT_OF_MEMORY),
417
0
         errmsg("maximum number of prepared transactions reached"),
418
0
         errhint("Increase max_prepared_transactions (currently %d).",
419
0
             max_prepared_xacts)));
420
0
  gxact = TwoPhaseState->freeGXacts;
421
0
  TwoPhaseState->freeGXacts = gxact->next;
422
423
0
  MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
424
425
0
  gxact->ondisk = false;
426
427
  /* And insert it into the active array */
428
0
  Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
429
0
  TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
430
431
0
  LWLockRelease(TwoPhaseStateLock);
432
433
0
  return gxact;
434
0
}
435
436
/*
437
 * MarkAsPreparingGuts
438
 *
439
 * This uses a gxact struct and puts it into the active array.
440
 * NOTE: this is also used when reloading a gxact after a crash; so avoid
441
 * assuming that we can use very much backend context.
442
 *
443
 * Note: This function should be called with appropriate locks held.
444
 */
445
static void
446
MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
447
          TimestampTz prepared_at, Oid owner, Oid databaseid)
448
0
{
449
0
  PGPROC     *proc;
450
0
  PGXACT     *pgxact;
451
0
  int     i;
452
453
0
  Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
454
455
0
  Assert(gxact != NULL);
456
0
  proc = &ProcGlobal->allProcs[gxact->pgprocno];
457
0
  pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
458
459
  /* Initialize the PGPROC entry */
460
0
  MemSet(proc, 0, sizeof(PGPROC));
461
0
  proc->pgprocno = gxact->pgprocno;
462
0
  SHMQueueElemInit(&(proc->links));
463
0
  proc->waitStatus = STATUS_OK;
464
  /* We set up the gxact's VXID as InvalidBackendId/XID */
465
0
  proc->lxid = (LocalTransactionId) xid;
466
0
  pgxact->xid = xid;
467
0
  pgxact->xmin = InvalidTransactionId;
468
0
  pgxact->delayChkpt = false;
469
0
  pgxact->vacuumFlags = 0;
470
0
  proc->pid = 0;
471
0
  proc->backendId = InvalidBackendId;
472
0
  proc->databaseId = databaseid;
473
0
  proc->roleId = owner;
474
0
  proc->tempNamespaceId = InvalidOid;
475
0
  proc->isBackgroundWorker = false;
476
0
  proc->lwWaiting = false;
477
0
  proc->lwWaitMode = 0;
478
0
  proc->waitLock = NULL;
479
0
  proc->waitProcLock = NULL;
480
0
  for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
481
0
    SHMQueueInit(&(proc->myProcLocks[i]));
482
  /* subxid data must be filled later by GXactLoadSubxactData */
483
0
  pgxact->overflowed = false;
484
0
  pgxact->nxids = 0;
485
486
0
  gxact->prepared_at = prepared_at;
487
0
  gxact->xid = xid;
488
0
  gxact->owner = owner;
489
0
  gxact->locking_backend = MyBackendId;
490
0
  gxact->valid = false;
491
0
  gxact->inredo = false;
492
0
  strcpy(gxact->gid, gid);
493
494
  /*
495
   * Remember that we have this GlobalTransaction entry locked for us. If we
496
   * abort after this, we must release it.
497
   */
498
0
  MyLockedGxact = gxact;
499
0
}
500
501
/*
502
 * GXactLoadSubxactData
503
 *
504
 * If the transaction being persisted had any subtransactions, this must
505
 * be called before MarkAsPrepared() to load information into the dummy
506
 * PGPROC.
507
 */
508
static void
509
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
510
           TransactionId *children)
511
0
{
512
0
  PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
513
0
  PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
514
515
  /* We need no extra lock since the GXACT isn't valid yet */
516
0
  if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
517
0
  {
518
0
    pgxact->overflowed = true;
519
0
    nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
520
0
  }
521
0
  if (nsubxacts > 0)
522
0
  {
523
0
    memcpy(proc->subxids.xids, children,
524
0
         nsubxacts * sizeof(TransactionId));
525
0
    pgxact->nxids = nsubxacts;
526
0
  }
527
0
}
528
529
/*
530
 * MarkAsPrepared
531
 *    Mark the GXACT as fully valid, and enter it into the global ProcArray.
532
 *
533
 * lock_held indicates whether caller already holds TwoPhaseStateLock.
534
 */
535
static void
536
MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
537
0
{
538
  /* Lock here may be overkill, but I'm not convinced of that ... */
539
0
  if (!lock_held)
540
0
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
541
0
  Assert(!gxact->valid);
542
0
  gxact->valid = true;
543
0
  if (!lock_held)
544
0
    LWLockRelease(TwoPhaseStateLock);
545
546
  /*
547
   * Put it into the global ProcArray so TransactionIdIsInProgress considers
548
   * the XID as still running.
549
   */
550
0
  ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
551
0
}
552
553
/*
554
 * LockGXact
555
 *    Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
556
 */
557
static GlobalTransaction
558
LockGXact(const char *gid, Oid user)
559
0
{
560
0
  int     i;
561
562
  /* on first call, register the exit hook */
563
0
  if (!twophaseExitRegistered)
564
0
  {
565
0
    before_shmem_exit(AtProcExit_Twophase, 0);
566
0
    twophaseExitRegistered = true;
567
0
  }
568
569
0
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
570
571
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
572
0
  {
573
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
574
0
    PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
575
576
    /* Ignore not-yet-valid GIDs */
577
0
    if (!gxact->valid)
578
0
      continue;
579
0
    if (strcmp(gxact->gid, gid) != 0)
580
0
      continue;
581
582
    /* Found it, but has someone else got it locked? */
583
0
    if (gxact->locking_backend != InvalidBackendId)
584
0
      ereport(ERROR,
585
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586
0
           errmsg("prepared transaction with identifier \"%s\" is busy",
587
0
              gid)));
588
589
0
    if (user != gxact->owner && !superuser_arg(user))
590
0
      ereport(ERROR,
591
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
592
0
           errmsg("permission denied to finish prepared transaction"),
593
0
           errhint("Must be superuser or the user that prepared the transaction.")));
594
595
    /*
596
     * Note: it probably would be possible to allow committing from
597
     * another database; but at the moment NOTIFY is known not to work and
598
     * there may be some other issues as well.  Hence disallow until
599
     * someone gets motivated to make it work.
600
     */
601
0
    if (MyDatabaseId != proc->databaseId)
602
0
      ereport(ERROR,
603
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
604
0
           errmsg("prepared transaction belongs to another database"),
605
0
           errhint("Connect to the database where the transaction was prepared to finish it.")));
606
607
    /* OK for me to lock it */
608
0
    gxact->locking_backend = MyBackendId;
609
0
    MyLockedGxact = gxact;
610
611
0
    LWLockRelease(TwoPhaseStateLock);
612
613
0
    return gxact;
614
0
  }
615
616
0
  LWLockRelease(TwoPhaseStateLock);
617
618
0
  ereport(ERROR,
619
0
      (errcode(ERRCODE_UNDEFINED_OBJECT),
620
0
       errmsg("prepared transaction with identifier \"%s\" does not exist",
621
0
          gid)));
622
623
  /* NOTREACHED */
624
0
  return NULL;
625
0
}
626
627
/*
628
 * RemoveGXact
629
 *    Remove the prepared transaction from the shared memory array.
630
 *
631
 * NB: caller should have already removed it from ProcArray
632
 */
633
static void
634
RemoveGXact(GlobalTransaction gxact)
635
0
{
636
0
  int     i;
637
638
0
  Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
639
640
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
641
0
  {
642
0
    if (gxact == TwoPhaseState->prepXacts[i])
643
0
    {
644
      /* remove from the active array */
645
0
      TwoPhaseState->numPrepXacts--;
646
0
      TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
647
648
      /* and put it back in the freelist */
649
0
      gxact->next = TwoPhaseState->freeGXacts;
650
0
      TwoPhaseState->freeGXacts = gxact;
651
652
0
      return;
653
0
    }
654
0
  }
655
656
0
  elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
657
0
}
658
659
/*
660
 * Returns an array of all prepared transactions for the user-level
661
 * function pg_prepared_xact.
662
 *
663
 * The returned array and all its elements are copies of internal data
664
 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
665
 *
666
 * WARNING -- we return even those transactions that are not fully prepared
667
 * yet.  The caller should filter them out if he doesn't want them.
668
 *
669
 * The returned array is palloc'd.
670
 */
671
static int
672
GetPreparedTransactionList(GlobalTransaction *gxacts)
673
0
{
674
0
  GlobalTransaction array;
675
0
  int     num;
676
0
  int     i;
677
678
0
  LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
679
680
0
  if (TwoPhaseState->numPrepXacts == 0)
681
0
  {
682
0
    LWLockRelease(TwoPhaseStateLock);
683
684
0
    *gxacts = NULL;
685
0
    return 0;
686
0
  }
687
688
0
  num = TwoPhaseState->numPrepXacts;
689
0
  array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
690
0
  *gxacts = array;
691
0
  for (i = 0; i < num; i++)
692
0
    memcpy(array + i, TwoPhaseState->prepXacts[i],
693
0
         sizeof(GlobalTransactionData));
694
695
0
  LWLockRelease(TwoPhaseStateLock);
696
697
0
  return num;
698
0
}
699
700
701
/* Working status for pg_prepared_xact */
702
typedef struct
703
{
704
  GlobalTransaction array;
705
  int     ngxacts;
706
  int     currIdx;
707
} Working_State;
708
709
/*
710
 * pg_prepared_xact
711
 *    Produce a view with one row per prepared transaction.
712
 *
713
 * This function is here so we don't have to export the
714
 * GlobalTransactionData struct definition.
715
 */
716
Datum
717
pg_prepared_xact(PG_FUNCTION_ARGS)
718
0
{
719
0
  FuncCallContext *funcctx;
720
0
  Working_State *status;
721
722
0
  if (SRF_IS_FIRSTCALL())
723
0
  {
724
0
    TupleDesc tupdesc;
725
0
    MemoryContext oldcontext;
726
727
    /* create a function context for cross-call persistence */
728
0
    funcctx = SRF_FIRSTCALL_INIT();
729
730
    /*
731
     * Switch to memory context appropriate for multiple function calls
732
     */
733
0
    oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
734
735
    /* build tupdesc for result tuples */
736
    /* this had better match pg_prepared_xacts view in system_views.sql */
737
0
    tupdesc = CreateTemplateTupleDesc(5, false);
738
0
    TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
739
0
               XIDOID, -1, 0);
740
0
    TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
741
0
               TEXTOID, -1, 0);
742
0
    TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
743
0
               TIMESTAMPTZOID, -1, 0);
744
0
    TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
745
0
               OIDOID, -1, 0);
746
0
    TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
747
0
               OIDOID, -1, 0);
748
749
0
    funcctx->tuple_desc = BlessTupleDesc(tupdesc);
750
751
    /*
752
     * Collect all the 2PC status information that we will format and send
753
     * out as a result set.
754
     */
755
0
    status = (Working_State *) palloc(sizeof(Working_State));
756
0
    funcctx->user_fctx = (void *) status;
757
758
0
    status->ngxacts = GetPreparedTransactionList(&status->array);
759
0
    status->currIdx = 0;
760
761
0
    MemoryContextSwitchTo(oldcontext);
762
0
  }
763
764
0
  funcctx = SRF_PERCALL_SETUP();
765
0
  status = (Working_State *) funcctx->user_fctx;
766
767
0
  while (status->array != NULL && status->currIdx < status->ngxacts)
768
0
  {
769
0
    GlobalTransaction gxact = &status->array[status->currIdx++];
770
0
    PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
771
0
    PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
772
0
    Datum   values[5];
773
0
    bool    nulls[5];
774
0
    HeapTuple tuple;
775
0
    Datum   result;
776
777
0
    if (!gxact->valid)
778
0
      continue;
779
780
    /*
781
     * Form tuple with appropriate data.
782
     */
783
0
    MemSet(values, 0, sizeof(values));
784
0
    MemSet(nulls, 0, sizeof(nulls));
785
786
0
    values[0] = TransactionIdGetDatum(pgxact->xid);
787
0
    values[1] = CStringGetTextDatum(gxact->gid);
788
0
    values[2] = TimestampTzGetDatum(gxact->prepared_at);
789
0
    values[3] = ObjectIdGetDatum(gxact->owner);
790
0
    values[4] = ObjectIdGetDatum(proc->databaseId);
791
792
0
    tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
793
0
    result = HeapTupleGetDatum(tuple);
794
0
    SRF_RETURN_NEXT(funcctx, result);
795
0
  }
796
797
0
  SRF_RETURN_DONE(funcctx);
798
0
}
799
800
/*
801
 * TwoPhaseGetGXact
802
 *    Get the GlobalTransaction struct for a prepared transaction
803
 *    specified by XID
804
 */
805
static GlobalTransaction
806
TwoPhaseGetGXact(TransactionId xid)
807
0
{
808
0
  GlobalTransaction result = NULL;
809
0
  int     i;
810
811
0
  static TransactionId cached_xid = InvalidTransactionId;
812
0
  static GlobalTransaction cached_gxact = NULL;
813
814
  /*
815
   * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
816
   * repeatedly for the same XID.  We can save work with a simple cache.
817
   */
818
0
  if (xid == cached_xid)
819
0
    return cached_gxact;
820
821
0
  LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
822
823
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
824
0
  {
825
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
826
0
    PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
827
828
0
    if (pgxact->xid == xid)
829
0
    {
830
0
      result = gxact;
831
0
      break;
832
0
    }
833
0
  }
834
835
0
  LWLockRelease(TwoPhaseStateLock);
836
837
0
  if (result == NULL)     /* should not happen */
838
0
    elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
839
840
0
  cached_xid = xid;
841
0
  cached_gxact = result;
842
843
0
  return result;
844
0
}
845
846
/*
847
 * TwoPhaseGetDummyProc
848
 *    Get the dummy backend ID for prepared transaction specified by XID
849
 *
850
 * Dummy backend IDs are similar to real backend IDs of real backends.
851
 * They start at MaxBackends + 1, and are unique across all currently active
852
 * real backends and prepared transactions.
853
 */
854
BackendId
855
TwoPhaseGetDummyBackendId(TransactionId xid)
856
0
{
857
0
  GlobalTransaction gxact = TwoPhaseGetGXact(xid);
858
859
0
  return gxact->dummyBackendId;
860
0
}
861
862
/*
863
 * TwoPhaseGetDummyProc
864
 *    Get the PGPROC that represents a prepared transaction specified by XID
865
 */
866
PGPROC *
867
TwoPhaseGetDummyProc(TransactionId xid)
868
0
{
869
0
  GlobalTransaction gxact = TwoPhaseGetGXact(xid);
870
871
0
  return &ProcGlobal->allProcs[gxact->pgprocno];
872
0
}
873
874
/************************************************************************/
875
/* State file support                         */
876
/************************************************************************/
877
878
/* Format the state-file path for xid into path (a MAXPGPATH-sized buffer). */
#define TwoPhaseFilePath(path, xid) \
  snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
880
881
/*
882
 * 2PC state file format:
883
 *
884
 *  1. TwoPhaseFileHeader, followed by the GID (gidlen bytes, including
 *     the terminating '\0')
885
 *  2. TransactionId[] (subtransactions)
886
 *  3. RelFileNode[] (files to be deleted at commit)
887
 *  4. RelFileNode[] (files to be deleted at abort)
888
 *  5. SharedInvalidationMessage[] (inval messages to be sent at commit)
889
 *  6. TwoPhaseRecordOnDisk
890
 *  7. ...
891
 *  8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
892
 *  9. checksum (CRC-32C)
893
 *
894
 * Each segment except the final checksum is MAXALIGN'd.
895
 */
896
897
/*
898
 * Header for a 2PC state file
899
 */
900
0
#define TWOPHASE_MAGIC  0x57F94534  /* format identifier */
901
902
typedef struct TwoPhaseFileHeader
903
{
904
  uint32    magic;      /* format identifier */
905
  uint32    total_len;    /* actual file length */
906
  TransactionId xid;      /* original transaction XID */
907
  Oid     database;   /* OID of database it was in */
908
  TimestampTz prepared_at;  /* time of preparation */
909
  Oid     owner;      /* user running the transaction */
910
  int32   nsubxacts;    /* number of following subxact XIDs */
911
  int32   ncommitrels;  /* number of delete-on-commit rels */
912
  int32   nabortrels;   /* number of delete-on-abort rels */
913
  int32   ninvalmsgs;   /* number of cache invalidation messages */
914
  bool    initfileinval;  /* does relcache init file need invalidation? */
915
  uint16    gidlen;     /* length of the GID - GID follows the header */
916
  XLogRecPtr  origin_lsn;   /* lsn of this record at origin node */
917
  TimestampTz origin_timestamp; /* time of prepare at origin node */
918
} TwoPhaseFileHeader;
919
920
/*
921
 * Header for each record in a state file
922
 *
923
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
924
 * The rmgr data will be stored starting on a MAXALIGN boundary.
925
 */
926
typedef struct TwoPhaseRecordOnDisk
927
{
928
  uint32    len;      /* length of rmgr data */
929
  TwoPhaseRmgrId rmid;    /* resource manager for this record */
930
  uint16    info;     /* flag bits for use by rmgr */
931
} TwoPhaseRecordOnDisk;
932
933
/*
934
 * During prepare, the state file is assembled in memory before writing it
935
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
936
 * for that.
937
 */
938
typedef struct StateFileChunk
939
{
940
  char     *data;
941
  uint32    len;
942
  struct StateFileChunk *next;
943
} StateFileChunk;
944
945
static struct xllist
946
{
947
  StateFileChunk *head;   /* first data block in the chain */
948
  StateFileChunk *tail;   /* last block in chain */
949
  uint32    num_chunks;
950
  uint32    bytes_free;   /* free bytes left in tail block */
951
  uint32    total_len;    /* total data bytes in chain */
952
}     records;
953
954
955
/*
956
 * Append a block of data to records data structure.
957
 *
958
 * NB: each block is padded to a MAXALIGN multiple.  This must be
959
 * accounted for when the file is later read!
960
 *
961
 * The data is copied, so the caller is free to modify it afterwards.
962
 */
963
static void
964
save_state_data(const void *data, uint32 len)
965
0
{
966
0
  uint32    padlen = MAXALIGN(len);
967
968
0
  if (padlen > records.bytes_free)
969
0
  {
970
0
    records.tail->next = palloc0(sizeof(StateFileChunk));
971
0
    records.tail = records.tail->next;
972
0
    records.tail->len = 0;
973
0
    records.tail->next = NULL;
974
0
    records.num_chunks++;
975
976
0
    records.bytes_free = Max(padlen, 512);
977
0
    records.tail->data = palloc(records.bytes_free);
978
0
  }
979
980
0
  memcpy(((char *) records.tail->data) + records.tail->len, data, len);
981
0
  records.tail->len += padlen;
982
0
  records.bytes_free -= padlen;
983
0
  records.total_len += padlen;
984
0
}
985
986
/*
987
 * Start preparing a state file.
988
 *
989
 * Initializes data structure and inserts the 2PC file header record.
990
 */
991
void
992
StartPrepare(GlobalTransaction gxact)
993
0
{
994
0
  PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
995
0
  PGXACT     *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
996
0
  TransactionId xid = pgxact->xid;
997
0
  TwoPhaseFileHeader hdr;
998
0
  TransactionId *children;
999
0
  RelFileNode *commitrels;
1000
0
  RelFileNode *abortrels;
1001
0
  SharedInvalidationMessage *invalmsgs;
1002
1003
  /* Initialize linked list */
1004
0
  records.head = palloc0(sizeof(StateFileChunk));
1005
0
  records.head->len = 0;
1006
0
  records.head->next = NULL;
1007
1008
0
  records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1009
0
  records.head->data = palloc(records.bytes_free);
1010
1011
0
  records.tail = records.head;
1012
0
  records.num_chunks = 1;
1013
1014
0
  records.total_len = 0;
1015
1016
  /* Create header */
1017
0
  hdr.magic = TWOPHASE_MAGIC;
1018
0
  hdr.total_len = 0;      /* EndPrepare will fill this in */
1019
0
  hdr.xid = xid;
1020
0
  hdr.database = proc->databaseId;
1021
0
  hdr.prepared_at = gxact->prepared_at;
1022
0
  hdr.owner = gxact->owner;
1023
0
  hdr.nsubxacts = xactGetCommittedChildren(&children);
1024
0
  hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1025
0
  hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1026
0
  hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1027
0
                              &hdr.initfileinval);
1028
0
  hdr.gidlen = strlen(gxact->gid) + 1;  /* Include '\0' */
1029
1030
0
  save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1031
0
  save_state_data(gxact->gid, hdr.gidlen);
1032
1033
  /*
1034
   * Add the additional info about subxacts, deletable files and cache
1035
   * invalidation messages.
1036
   */
1037
0
  if (hdr.nsubxacts > 0)
1038
0
  {
1039
0
    save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1040
    /* While we have the child-xact data, stuff it in the gxact too */
1041
0
    GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1042
0
  }
1043
0
  if (hdr.ncommitrels > 0)
1044
0
  {
1045
0
    save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1046
0
    pfree(commitrels);
1047
0
  }
1048
0
  if (hdr.nabortrels > 0)
1049
0
  {
1050
0
    save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1051
0
    pfree(abortrels);
1052
0
  }
1053
0
  if (hdr.ninvalmsgs > 0)
1054
0
  {
1055
0
    save_state_data(invalmsgs,
1056
0
            hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1057
0
    pfree(invalmsgs);
1058
0
  }
1059
0
}
1060
1061
/*
1062
 * Finish preparing state data and writing it to WAL.
1063
 */
1064
void
1065
EndPrepare(GlobalTransaction gxact)
1066
0
{
1067
0
  TwoPhaseFileHeader *hdr;
1068
0
  StateFileChunk *record;
1069
0
  bool    replorigin;
1070
1071
  /* Add the end sentinel to the list of 2PC records */
1072
0
  RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1073
0
               NULL, 0);
1074
1075
  /* Go back and fill in total_len in the file header record */
1076
0
  hdr = (TwoPhaseFileHeader *) records.head->data;
1077
0
  Assert(hdr->magic == TWOPHASE_MAGIC);
1078
0
  hdr->total_len = records.total_len + sizeof(pg_crc32c);
1079
1080
0
  replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1081
0
          replorigin_session_origin != DoNotReplicateId);
1082
1083
0
  if (replorigin)
1084
0
  {
1085
0
    Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1086
0
    hdr->origin_lsn = replorigin_session_origin_lsn;
1087
0
    hdr->origin_timestamp = replorigin_session_origin_timestamp;
1088
0
  }
1089
0
  else
1090
0
  {
1091
0
    hdr->origin_lsn = InvalidXLogRecPtr;
1092
0
    hdr->origin_timestamp = 0;
1093
0
  }
1094
1095
  /*
1096
   * If the data size exceeds MaxAllocSize, we won't be able to read it in
1097
   * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1098
   * where we write data to file and then re-read at commit time.
1099
   */
1100
0
  if (hdr->total_len > MaxAllocSize)
1101
0
    ereport(ERROR,
1102
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1103
0
         errmsg("two-phase state file maximum length exceeded")));
1104
1105
  /*
1106
   * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1107
   * cover us, so no need to calculate a separate CRC.
1108
   *
1109
   * We have to set delayChkpt here, too; otherwise a checkpoint starting
1110
   * immediately after the WAL record is inserted could complete without
1111
   * fsync'ing our state file.  (This is essentially the same kind of race
1112
   * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
1113
   * uses delayChkpt for; see notes there.)
1114
   *
1115
   * We save the PREPARE record's location in the gxact for later use by
1116
   * CheckPointTwoPhase.
1117
   */
1118
0
  XLogEnsureRecordSpace(0, records.num_chunks);
1119
1120
0
  START_CRIT_SECTION();
1121
1122
0
  MyPgXact->delayChkpt = true;
1123
1124
0
  XLogBeginInsert();
1125
0
  for (record = records.head; record != NULL; record = record->next)
1126
0
    XLogRegisterData(record->data, record->len);
1127
1128
0
  XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1129
1130
0
  gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1131
1132
0
  if (replorigin)
1133
0
  {
1134
    /* Move LSNs forward for this replication origin */
1135
0
    replorigin_session_advance(replorigin_session_origin_lsn,
1136
0
                   gxact->prepare_end_lsn);
1137
0
  }
1138
1139
0
  XLogFlush(gxact->prepare_end_lsn);
1140
1141
  /* If we crash now, we have prepared: WAL replay will fix things */
1142
1143
  /* Store record's start location to read that later on Commit */
1144
0
  gxact->prepare_start_lsn = ProcLastRecPtr;
1145
1146
  /*
1147
   * Mark the prepared transaction as valid.  As soon as xact.c marks
1148
   * MyPgXact as not running our XID (which it will do immediately after
1149
   * this function returns), others can commit/rollback the xact.
1150
   *
1151
   * NB: a side effect of this is to make a dummy ProcArray entry for the
1152
   * prepared XID.  This must happen before we clear the XID from MyPgXact,
1153
   * else there is a window where the XID is not running according to
1154
   * TransactionIdIsInProgress, and onlookers would be entitled to assume
1155
   * the xact crashed.  Instead we have a window where the same XID appears
1156
   * twice in ProcArray, which is OK.
1157
   */
1158
0
  MarkAsPrepared(gxact, false);
1159
1160
  /*
1161
   * Now we can mark ourselves as out of the commit critical section: a
1162
   * checkpoint starting after this will certainly see the gxact as a
1163
   * candidate for fsyncing.
1164
   */
1165
0
  MyPgXact->delayChkpt = false;
1166
1167
  /*
1168
   * Remember that we have this GlobalTransaction entry locked for us.  If
1169
   * we crash after this point, it's too late to abort, but we must unlock
1170
   * it so that the prepared transaction can be committed or rolled back.
1171
   */
1172
0
  MyLockedGxact = gxact;
1173
1174
0
  END_CRIT_SECTION();
1175
1176
  /*
1177
   * Wait for synchronous replication, if required.
1178
   *
1179
   * Note that at this stage we have marked the prepare, but still show as
1180
   * running in the procarray (twice!) and continue to hold locks.
1181
   */
1182
0
  SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1183
1184
0
  records.tail = records.head = NULL;
1185
0
  records.num_chunks = 0;
1186
0
}
1187
1188
/*
1189
 * Register a 2PC record to be written to state file.
1190
 */
1191
void
1192
RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1193
             const void *data, uint32 len)
1194
0
{
1195
0
  TwoPhaseRecordOnDisk record;
1196
1197
0
  record.rmid = rmid;
1198
0
  record.info = info;
1199
0
  record.len = len;
1200
0
  save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1201
0
  if (len > 0)
1202
0
    save_state_data(data, len);
1203
0
}
1204
1205
1206
/*
1207
 * Read and validate the state file for xid.
1208
 *
1209
 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1210
 * contents of the file.  Otherwise return NULL.
1211
 */
1212
static char *
1213
ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1214
0
{
1215
0
  char    path[MAXPGPATH];
1216
0
  char     *buf;
1217
0
  TwoPhaseFileHeader *hdr;
1218
0
  int     fd;
1219
0
  struct stat stat;
1220
0
  uint32    crc_offset;
1221
0
  pg_crc32c calc_crc,
1222
0
        file_crc;
1223
1224
0
  TwoPhaseFilePath(path, xid);
1225
1226
0
  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1227
0
  if (fd < 0)
1228
0
  {
1229
0
    if (give_warnings)
1230
0
      ereport(WARNING,
1231
0
          (errcode_for_file_access(),
1232
0
           errmsg("could not open two-phase state file \"%s\": %m",
1233
0
              path)));
1234
0
    return NULL;
1235
0
  }
1236
1237
  /*
1238
   * Check file length.  We can determine a lower bound pretty easily. We
1239
   * set an upper bound to avoid palloc() failure on a corrupt file, though
1240
   * we can't guarantee that we won't get an out of memory error anyway,
1241
   * even on a valid file.
1242
   */
1243
0
  if (fstat(fd, &stat))
1244
0
  {
1245
0
    int     save_errno = errno;
1246
1247
0
    CloseTransientFile(fd);
1248
0
    if (give_warnings)
1249
0
    {
1250
0
      errno = save_errno;
1251
0
      ereport(WARNING,
1252
0
          (errcode_for_file_access(),
1253
0
           errmsg("could not stat two-phase state file \"%s\": %m",
1254
0
              path)));
1255
0
    }
1256
0
    return NULL;
1257
0
  }
1258
1259
0
  if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1260
0
            MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1261
0
            sizeof(pg_crc32c)) ||
1262
0
    stat.st_size > MaxAllocSize)
1263
0
  {
1264
0
    CloseTransientFile(fd);
1265
0
    return NULL;
1266
0
  }
1267
1268
0
  crc_offset = stat.st_size - sizeof(pg_crc32c);
1269
0
  if (crc_offset != MAXALIGN(crc_offset))
1270
0
  {
1271
0
    CloseTransientFile(fd);
1272
0
    return NULL;
1273
0
  }
1274
1275
  /*
1276
   * OK, slurp in the file.
1277
   */
1278
0
  buf = (char *) palloc(stat.st_size);
1279
1280
0
  pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1281
0
  if (read(fd, buf, stat.st_size) != stat.st_size)
1282
0
  {
1283
0
    int     save_errno = errno;
1284
1285
0
    pgstat_report_wait_end();
1286
0
    CloseTransientFile(fd);
1287
0
    if (give_warnings)
1288
0
    {
1289
0
      errno = save_errno;
1290
0
      ereport(WARNING,
1291
0
          (errcode_for_file_access(),
1292
0
           errmsg("could not read two-phase state file \"%s\": %m",
1293
0
              path)));
1294
0
    }
1295
0
    pfree(buf);
1296
0
    return NULL;
1297
0
  }
1298
1299
0
  pgstat_report_wait_end();
1300
0
  CloseTransientFile(fd);
1301
1302
0
  hdr = (TwoPhaseFileHeader *) buf;
1303
0
  if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1304
0
  {
1305
0
    pfree(buf);
1306
0
    return NULL;
1307
0
  }
1308
1309
0
  INIT_CRC32C(calc_crc);
1310
0
  COMP_CRC32C(calc_crc, buf, crc_offset);
1311
0
  FIN_CRC32C(calc_crc);
1312
1313
0
  file_crc = *((pg_crc32c *) (buf + crc_offset));
1314
1315
0
  if (!EQ_CRC32C(calc_crc, file_crc))
1316
0
  {
1317
0
    pfree(buf);
1318
0
    return NULL;
1319
0
  }
1320
1321
0
  return buf;
1322
0
}
1323
1324
/*
1325
 * ParsePrepareRecord
1326
 */
1327
void
1328
ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
1329
0
{
1330
0
  TwoPhaseFileHeader *hdr;
1331
0
  char     *bufptr;
1332
1333
0
  hdr = (TwoPhaseFileHeader *) xlrec;
1334
0
  bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1335
1336
0
  parsed->origin_lsn = hdr->origin_lsn;
1337
0
  parsed->origin_timestamp = hdr->origin_timestamp;
1338
0
  parsed->twophase_xid = hdr->xid;
1339
0
  parsed->dbId = hdr->database;
1340
0
  parsed->nsubxacts = hdr->nsubxacts;
1341
0
  parsed->nrels = hdr->ncommitrels;
1342
0
  parsed->nabortrels = hdr->nabortrels;
1343
0
  parsed->nmsgs = hdr->ninvalmsgs;
1344
1345
0
  strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
1346
0
  bufptr += MAXALIGN(hdr->gidlen);
1347
1348
0
  parsed->subxacts = (TransactionId *) bufptr;
1349
0
  bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1350
1351
0
  parsed->xnodes = (RelFileNode *) bufptr;
1352
0
  bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1353
1354
0
  parsed->abortnodes = (RelFileNode *) bufptr;
1355
0
  bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1356
1357
0
  parsed->msgs = (SharedInvalidationMessage *) bufptr;
1358
0
  bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1359
0
}
1360
1361
1362
1363
/*
1364
 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1365
 * twophase files and ReadTwoPhaseFile should be used instead.
1366
 *
1367
 * Note clearly that this function can access WAL during normal operation,
1368
 * similarly to the way WALSender or Logical Decoding would do.
1369
 *
1370
 */
1371
static void
1372
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1373
0
{
1374
0
  XLogRecord *record;
1375
0
  XLogReaderState *xlogreader;
1376
0
  char     *errormsg;
1377
1378
0
  xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
1379
0
                  NULL);
1380
0
  if (!xlogreader)
1381
0
    ereport(ERROR,
1382
0
        (errcode(ERRCODE_OUT_OF_MEMORY),
1383
0
         errmsg("out of memory"),
1384
0
         errdetail("Failed while allocating a WAL reading processor.")));
1385
1386
0
  record = XLogReadRecord(xlogreader, lsn, &errormsg);
1387
0
  if (record == NULL)
1388
0
    ereport(ERROR,
1389
0
        (errcode_for_file_access(),
1390
0
         errmsg("could not read two-phase state from WAL at %X/%X",
1391
0
            (uint32) (lsn >> 32),
1392
0
            (uint32) lsn)));
1393
1394
0
  if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1395
0
    (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1396
0
    ereport(ERROR,
1397
0
        (errcode_for_file_access(),
1398
0
         errmsg("expected two-phase state data is not present in WAL at %X/%X",
1399
0
            (uint32) (lsn >> 32),
1400
0
            (uint32) lsn)));
1401
1402
0
  if (len != NULL)
1403
0
    *len = XLogRecGetDataLen(xlogreader);
1404
1405
0
  *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1406
0
  memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1407
1408
0
  XLogReaderFree(xlogreader);
1409
0
}
1410
1411
1412
/*
1413
 * Confirms an xid is prepared, during recovery
1414
 */
1415
bool
1416
StandbyTransactionIdIsPrepared(TransactionId xid)
1417
0
{
1418
0
  char     *buf;
1419
0
  TwoPhaseFileHeader *hdr;
1420
0
  bool    result;
1421
1422
0
  Assert(TransactionIdIsValid(xid));
1423
1424
0
  if (max_prepared_xacts <= 0)
1425
0
    return false;     /* nothing to do */
1426
1427
  /* Read and validate file */
1428
0
  buf = ReadTwoPhaseFile(xid, false);
1429
0
  if (buf == NULL)
1430
0
    return false;
1431
1432
  /* Check header also */
1433
0
  hdr = (TwoPhaseFileHeader *) buf;
1434
0
  result = TransactionIdEquals(hdr->xid, xid);
1435
0
  pfree(buf);
1436
1437
0
  return result;
1438
0
}
1439
1440
/*
1441
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1442
 */
1443
void
1444
FinishPreparedTransaction(const char *gid, bool isCommit)
1445
0
{
1446
0
  GlobalTransaction gxact;
1447
0
  PGPROC     *proc;
1448
0
  PGXACT     *pgxact;
1449
0
  TransactionId xid;
1450
0
  char     *buf;
1451
0
  char     *bufptr;
1452
0
  TwoPhaseFileHeader *hdr;
1453
0
  TransactionId latestXid;
1454
0
  TransactionId *children;
1455
0
  RelFileNode *commitrels;
1456
0
  RelFileNode *abortrels;
1457
0
  RelFileNode *delrels;
1458
0
  int     ndelrels;
1459
0
  SharedInvalidationMessage *invalmsgs;
1460
1461
  /*
1462
   * Validate the GID, and lock the GXACT to ensure that two backends do not
1463
   * try to commit the same GID at once.
1464
   */
1465
0
  gxact = LockGXact(gid, GetUserId());
1466
0
  proc = &ProcGlobal->allProcs[gxact->pgprocno];
1467
0
  pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1468
0
  xid = pgxact->xid;
1469
1470
  /*
1471
   * Read and validate 2PC state data. State data will typically be stored
1472
   * in WAL files if the LSN is after the last checkpoint record, or moved
1473
   * to disk if for some reason they have lived for a long time.
1474
   */
1475
0
  if (gxact->ondisk)
1476
0
    buf = ReadTwoPhaseFile(xid, true);
1477
0
  else
1478
0
    XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1479
1480
1481
  /*
1482
   * Disassemble the header area
1483
   */
1484
0
  hdr = (TwoPhaseFileHeader *) buf;
1485
0
  Assert(TransactionIdEquals(hdr->xid, xid));
1486
0
  bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1487
0
  bufptr += MAXALIGN(hdr->gidlen);
1488
0
  children = (TransactionId *) bufptr;
1489
0
  bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1490
0
  commitrels = (RelFileNode *) bufptr;
1491
0
  bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1492
0
  abortrels = (RelFileNode *) bufptr;
1493
0
  bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1494
0
  invalmsgs = (SharedInvalidationMessage *) bufptr;
1495
0
  bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1496
1497
  /* compute latestXid among all children */
1498
0
  latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1499
1500
  /* Prevent cancel/die interrupt while cleaning up */
1501
0
  HOLD_INTERRUPTS();
1502
1503
  /*
1504
   * The order of operations here is critical: make the XLOG entry for
1505
   * commit or abort, then mark the transaction committed or aborted in
1506
   * pg_xact, then remove its PGPROC from the global ProcArray (which means
1507
   * TransactionIdIsInProgress will stop saying the prepared xact is in
1508
   * progress), then run the post-commit or post-abort callbacks. The
1509
   * callbacks will release the locks the transaction held.
1510
   */
1511
0
  if (isCommit)
1512
0
    RecordTransactionCommitPrepared(xid,
1513
0
                    hdr->nsubxacts, children,
1514
0
                    hdr->ncommitrels, commitrels,
1515
0
                    hdr->ninvalmsgs, invalmsgs,
1516
0
                    hdr->initfileinval, gid);
1517
0
  else
1518
0
    RecordTransactionAbortPrepared(xid,
1519
0
                     hdr->nsubxacts, children,
1520
0
                     hdr->nabortrels, abortrels,
1521
0
                     gid);
1522
1523
0
  ProcArrayRemove(proc, latestXid);
1524
1525
  /*
1526
   * In case we fail while running the callbacks, mark the gxact invalid so
1527
   * no one else will try to commit/rollback, and so it will be recycled if
1528
   * we fail after this point.  It is still locked by our backend so it
1529
   * won't go away yet.
1530
   *
1531
   * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1532
   */
1533
0
  gxact->valid = false;
1534
1535
  /*
1536
   * We have to remove any files that were supposed to be dropped. For
1537
   * consistency with the regular xact.c code paths, must do this before
1538
   * releasing locks, so do it before running the callbacks.
1539
   *
1540
   * NB: this code knows that we couldn't be dropping any temp rels ...
1541
   */
1542
0
  if (isCommit)
1543
0
  {
1544
0
    delrels = commitrels;
1545
0
    ndelrels = hdr->ncommitrels;
1546
0
  }
1547
0
  else
1548
0
  {
1549
0
    delrels = abortrels;
1550
0
    ndelrels = hdr->nabortrels;
1551
0
  }
1552
1553
  /* Make sure files supposed to be dropped are dropped */
1554
0
  DropRelationFiles(delrels, ndelrels, false);
1555
1556
  /*
1557
   * Handle cache invalidation messages.
1558
   *
1559
   * Relcache init file invalidation requires processing both before and
1560
   * after we send the SI messages. See AtEOXact_Inval()
1561
   */
1562
0
  if (hdr->initfileinval)
1563
0
    RelationCacheInitFilePreInvalidate();
1564
0
  SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1565
0
  if (hdr->initfileinval)
1566
0
    RelationCacheInitFilePostInvalidate();
1567
1568
  /* And now do the callbacks */
1569
0
  if (isCommit)
1570
0
    ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1571
0
  else
1572
0
    ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1573
1574
0
  PredicateLockTwoPhaseFinish(xid, isCommit);
1575
1576
  /* Count the prepared xact as committed or aborted */
1577
0
  AtEOXact_PgStat(isCommit);
1578
1579
  /*
1580
   * And now we can clean up any files we may have left.
1581
   */
1582
0
  if (gxact->ondisk)
1583
0
    RemoveTwoPhaseFile(xid, true);
1584
1585
0
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1586
0
  RemoveGXact(gxact);
1587
0
  LWLockRelease(TwoPhaseStateLock);
1588
0
  MyLockedGxact = NULL;
1589
1590
0
  RESUME_INTERRUPTS();
1591
1592
0
  pfree(buf);
1593
0
}
1594
1595
/*
1596
 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1597
 */
1598
static void
1599
ProcessRecords(char *bufptr, TransactionId xid,
1600
         const TwoPhaseCallback callbacks[])
1601
0
{
1602
0
  for (;;)
1603
0
  {
1604
0
    TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1605
1606
0
    Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1607
0
    if (record->rmid == TWOPHASE_RM_END_ID)
1608
0
      break;
1609
1610
0
    bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1611
1612
0
    if (callbacks[record->rmid] != NULL)
1613
0
      callbacks[record->rmid] (xid, record->info,
1614
0
                   (void *) bufptr, record->len);
1615
1616
0
    bufptr += MAXALIGN(record->len);
1617
0
  }
1618
0
}
1619
1620
/*
1621
 * Remove the 2PC file for the specified XID.
1622
 *
1623
 * If giveWarning is false, do not complain about file-not-present;
1624
 * this is an expected case during WAL replay.
1625
 */
1626
static void
1627
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1628
0
{
1629
0
  char    path[MAXPGPATH];
1630
1631
0
  TwoPhaseFilePath(path, xid);
1632
0
  if (unlink(path))
1633
0
    if (errno != ENOENT || giveWarning)
1634
0
      ereport(WARNING,
1635
0
          (errcode_for_file_access(),
1636
0
           errmsg("could not remove two-phase state file \"%s\": %m",
1637
0
              path)));
1638
0
}
1639
1640
/*
1641
 * Recreates a state file. This is used in WAL replay and during
1642
 * checkpoint creation.
1643
 *
1644
 * Note: content and len don't include CRC.
1645
 */
1646
static void
1647
RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1648
0
{
1649
0
  char    path[MAXPGPATH];
1650
0
  pg_crc32c statefile_crc;
1651
0
  int     fd;
1652
1653
  /* Recompute CRC */
1654
0
  INIT_CRC32C(statefile_crc);
1655
0
  COMP_CRC32C(statefile_crc, content, len);
1656
0
  FIN_CRC32C(statefile_crc);
1657
1658
0
  TwoPhaseFilePath(path, xid);
1659
1660
0
  fd = OpenTransientFile(path,
1661
0
               O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1662
0
  if (fd < 0)
1663
0
    ereport(ERROR,
1664
0
        (errcode_for_file_access(),
1665
0
         errmsg("could not recreate two-phase state file \"%s\": %m",
1666
0
            path)));
1667
1668
  /* Write content and CRC */
1669
0
  errno = 0;
1670
0
  pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1671
0
  if (write(fd, content, len) != len)
1672
0
  {
1673
0
    int     save_errno = errno;
1674
1675
0
    pgstat_report_wait_end();
1676
0
    CloseTransientFile(fd);
1677
1678
    /* if write didn't set errno, assume problem is no disk space */
1679
0
    errno = save_errno ? save_errno : ENOSPC;
1680
0
    ereport(ERROR,
1681
0
        (errcode_for_file_access(),
1682
0
         errmsg("could not write two-phase state file: %m")));
1683
0
  }
1684
0
  if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1685
0
  {
1686
0
    int     save_errno = errno;
1687
1688
0
    pgstat_report_wait_end();
1689
0
    CloseTransientFile(fd);
1690
1691
    /* if write didn't set errno, assume problem is no disk space */
1692
0
    errno = save_errno ? save_errno : ENOSPC;
1693
0
    ereport(ERROR,
1694
0
        (errcode_for_file_access(),
1695
0
         errmsg("could not write two-phase state file: %m")));
1696
0
  }
1697
0
  pgstat_report_wait_end();
1698
1699
  /*
1700
   * We must fsync the file because the end-of-replay checkpoint will not do
1701
   * so, there being no GXACT in shared memory yet to tell it to.
1702
   */
1703
0
  pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1704
0
  if (pg_fsync(fd) != 0)
1705
0
  {
1706
0
    int     save_errno = errno;
1707
1708
0
    CloseTransientFile(fd);
1709
0
    errno = save_errno;
1710
0
    ereport(ERROR,
1711
0
        (errcode_for_file_access(),
1712
0
         errmsg("could not fsync two-phase state file: %m")));
1713
0
  }
1714
0
  pgstat_report_wait_end();
1715
1716
0
  if (CloseTransientFile(fd) != 0)
1717
0
    ereport(ERROR,
1718
0
        (errcode_for_file_access(),
1719
0
         errmsg("could not close two-phase state file: %m")));
1720
0
}
1721
1722
/*
1723
 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1724
 *
1725
 * We must fsync the state file of any GXACT that is valid or has been
1726
 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1727
 * horizon.  (If the gxact isn't valid yet, has not been generated in
1728
 * redo, or has a later LSN, this checkpoint is not responsible for
1729
 * fsyncing it.)
1730
 *
1731
 * This is deliberately run as late as possible in the checkpoint sequence,
1732
 * because GXACTs ordinarily have short lifespans, and so it is quite
1733
 * possible that GXACTs that were valid at checkpoint start will no longer
1734
 * exist if we wait a little bit. With typical checkpoint settings this
1735
 * will be about 3 minutes for an online checkpoint, so as a result we
1736
 * we expect that there will be no GXACTs that need to be copied to disk.
1737
 *
1738
 * If a GXACT remains valid across multiple checkpoints, it will already
1739
 * be on disk so we don't bother to repeat that write.
1740
 */
1741
void
1742
CheckPointTwoPhase(XLogRecPtr redo_horizon)
1743
2.35k
{
1744
2.35k
  int     i;
1745
2.35k
  int     serialized_xacts = 0;
1746
1747
2.35k
  if (max_prepared_xacts <= 0)
1748
2.35k
    return;          /* nothing to do */
1749
1750
0
  TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1751
1752
  /*
1753
   * We are expecting there to be zero GXACTs that need to be copied to
1754
   * disk, so we perform all I/O while holding TwoPhaseStateLock for
1755
   * simplicity. This prevents any new xacts from preparing while this
1756
   * occurs, which shouldn't be a problem since the presence of long-lived
1757
   * prepared xacts indicates the transaction manager isn't active.
1758
   *
1759
   * It's also possible to move I/O out of the lock, but on every error we
1760
   * should check whether somebody committed our transaction in different
1761
   * backend. Let's leave this optimization for future, if somebody will
1762
   * spot that this place cause bottleneck.
1763
   *
1764
   * Note that it isn't possible for there to be a GXACT with a
1765
   * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1766
   * because of the efforts with delayChkpt.
1767
   */
1768
0
  LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1769
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1770
0
  {
1771
    /*
1772
     * Note that we are using gxact not pgxact so this works in recovery
1773
     * also
1774
     */
1775
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1776
1777
0
    if ((gxact->valid || gxact->inredo) &&
1778
0
      !gxact->ondisk &&
1779
0
      gxact->prepare_end_lsn <= redo_horizon)
1780
0
    {
1781
0
      char     *buf;
1782
0
      int     len;
1783
1784
0
      XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1785
0
      RecreateTwoPhaseFile(gxact->xid, buf, len);
1786
0
      gxact->ondisk = true;
1787
0
      gxact->prepare_start_lsn = InvalidXLogRecPtr;
1788
0
      gxact->prepare_end_lsn = InvalidXLogRecPtr;
1789
0
      pfree(buf);
1790
0
      serialized_xacts++;
1791
0
    }
1792
0
  }
1793
0
  LWLockRelease(TwoPhaseStateLock);
1794
1795
  /*
1796
   * Flush unconditionally the parent directory to make any information
1797
   * durable on disk.  Two-phase files could have been removed and those
1798
   * removals need to be made persistent as well as any files newly created
1799
   * previously since the last checkpoint.
1800
   */
1801
0
  fsync_fname(TWOPHASE_DIR, true);
1802
1803
0
  TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1804
1805
0
  if (log_checkpoints && serialized_xacts > 0)
1806
0
    ereport(LOG,
1807
0
        (errmsg_plural("%u two-phase state file was written "
1808
0
                 "for a long-running prepared transaction",
1809
0
                 "%u two-phase state files were written "
1810
0
                 "for long-running prepared transactions",
1811
0
                 serialized_xacts,
1812
0
                 serialized_xacts)));
1813
0
}
1814
1815
/*
1816
 * restoreTwoPhaseData
1817
 *
1818
 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1819
 * This is called once at the beginning of recovery, saving any extra
1820
 * lookups in the future.  Two-phase files that are newer than the
1821
 * minimum XID horizon are discarded on the way.
1822
 */
1823
void
1824
restoreTwoPhaseData(void)
1825
3.99k
{
1826
3.99k
  DIR      *cldir;
1827
3.99k
  struct dirent *clde;
1828
1829
3.99k
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1830
3.99k
  cldir = AllocateDir(TWOPHASE_DIR);
1831
11.9k
  while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1832
7.99k
  {
1833
7.99k
    if (strlen(clde->d_name) == 8 &&
1834
7.99k
      
strspn(clde->d_name, "0123456789ABCDEF") == 80
)
1835
0
    {
1836
0
      TransactionId xid;
1837
0
      char     *buf;
1838
1839
0
      xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1840
1841
0
      buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1842
0
                    true, false, false);
1843
0
      if (buf == NULL)
1844
0
        continue;
1845
1846
0
      PrepareRedoAdd(buf, InvalidXLogRecPtr,
1847
0
               InvalidXLogRecPtr, InvalidRepOriginId);
1848
0
    }
1849
7.99k
  }
1850
3.99k
  LWLockRelease(TwoPhaseStateLock);
1851
3.99k
  FreeDir(cldir);
1852
3.99k
}
1853
1854
/*
1855
 * PrescanPreparedTransactions
1856
 *
1857
 * Scan the shared memory entries of TwoPhaseState and determine the range
1858
 * of valid XIDs present.  This is run during database startup, after we
1859
 * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
1860
 * one more than the highest XID for which evidence exists in WAL.
1861
 *
1862
 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1863
 * are present, it suggests that the DBA has done a PITR recovery to an
1864
 * earlier point in time without cleaning out pg_twophase.  We dare not
1865
 * try to recover such prepared xacts since they likely depend on database
1866
 * state that doesn't exist now.
1867
 *
1868
 * However, we will advance nextXid beyond any subxact XIDs belonging to
1869
 * valid prepared xacts.  We need to do this since subxact commit doesn't
1870
 * write a WAL entry, and so there might be no evidence in WAL of those
1871
 * subxact XIDs.
1872
 *
1873
 * Our other responsibility is to determine and return the oldest valid XID
1874
 * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1875
 * This is needed to synchronize pg_subtrans startup properly.
1876
 *
1877
 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1878
 * top-level xids is stored in *xids_p. The number of entries in the array
1879
 * is returned in *nxids_p.
1880
 */
1881
TransactionId
1882
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1883
3.99k
{
1884
3.99k
  TransactionId origNextXid = ShmemVariableCache->nextXid;
1885
3.99k
  TransactionId result = origNextXid;
1886
3.99k
  TransactionId *xids = NULL;
1887
3.99k
  int     nxids = 0;
1888
3.99k
  int     allocsize = 0;
1889
3.99k
  int     i;
1890
1891
3.99k
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1892
3.99k
  for (i = 0; i < TwoPhaseState->numPrepXacts; 
i++0
)
1893
0
  {
1894
0
    TransactionId xid;
1895
0
    char     *buf;
1896
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1897
1898
0
    Assert(gxact->inredo);
1899
1900
0
    xid = gxact->xid;
1901
1902
0
    buf = ProcessTwoPhaseBuffer(xid,
1903
0
                  gxact->prepare_start_lsn,
1904
0
                  gxact->ondisk, false, true);
1905
1906
0
    if (buf == NULL)
1907
0
      continue;
1908
1909
    /*
1910
     * OK, we think this file is valid.  Incorporate xid into the
1911
     * running-minimum result.
1912
     */
1913
0
    if (TransactionIdPrecedes(xid, result))
1914
0
      result = xid;
1915
1916
0
    if (xids_p)
1917
0
    {
1918
0
      if (nxids == allocsize)
1919
0
      {
1920
0
        if (nxids == 0)
1921
0
        {
1922
0
          allocsize = 10;
1923
0
          xids = palloc(allocsize * sizeof(TransactionId));
1924
0
        }
1925
0
        else
1926
0
        {
1927
0
          allocsize = allocsize * 2;
1928
0
          xids = repalloc(xids, allocsize * sizeof(TransactionId));
1929
0
        }
1930
0
      }
1931
0
      xids[nxids++] = xid;
1932
0
    }
1933
1934
0
    pfree(buf);
1935
0
  }
1936
3.99k
  LWLockRelease(TwoPhaseStateLock);
1937
1938
3.99k
  if (xids_p)
1939
0
  {
1940
0
    *xids_p = xids;
1941
0
    *nxids_p = nxids;
1942
0
  }
1943
1944
3.99k
  return result;
1945
3.99k
}
1946
1947
/*
1948
 * StandbyRecoverPreparedTransactions
1949
 *
1950
 * Scan the shared memory entries of TwoPhaseState and setup all the required
1951
 * information to allow standby queries to treat prepared transactions as still
1952
 * active.
1953
 *
1954
 * This is never called at the end of recovery - we use
1955
 * RecoverPreparedTransactions() at that point.
1956
 *
1957
 * The lack of calls to SubTransSetParent() calls here is by design;
1958
 * those calls are made by RecoverPreparedTransactions() at the end of recovery
1959
 * for those xacts that need this.
1960
 */
1961
void
1962
StandbyRecoverPreparedTransactions(void)
1963
0
{
1964
0
  int     i;
1965
1966
0
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1967
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1968
0
  {
1969
0
    TransactionId xid;
1970
0
    char     *buf;
1971
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1972
1973
0
    Assert(gxact->inredo);
1974
1975
0
    xid = gxact->xid;
1976
1977
0
    buf = ProcessTwoPhaseBuffer(xid,
1978
0
                  gxact->prepare_start_lsn,
1979
0
                  gxact->ondisk, false, false);
1980
0
    if (buf != NULL)
1981
0
      pfree(buf);
1982
0
  }
1983
0
  LWLockRelease(TwoPhaseStateLock);
1984
0
}
1985
1986
/*
1987
 * RecoverPreparedTransactions
1988
 *
1989
 * Scan the shared memory entries of TwoPhaseState and reload the state for
1990
 * each prepared transaction (reacquire locks, etc).
1991
 *
1992
 * This is run at the end of recovery, but before we allow backends to write
1993
 * WAL.
1994
 *
1995
 * At the end of recovery the way we take snapshots will change. We now need
1996
 * to mark all running transactions with their full SubTransSetParent() info
1997
 * to allow normal snapshots to work correctly if snapshots overflow.
1998
 * We do this here because by definition prepared transactions are the only
1999
 * type of write transaction still running, so this is necessary and
2000
 * complete.
2001
 */
2002
void
2003
RecoverPreparedTransactions(void)
2004
3.99k
{
2005
3.99k
  int     i;
2006
2007
3.99k
  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2008
3.99k
  for (i = 0; i < TwoPhaseState->numPrepXacts; 
i++0
)
2009
0
  {
2010
0
    TransactionId xid;
2011
0
    char     *buf;
2012
0
    GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2013
0
    char     *bufptr;
2014
0
    TwoPhaseFileHeader *hdr;
2015
0
    TransactionId *subxids;
2016
0
    const char *gid;
2017
2018
0
    xid = gxact->xid;
2019
2020
    /*
2021
     * Reconstruct subtrans state for the transaction --- needed because
2022
     * pg_subtrans is not preserved over a restart.  Note that we are
2023
     * linking all the subtransactions directly to the top-level XID;
2024
     * there may originally have been a more complex hierarchy, but
2025
     * there's no need to restore that exactly. It's possible that
2026
     * SubTransSetParent has been set before, if the prepared transaction
2027
     * generated xid assignment records.
2028
     */
2029
0
    buf = ProcessTwoPhaseBuffer(xid,
2030
0
                  gxact->prepare_start_lsn,
2031
0
                  gxact->ondisk, true, false);
2032
0
    if (buf == NULL)
2033
0
      continue;
2034
2035
0
    ereport(LOG,
2036
0
        (errmsg("recovering prepared transaction %u from shared memory", xid)));
2037
2038
0
    hdr = (TwoPhaseFileHeader *) buf;
2039
0
    Assert(TransactionIdEquals(hdr->xid, xid));
2040
0
    bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2041
0
    gid = (const char *) bufptr;
2042
0
    bufptr += MAXALIGN(hdr->gidlen);
2043
0
    subxids = (TransactionId *) bufptr;
2044
0
    bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2045
0
    bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2046
0
    bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2047
0
    bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2048
2049
    /*
2050
     * Recreate its GXACT and dummy PGPROC. But, check whether it was
2051
     * added in redo and already has a shmem entry for it.
2052
     */
2053
0
    MarkAsPreparingGuts(gxact, xid, gid,
2054
0
              hdr->prepared_at,
2055
0
              hdr->owner, hdr->database);
2056
2057
    /* recovered, so reset the flag for entries generated by redo */
2058
0
    gxact->inredo = false;
2059
2060
0
    GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2061
0
    MarkAsPrepared(gxact, true);
2062
2063
0
    LWLockRelease(TwoPhaseStateLock);
2064
2065
    /*
2066
     * Recover other state (notably locks) using resource managers.
2067
     */
2068
0
    ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2069
2070
    /*
2071
     * Release locks held by the standby process after we process each
2072
     * prepared transaction. As a result, we don't need too many
2073
     * additional locks at any one time.
2074
     */
2075
0
    if (InHotStandby)
2076
0
      StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2077
2078
    /*
2079
     * We're done with recovering this transaction. Clear MyLockedGxact,
2080
     * like we do in PrepareTransaction() during normal operation.
2081
     */
2082
0
    PostPrepare_Twophase();
2083
2084
0
    pfree(buf);
2085
2086
0
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2087
0
  }
2088
2089
3.99k
  LWLockRelease(TwoPhaseStateLock);
2090
3.99k
}
2091
2092
/*
2093
 * ProcessTwoPhaseBuffer
2094
 *
2095
 * Given a transaction id, read it either from disk or read it directly
2096
 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2097
 *
2098
 * If setParent is true, set up subtransaction parent linkages.
2099
 *
2100
 * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2101
 * value scanned.
2102
 */
2103
static char *
2104
ProcessTwoPhaseBuffer(TransactionId xid,
2105
            XLogRecPtr prepare_start_lsn,
2106
            bool fromdisk,
2107
            bool setParent, bool setNextXid)
2108
0
{
2109
0
  TransactionId origNextXid = ShmemVariableCache->nextXid;
2110
0
  TransactionId *subxids;
2111
0
  char     *buf;
2112
0
  TwoPhaseFileHeader *hdr;
2113
0
  int     i;
2114
2115
0
  Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2116
2117
0
  if (!fromdisk)
2118
0
    Assert(prepare_start_lsn != InvalidXLogRecPtr);
2119
2120
  /* Already processed? */
2121
0
  if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2122
0
  {
2123
0
    if (fromdisk)
2124
0
    {
2125
0
      ereport(WARNING,
2126
0
          (errmsg("removing stale two-phase state file for transaction %u",
2127
0
              xid)));
2128
0
      RemoveTwoPhaseFile(xid, true);
2129
0
    }
2130
0
    else
2131
0
    {
2132
0
      ereport(WARNING,
2133
0
          (errmsg("removing stale two-phase state from memory for transaction %u",
2134
0
              xid)));
2135
0
      PrepareRedoRemove(xid, true);
2136
0
    }
2137
0
    return NULL;
2138
0
  }
2139
2140
  /* Reject XID if too new */
2141
0
  if (TransactionIdFollowsOrEquals(xid, origNextXid))
2142
0
  {
2143
0
    if (fromdisk)
2144
0
    {
2145
0
      ereport(WARNING,
2146
0
          (errmsg("removing future two-phase state file for transaction %u",
2147
0
              xid)));
2148
0
      RemoveTwoPhaseFile(xid, true);
2149
0
    }
2150
0
    else
2151
0
    {
2152
0
      ereport(WARNING,
2153
0
          (errmsg("removing future two-phase state from memory for transaction %u",
2154
0
              xid)));
2155
0
      PrepareRedoRemove(xid, true);
2156
0
    }
2157
0
    return NULL;
2158
0
  }
2159
2160
0
  if (fromdisk)
2161
0
  {
2162
    /* Read and validate file */
2163
0
    buf = ReadTwoPhaseFile(xid, true);
2164
0
    if (buf == NULL)
2165
0
    {
2166
0
      ereport(WARNING,
2167
0
          (errmsg("removing corrupt two-phase state file for transaction %u",
2168
0
              xid)));
2169
0
      RemoveTwoPhaseFile(xid, true);
2170
0
      return NULL;
2171
0
    }
2172
0
  }
2173
0
  else
2174
0
  {
2175
    /* Read xlog data */
2176
0
    XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2177
0
  }
2178
2179
  /* Deconstruct header */
2180
0
  hdr = (TwoPhaseFileHeader *) buf;
2181
0
  if (!TransactionIdEquals(hdr->xid, xid))
2182
0
  {
2183
0
    if (fromdisk)
2184
0
    {
2185
0
      ereport(WARNING,
2186
0
          (errmsg("removing corrupt two-phase state file for transaction %u",
2187
0
              xid)));
2188
0
      RemoveTwoPhaseFile(xid, true);
2189
0
    }
2190
0
    else
2191
0
    {
2192
0
      ereport(WARNING,
2193
0
          (errmsg("removing corrupt two-phase state from memory for transaction %u",
2194
0
              xid)));
2195
0
      PrepareRedoRemove(xid, true);
2196
0
    }
2197
0
    pfree(buf);
2198
0
    return NULL;
2199
0
  }
2200
2201
  /*
2202
   * Examine subtransaction XIDs ... they should all follow main XID, and
2203
   * they may force us to advance nextXid.
2204
   */
2205
0
  subxids = (TransactionId *) (buf +
2206
0
                 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2207
0
                 MAXALIGN(hdr->gidlen));
2208
0
  for (i = 0; i < hdr->nsubxacts; i++)
2209
0
  {
2210
0
    TransactionId subxid = subxids[i];
2211
2212
0
    Assert(TransactionIdFollows(subxid, xid));
2213
2214
    /* update nextXid if needed */
2215
0
    if (setNextXid &&
2216
0
      TransactionIdFollowsOrEquals(subxid,
2217
0
                     ShmemVariableCache->nextXid))
2218
0
    {
2219
      /*
2220
       * We don't expect anyone else to modify nextXid, hence we don't
2221
       * need to hold a lock while examining it.  We still acquire the
2222
       * lock to modify it, though, so we recheck.
2223
       */
2224
0
      LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
2225
0
      if (TransactionIdFollowsOrEquals(subxid,
2226
0
                       ShmemVariableCache->nextXid))
2227
0
      {
2228
0
        ShmemVariableCache->nextXid = subxid;
2229
0
        TransactionIdAdvance(ShmemVariableCache->nextXid);
2230
0
      }
2231
0
      LWLockRelease(XidGenLock);
2232
0
    }
2233
2234
0
    if (setParent)
2235
0
      SubTransSetParent(subxid, xid);
2236
0
  }
2237
2238
0
  return buf;
2239
0
}
2240
2241
2242
/*
2243
 *  RecordTransactionCommitPrepared
2244
 *
2245
 * This is basically the same as RecordTransactionCommit (q.v. if you change
2246
 * this function): in particular, we must set the delayChkpt flag to avoid a
2247
 * race condition.
2248
 *
2249
 * We know the transaction made at least one XLOG entry (its PREPARE),
2250
 * so it is never possible to optimize out the commit record.
2251
 */
2252
static void
2253
RecordTransactionCommitPrepared(TransactionId xid,
2254
                int nchildren,
2255
                TransactionId *children,
2256
                int nrels,
2257
                RelFileNode *rels,
2258
                int ninvalmsgs,
2259
                SharedInvalidationMessage *invalmsgs,
2260
                bool initfileinval,
2261
                const char *gid)
2262
0
{
2263
0
  XLogRecPtr  recptr;
2264
0
  TimestampTz committs = GetCurrentTimestamp();
2265
0
  bool    replorigin;
2266
2267
  /*
2268
   * Are we using the replication origins feature?  Or, in other words, are
2269
   * we replaying remote actions?
2270
   */
2271
0
  replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2272
0
          replorigin_session_origin != DoNotReplicateId);
2273
2274
0
  START_CRIT_SECTION();
2275
2276
  /* See notes in RecordTransactionCommit */
2277
0
  MyPgXact->delayChkpt = true;
2278
2279
  /*
2280
   * Emit the XLOG commit record. Note that we mark 2PC commits as
2281
   * potentially having AccessExclusiveLocks since we don't know whether or
2282
   * not they do.
2283
   */
2284
0
  recptr = XactLogCommitRecord(committs,
2285
0
                 nchildren, children, nrels, rels,
2286
0
                 ninvalmsgs, invalmsgs,
2287
0
                 initfileinval, false,
2288
0
                 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2289
0
                 xid, gid);
2290
2291
2292
0
  if (replorigin)
2293
    /* Move LSNs forward for this replication origin */
2294
0
    replorigin_session_advance(replorigin_session_origin_lsn,
2295
0
                   XactLastRecEnd);
2296
2297
  /*
2298
   * Record commit timestamp.  The value comes from plain commit timestamp
2299
   * if replorigin is not enabled, or replorigin already set a value for us
2300
   * in replorigin_session_origin_timestamp otherwise.
2301
   *
2302
   * We don't need to WAL-log anything here, as the commit record written
2303
   * above already contains the data.
2304
   */
2305
0
  if (!replorigin || replorigin_session_origin_timestamp == 0)
2306
0
    replorigin_session_origin_timestamp = committs;
2307
2308
0
  TransactionTreeSetCommitTsData(xid, nchildren, children,
2309
0
                   replorigin_session_origin_timestamp,
2310
0
                   replorigin_session_origin, false);
2311
2312
  /*
2313
   * We don't currently try to sleep before flush here ... nor is there any
2314
   * support for async commit of a prepared xact (the very idea is probably
2315
   * a contradiction)
2316
   */
2317
2318
  /* Flush XLOG to disk */
2319
0
  XLogFlush(recptr);
2320
2321
  /* Mark the transaction committed in pg_xact */
2322
0
  TransactionIdCommitTree(xid, nchildren, children);
2323
2324
  /* Checkpoint can proceed now */
2325
0
  MyPgXact->delayChkpt = false;
2326
2327
0
  END_CRIT_SECTION();
2328
2329
  /*
2330
   * Wait for synchronous replication, if required.
2331
   *
2332
   * Note that at this stage we have marked clog, but still show as running
2333
   * in the procarray and continue to hold locks.
2334
   */
2335
0
  SyncRepWaitForLSN(recptr, true);
2336
0
}
2337
2338
/*
2339
 *  RecordTransactionAbortPrepared
2340
 *
2341
 * This is basically the same as RecordTransactionAbort.
2342
 *
2343
 * We know the transaction made at least one XLOG entry (its PREPARE),
2344
 * so it is never possible to optimize out the abort record.
2345
 */
2346
static void
2347
RecordTransactionAbortPrepared(TransactionId xid,
2348
                 int nchildren,
2349
                 TransactionId *children,
2350
                 int nrels,
2351
                 RelFileNode *rels,
2352
                 const char *gid)
2353
0
{
2354
0
  XLogRecPtr  recptr;
2355
2356
  /*
2357
   * Catch the scenario where we aborted partway through
2358
   * RecordTransactionCommitPrepared ...
2359
   */
2360
0
  if (TransactionIdDidCommit(xid))
2361
0
    elog(PANIC, "cannot abort transaction %u, it was already committed",
2362
0
       xid);
2363
2364
0
  START_CRIT_SECTION();
2365
2366
  /*
2367
   * Emit the XLOG commit record. Note that we mark 2PC aborts as
2368
   * potentially having AccessExclusiveLocks since we don't know whether or
2369
   * not they do.
2370
   */
2371
0
  recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2372
0
                nchildren, children,
2373
0
                nrels, rels,
2374
0
                MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2375
0
                xid, gid);
2376
2377
  /* Always flush, since we're about to remove the 2PC state file */
2378
0
  XLogFlush(recptr);
2379
2380
  /*
2381
   * Mark the transaction aborted in clog.  This is not absolutely necessary
2382
   * but we may as well do it while we are here.
2383
   */
2384
0
  TransactionIdAbortTree(xid, nchildren, children);
2385
2386
0
  END_CRIT_SECTION();
2387
2388
  /*
2389
   * Wait for synchronous replication, if required.
2390
   *
2391
   * Note that at this stage we have marked clog, but still show as running
2392
   * in the procarray and continue to hold locks.
2393
   */
2394
0
  SyncRepWaitForLSN(recptr, false);
2395
0
}
2396
2397
/*
2398
 * PrepareRedoAdd
2399
 *
2400
 * Store pointers to the start/end of the WAL record along with the xid in
2401
 * a gxact entry in shared memory TwoPhaseState structure.  If caller
2402
 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2403
 * data, the entry is marked as located on disk.
2404
 */
2405
void
2406
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2407
         XLogRecPtr end_lsn, RepOriginId origin_id)
2408
0
{
2409
0
  TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2410
0
  char     *bufptr;
2411
0
  const char *gid;
2412
0
  GlobalTransaction gxact;
2413
2414
0
  Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2415
0
  Assert(RecoveryInProgress());
2416
2417
0
  bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2418
0
  gid = (const char *) bufptr;
2419
2420
  /*
2421
   * Reserve the GID for the given transaction in the redo code path.
2422
   *
2423
   * This creates a gxact struct and puts it into the active array.
2424
   *
2425
   * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2426
   * shared memory. Hence, we only fill up the bare minimum contents here.
2427
   * The gxact also gets marked with gxact->inredo set to true to indicate
2428
   * that it got added in the redo phase
2429
   */
2430
2431
  /* Get a free gxact from the freelist */
2432
0
  if (TwoPhaseState->freeGXacts == NULL)
2433
0
    ereport(ERROR,
2434
0
        (errcode(ERRCODE_OUT_OF_MEMORY),
2435
0
         errmsg("maximum number of prepared transactions reached"),
2436
0
         errhint("Increase max_prepared_transactions (currently %d).",
2437
0
             max_prepared_xacts)));
2438
0
  gxact = TwoPhaseState->freeGXacts;
2439
0
  TwoPhaseState->freeGXacts = gxact->next;
2440
2441
0
  gxact->prepared_at = hdr->prepared_at;
2442
0
  gxact->prepare_start_lsn = start_lsn;
2443
0
  gxact->prepare_end_lsn = end_lsn;
2444
0
  gxact->xid = hdr->xid;
2445
0
  gxact->owner = hdr->owner;
2446
0
  gxact->locking_backend = InvalidBackendId;
2447
0
  gxact->valid = false;
2448
0
  gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2449
0
  gxact->inredo = true;   /* yes, added in redo */
2450
0
  strcpy(gxact->gid, gid);
2451
2452
  /* And insert it into the active array */
2453
0
  Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2454
0
  TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2455
2456
0
  if (origin_id != InvalidRepOriginId)
2457
0
  {
2458
    /* recover apply progress */
2459
0
    replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2460
0
               false /* backward */ , false /* WAL */ );
2461
0
  }
2462
2463
0
  elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2464
0
}
2465
2466
/*
2467
 * PrepareRedoRemove
2468
 *
2469
 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2470
 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2471
 *
2472
 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2473
 * is updated.
2474
 */
2475
void
2476
PrepareRedoRemove(TransactionId xid, bool giveWarning)
2477
0
{
2478
0
  GlobalTransaction gxact = NULL;
2479
0
  int     i;
2480
0
  bool    found = false;
2481
2482
0
  Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2483
0
  Assert(RecoveryInProgress());
2484
2485
0
  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2486
0
  {
2487
0
    gxact = TwoPhaseState->prepXacts[i];
2488
2489
0
    if (gxact->xid == xid)
2490
0
    {
2491
0
      Assert(gxact->inredo);
2492
0
      found = true;
2493
0
      break;
2494
0
    }
2495
0
  }
2496
2497
  /*
2498
   * Just leave if there is nothing, this is expected during WAL replay.
2499
   */
2500
0
  if (!found)
2501
0
    return;
2502
2503
  /*
2504
   * And now we can clean up any files we may have left.
2505
   */
2506
0
  elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2507
0
  if (gxact->ondisk)
2508
0
    RemoveTwoPhaseFile(xid, giveWarning);
2509
0
  RemoveGXact(gxact);
2510
2511
0
  return;
2512
0
}