YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/bin/pg_dump/parallel.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * parallel.c
4
 *
5
 *  Parallel support for pg_dump and pg_restore
6
 *
7
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *    src/bin/pg_dump/parallel.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
16
/*
17
 * Parallel operation works like this:
18
 *
19
 * The original, master process calls ParallelBackupStart(), which forks off
20
 * the desired number of worker processes, which each enter WaitForCommands().
21
 *
22
 * The master process dispatches an individual work item to one of the worker
23
 * processes in DispatchJobForTocEntry().  We send a command string such as
24
 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25
 * The worker process receives and decodes the command and passes it to the
26
 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27
 * which are routines of the current archive format.  That routine performs
28
 * the required action (dump or restore) and returns an integer status code.
29
 * This is passed back to the master where we pass it to the
30
 * ParallelCompletionPtr callback function that was passed to
31
 * DispatchJobForTocEntry().  The callback function does state updating
32
 * for the master control logic in pg_backup_archiver.c.
33
 *
34
 * In principle additional archive-format-specific information might be needed
35
 * in commands or worker status responses, but so far that hasn't proved
36
 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37
 * data structures.  Remember that we have forked off the workers only after
38
 * we have read in the catalog.  That's why our worker processes can also
39
 * access the catalog information.  (In the Windows case, the workers are
40
 * threads in the same process.  To avoid problems, they work with cloned
41
 * copies of the Archive data structure; see RunWorker().)
42
 *
43
 * In the master process, the workerStatus field for each worker has one of
44
 * the following values:
45
 *    WRKR_IDLE: it's waiting for a command
46
 *    WRKR_WORKING: it's working on a command
47
 *    WRKR_TERMINATED: process ended
48
 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
49
 * state, and must be NULL in other states.
50
 */
51
52
#include "postgres_fe.h"
53
54
#ifndef WIN32
55
#include <sys/wait.h>
56
#include <signal.h>
57
#include <unistd.h>
58
#include <fcntl.h>
59
#endif
60
#ifdef HAVE_SYS_SELECT_H
61
#include <sys/select.h>
62
#endif
63
64
#include "parallel.h"
65
#include "pg_backup_utils.h"
66
67
#include "fe_utils/string_utils.h"
68
#include "port/pg_bswap.h"
69
70
/* Symbolic indexes into the two-element fd array filled in by pipe(2) */
#define PIPE_READ   0
#define PIPE_WRITE  1

/* Failure result for GetIdleWorker() */
#define NO_SLOT     (-1)
75
76
/*
 * Worker process statuses, as recorded by the master in each slot's
 * workerStatus field.
 */
typedef enum
{
  WRKR_IDLE = 0,        /* waiting for a command */
  WRKR_WORKING,         /* working on a command */
  WRKR_TERMINATED       /* process ended */
} T_WorkerStatus;
83
84
/*
85
 * Private per-parallel-worker state (typedef for this is in parallel.h).
86
 *
87
 * Much of this is valid only in the master process (or, on Windows, should
88
 * be touched only by the master thread).  But the AH field should be touched
89
 * only by workers.  The pipe descriptors are valid everywhere.
90
 */
91
struct ParallelSlot
92
{
93
  T_WorkerStatus workerStatus;  /* see enum above */
94
95
  /* These fields are valid if workerStatus == WRKR_WORKING: */
96
  ParallelCompletionPtr callback; /* function to call on completion */
97
  void     *callback_data;  /* passthrough data for it */
98
99
  ArchiveHandle *AH;      /* Archive data worker is using */
100
101
  int     pipeRead;   /* master's end of the pipes */
102
  int     pipeWrite;
103
  int     pipeRevRead;  /* child's end of the pipes */
104
  int     pipeRevWrite;
105
106
  /* Child process/thread identity info: */
107
#ifdef WIN32
108
  uintptr_t hThread;
109
  unsigned int threadId;
110
#else
111
  pid_t   pid;
112
#endif
113
};
114
115
#ifdef WIN32

/*
 * _beginthreadex() passes exactly one argument to the thread function, so
 * everything the new worker thread needs is bundled into this struct.
 */
typedef struct
{
  ArchiveHandle *AH;      /* master database connection */
  ParallelSlot *slot;     /* parallel slot assigned to this worker */
} WorkerInfo;

/* Pipe access on Windows */
static int  pgpipe(int handles[2]);
static int  piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)  send(a,b,c,0)

#else             /* !WIN32 */

/* Pipe access everywhere else: map straight onto the system calls */
#define pgpipe(a)         pipe(a)
#define piperead(a,b,c)   read(a,b,c)
#define pipewrite(a,b,c)  write(a,b,c)

#endif              /* WIN32 */
140
141
/*
142
 * State info for archive_close_connection() shutdown callback.
143
 */
144
typedef struct ShutdownInformation
145
{
146
  ParallelState *pstate;
147
  Archive    *AHX;
148
} ShutdownInformation;
149
150
static ShutdownInformation shutdown_info;
151
152
/*
153
 * State info for signal handling.
154
 * We assume signal_info initializes to zeroes.
155
 *
156
 * On Unix, myAH is the master DB connection in the master process, and the
157
 * worker's own connection in worker processes.  On Windows, we have only one
158
 * instance of signal_info, so myAH is the master connection and the worker
159
 * connections must be dug out of pstate->parallelSlot[].
160
 */
161
typedef struct DumpSignalInformation
162
{
163
  ArchiveHandle *myAH;    /* database connection to issue cancel for */
164
  ParallelState *pstate;    /* parallel state, if any */
165
  bool    handler_set;  /* signal handler set up in this process? */
166
#ifndef WIN32
167
  bool    am_worker;    /* am I a worker process? */
168
#endif
169
} DumpSignalInformation;
170
171
static volatile DumpSignalInformation signal_info;
172
173
#ifdef WIN32
174
static CRITICAL_SECTION signal_info_lock;
175
#endif
176
177
/*
 * Emit a plain NUL-terminated string on stderr using only write(2), so the
 * macro can be used from inside a signal handler.
 *
 * The result of write() is deliberately discarded: there is nothing useful
 * to do if it fails.  It is stored in a local and then cast to void because
 * some compilers complain when the return value is ignored directly.
 *
 * The argument is evaluated exactly once.
 */
#define write_stderr(str) \
  do { \
    const char *str_ = (str); \
    int   rc_; \
    rc_ = write(fileno(stderr), str_, strlen(str_)); \
    (void) rc_; \
  } while (0)
189
190
191
#ifdef WIN32
192
/* file-scope variables */
193
static DWORD tls_index;
194
195
/* globally visible variables (needed by exit_nicely) */
196
bool    parallel_init_done = false;
197
DWORD   mainThreadId;
198
#endif              /* WIN32 */
199
200
static const char *modulename = gettext_noop("parallel archiver");
201
202
/* Local function prototypes */
203
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
204
static void archive_close_connection(int code, void *arg);
205
static void ShutdownWorkersHard(ParallelState *pstate);
206
static void WaitForTerminatingWorkers(ParallelState *pstate);
207
static void setup_cancel_handler(void);
208
static void set_cancel_pstate(ParallelState *pstate);
209
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
210
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
211
static int  GetIdleWorker(ParallelState *pstate);
212
static bool HasEveryWorkerTerminated(ParallelState *pstate);
213
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
214
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
215
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
216
        bool do_wait);
217
static char *getMessageFromMaster(int pipefd[2]);
218
static void sendMessageToMaster(int pipefd[2], const char *str);
219
static int  select_loop(int maxFd, fd_set *workerset);
220
static char *getMessageFromWorker(ParallelState *pstate,
221
           bool do_wait, int *worker);
222
static void sendMessageToWorker(ParallelState *pstate,
223
          int worker, const char *str);
224
static char *readMessageFromPipe(int fd);
225
226
/*
 * String tests for protocol messages.
 *
 * messageStartsWith(msg, prefix): true if msg begins with prefix.  Note that
 * prefix is evaluated twice, so it must be free of side effects.
 * messageEquals(msg, pattern): true if msg and pattern are identical.
 */
#define messageStartsWith(msg, prefix) \
  (strncmp((msg), (prefix), strlen(prefix)) == 0)
#define messageEquals(msg, pattern) \
  (strcmp((msg), (pattern)) == 0)
230
231
232
/*
 * Exit callback that shuts down socket access (Windows only).
 *
 * Both arguments are unused; they exist to match the on_exit_nicely()
 * callback signature.
 */
#ifdef WIN32
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
  /* The cleanup function is to be called from the main thread only */
  if (GetCurrentThreadId() != mainThreadId)
    return;

  WSACleanup();
}
#endif
244
245
/*
 * One-time setup for parallel dump support; meant to be called early in
 * process startup, whether or not parallel activity is planned.
 *
 * On Windows this allocates the TLS index, records the main thread's ID,
 * starts Winsock and registers the matching exit callback.  Repeated calls
 * are no-ops.  On other platforms the function does nothing.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
  WSADATA   wsaData;
  int     err;

  /* Already initialized?  Then there is nothing to do */
  if (parallel_init_done)
    return;

  /* Prepare for threaded operation */
  tls_index = TlsAlloc();
  mainThreadId = GetCurrentThreadId();

  /* Initialize socket access */
  err = WSAStartup(MAKEWORD(2, 2), &wsaData);
  if (err != 0)
  {
    fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err);
    exit_nicely(1);
  }

  /* ... and arrange for it to be shut down again at exit */
  on_exit_nicely(shutdown_parallel_dump_utils, NULL);

  parallel_init_done = true;
#endif
}
276
277
/*
278
 * Find the ParallelSlot for the current worker process or thread.
279
 *
280
 * Returns NULL if no matching slot is found (this implies we're the master).
281
 */
282
static ParallelSlot *
283
GetMyPSlot(ParallelState *pstate)
284
0
{
285
0
  int     i;
286
287
0
  for (i = 0; i < pstate->numWorkers; i++)
288
0
  {
289
#ifdef WIN32
290
    if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
291
#else
292
0
    if (pstate->parallelSlot[i].pid == getpid())
293
0
#endif
294
0
      return &(pstate->parallelSlot[i]);
295
0
  }
296
297
0
  return NULL;
298
0
}
299
300
/*
 * Thread-local replacement for getLocalPQExpBuffer() (Windows only).
 *
 * Not reentrant, but it limits leakage to one buffer per thread rather than
 * one per fmtId/fmtQualifiedId call.
 *
 * Before init_parallel_dump_utils() has run (parallel_init_done is false)
 * the buffer is kept in a static variable; afterwards it lives in the TLS
 * slot and the static is not used.  TlsGetValue() is relied upon to return
 * 0 for a slot that has not been set yet.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
  static PQExpBuffer s_id_return = NULL;
  PQExpBuffer buf;

  buf = parallel_init_done ?
    (PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

  if (buf != NULL)
  {
    /* Not the first call: reuse the buffer after wiping its contents */
    resetPQExpBuffer(buf);
    return buf;
  }

  /* First call: make a buffer and remember it in the appropriate place */
  buf = createPQExpBuffer();
  if (parallel_init_done)
    TlsSetValue(tls_index, buf);
  else
    s_id_return = buf;

  return buf;
}
#endif              /* WIN32 */
341
342
/*
343
 * pg_dump and pg_restore call this to register the cleanup handler
344
 * as soon as they've created the ArchiveHandle.
345
 */
346
void
347
on_exit_close_archive(Archive *AHX)
348
0
{
349
0
  shutdown_info.AHX = AHX;
350
0
  on_exit_nicely(archive_close_connection, &shutdown_info);
351
0
}
352
353
/*
354
 * on_exit_nicely handler for shutting down database connections and
355
 * worker processes cleanly.
356
 */
357
static void
358
archive_close_connection(int code, void *arg)
359
0
{
360
0
  ShutdownInformation *si = (ShutdownInformation *) arg;
361
362
0
  if (si->pstate)
363
0
  {
364
    /* In parallel mode, must figure out who we are */
365
0
    ParallelSlot *slot = GetMyPSlot(si->pstate);
366
367
0
    if (!slot)
368
0
    {
369
      /*
370
       * We're the master.  Forcibly shut down workers, then close our
371
       * own database connection, if any.
372
       */
373
0
      ShutdownWorkersHard(si->pstate);
374
375
0
      if (si->AHX)
376
0
        DisconnectDatabase(si->AHX);
377
0
    }
378
0
    else
379
0
    {
380
      /*
381
       * We're a worker.  Shut down our own DB connection if any.  On
382
       * Windows, we also have to close our communication sockets, to
383
       * emulate what will happen on Unix when the worker process exits.
384
       * (Without this, if this is a premature exit, the master would
385
       * fail to detect it because there would be no EOF condition on
386
       * the other end of the pipe.)
387
       */
388
0
      if (slot->AH)
389
0
        DisconnectDatabase(&(slot->AH->public));
390
391
#ifdef WIN32
392
      closesocket(slot->pipeRevRead);
393
      closesocket(slot->pipeRevWrite);
394
#endif
395
0
    }
396
0
  }
397
0
  else
398
0
  {
399
    /* Non-parallel operation: just kill the master DB connection */
400
0
    if (si->AHX)
401
0
      DisconnectDatabase(si->AHX);
402
0
  }
403
0
}
404
405
/*
406
 * Forcibly shut down any remaining workers, waiting for them to finish.
407
 *
408
 * Note that we don't expect to come here during normal exit (the workers
409
 * should be long gone, and the ParallelState too).  We're only here in an
410
 * exit_horribly() situation, so intervening to cancel active commands is
411
 * appropriate.
412
 */
413
static void
414
ShutdownWorkersHard(ParallelState *pstate)
415
0
{
416
0
  int     i;
417
418
  /*
419
   * Close our write end of the sockets so that any workers waiting for
420
   * commands know they can exit.
421
   */
422
0
  for (i = 0; i < pstate->numWorkers; i++)
423
0
    closesocket(pstate->parallelSlot[i].pipeWrite);
424
425
  /*
426
   * Force early termination of any commands currently in progress.
427
   */
428
0
#ifndef WIN32
429
  /* On non-Windows, send SIGTERM to each worker process. */
430
0
  for (i = 0; i < pstate->numWorkers; i++)
431
0
  {
432
0
    pid_t   pid = pstate->parallelSlot[i].pid;
433
434
0
    if (pid != 0)
435
0
      kill(pid, SIGTERM);
436
0
  }
437
#else
438
439
  /*
440
   * On Windows, send query cancels directly to the workers' backends.  Use
441
   * a critical section to ensure worker threads don't change state.
442
   */
443
  EnterCriticalSection(&signal_info_lock);
444
  for (i = 0; i < pstate->numWorkers; i++)
445
  {
446
    ArchiveHandle *AH = pstate->parallelSlot[i].AH;
447
    char    errbuf[1];
448
449
    if (AH != NULL && AH->connCancel != NULL)
450
      (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
451
  }
452
  LeaveCriticalSection(&signal_info_lock);
453
#endif
454
455
  /* Now wait for them to terminate. */
456
0
  WaitForTerminatingWorkers(pstate);
457
0
}
458
459
/*
460
 * Wait for all workers to terminate.
461
 */
462
static void
463
WaitForTerminatingWorkers(ParallelState *pstate)
464
0
{
465
0
  while (!HasEveryWorkerTerminated(pstate))
466
0
  {
467
0
    ParallelSlot *slot = NULL;
468
0
    int     j;
469
470
0
#ifndef WIN32
471
    /* On non-Windows, use wait() to wait for next worker to end */
472
0
    int     status;
473
0
    pid_t   pid = wait(&status);
474
475
    /* Find dead worker's slot, and clear the PID field */
476
0
    for (j = 0; j < pstate->numWorkers; j++)
477
0
    {
478
0
      slot = &(pstate->parallelSlot[j]);
479
0
      if (slot->pid == pid)
480
0
      {
481
0
        slot->pid = 0;
482
0
        break;
483
0
      }
484
0
    }
485
#else             /* WIN32 */
486
    /* On Windows, we must use WaitForMultipleObjects() */
487
    HANDLE     *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
488
    int     nrun = 0;
489
    DWORD   ret;
490
    uintptr_t hThread;
491
492
    for (j = 0; j < pstate->numWorkers; j++)
493
    {
494
      if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
495
      {
496
        lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
497
        nrun++;
498
      }
499
    }
500
    ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
501
    Assert(ret != WAIT_FAILED);
502
    hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
503
    free(lpHandles);
504
505
    /* Find dead worker's slot, and clear the hThread field */
506
    for (j = 0; j < pstate->numWorkers; j++)
507
    {
508
      slot = &(pstate->parallelSlot[j]);
509
      if (slot->hThread == hThread)
510
      {
511
        /* For cleanliness, close handles for dead threads */
512
        CloseHandle((HANDLE) slot->hThread);
513
        slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
514
        break;
515
      }
516
    }
517
#endif              /* WIN32 */
518
519
    /* On all platforms, update workerStatus and te[] as well */
520
0
    Assert(j < pstate->numWorkers);
521
0
    slot->workerStatus = WRKR_TERMINATED;
522
0
    pstate->te[j] = NULL;
523
0
  }
524
0
}
525
526
527
/*
528
 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
529
 *
530
 * This doesn't quite belong in this module, but it needs access to the
531
 * ParallelState data, so there's not really a better place either.
532
 *
533
 * When we get a cancel interrupt, we could just die, but in pg_restore that
534
 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
535
 * for a long time.  Instead, we try to send a cancel request and then die.
536
 * pg_dump probably doesn't really need this, but we might as well use it
537
 * there too.  Note that sending the cancel directly from the signal handler
538
 * is safe because PQcancel() is written to make it so.
539
 *
540
 * In parallel operation on Unix, each process is responsible for canceling
541
 * its own connection (this must be so because nobody else has access to it).
542
 * Furthermore, the master process should attempt to forward its signal to
543
 * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
544
 * needed because typing control-C at the console would deliver SIGINT to
545
 * every member of the terminal process group --- but in other scenarios it
546
 * might be that only the master gets signaled.
547
 *
548
 * On Windows, the cancel handler runs in a separate thread, because that's
549
 * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
550
 * cancels on all active connections, and then return FALSE, which will allow
551
 * the process to die.  For safety's sake, we use a critical section to
552
 * protect the PGcancel structures against being changed while the signal
553
 * thread runs.
554
 */
555
556
#ifndef WIN32
557
558
/*
559
 * Signal handler (Unix only)
560
 */
561
static void
562
sigTermHandler(SIGNAL_ARGS)
563
0
{
564
0
  int     i;
565
0
  char    errbuf[1];
566
567
  /*
568
   * Some platforms allow delivery of new signals to interrupt an active
569
   * signal handler.  That could muck up our attempt to send PQcancel, so
570
   * disable the signals that setup_cancel_handler enabled.
571
   */
572
0
  pqsignal(SIGINT, SIG_IGN);
573
0
  pqsignal(SIGTERM, SIG_IGN);
574
0
  pqsignal(SIGQUIT, SIG_IGN);
575
576
  /*
577
   * If we're in the master, forward signal to all workers.  (It seems best
578
   * to do this before PQcancel; killing the master transaction will result
579
   * in invalid-snapshot errors from active workers, which maybe we can
580
   * quiet by killing workers first.)  Ignore any errors.
581
   */
582
0
  if (signal_info.pstate != NULL)
583
0
  {
584
0
    for (i = 0; i < signal_info.pstate->numWorkers; i++)
585
0
    {
586
0
      pid_t   pid = signal_info.pstate->parallelSlot[i].pid;
587
588
0
      if (pid != 0)
589
0
        kill(pid, SIGTERM);
590
0
    }
591
0
  }
592
593
  /*
594
   * Send QueryCancel if we have a connection to send to.  Ignore errors,
595
   * there's not much we can do about them anyway.
596
   */
597
0
  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
598
0
    (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
599
600
  /*
601
   * Report we're quitting, using nothing more complicated than write(2).
602
   * When in parallel operation, only the master process should do this.
603
   */
604
0
  if (!signal_info.am_worker)
605
0
  {
606
0
    if (progname)
607
0
    {
608
0
      write_stderr(progname);
609
0
      write_stderr(": ");
610
0
    }
611
0
    write_stderr("terminated by user\n");
612
0
  }
613
614
  /* And die. */
615
0
  exit(1);
616
0
}
617
618
/*
619
 * Enable cancel interrupt handler, if not already done.
620
 */
621
static void
622
setup_cancel_handler(void)
623
0
{
624
  /*
625
   * When forking, signal_info.handler_set will propagate into the new
626
   * process, but that's fine because the signal handler state does too.
627
   */
628
0
  if (!signal_info.handler_set)
629
0
  {
630
0
    signal_info.handler_set = true;
631
632
0
    pqsignal(SIGINT, sigTermHandler);
633
0
    pqsignal(SIGTERM, sigTermHandler);
634
0
    pqsignal(SIGQUIT, sigTermHandler);
635
0
  }
636
0
}
637
638
#else             /* WIN32 */
639
640
/*
641
 * Console interrupt handler --- runs in a newly-started thread.
642
 *
643
 * After stopping other threads and sending cancel requests on all open
644
 * connections, we return FALSE which will allow the default ExitProcess()
645
 * action to be taken.
646
 */
647
static BOOL WINAPI
648
consoleHandler(DWORD dwCtrlType)
649
{
650
  int     i;
651
  char    errbuf[1];
652
653
  if (dwCtrlType == CTRL_C_EVENT ||
654
    dwCtrlType == CTRL_BREAK_EVENT)
655
  {
656
    /* Critical section prevents changing data we look at here */
657
    EnterCriticalSection(&signal_info_lock);
658
659
    /*
660
     * If in parallel mode, stop worker threads and send QueryCancel to
661
     * their connected backends.  The main point of stopping the worker
662
     * threads is to keep them from reporting the query cancels as errors,
663
     * which would clutter the user's screen.  We needn't stop the master
664
     * thread since it won't be doing much anyway.  Do this before
665
     * canceling the main transaction, else we might get invalid-snapshot
666
     * errors reported before we can stop the workers.  Ignore errors,
667
     * there's not much we can do about them anyway.
668
     */
669
    if (signal_info.pstate != NULL)
670
    {
671
      for (i = 0; i < signal_info.pstate->numWorkers; i++)
672
      {
673
        ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
674
        ArchiveHandle *AH = slot->AH;
675
        HANDLE    hThread = (HANDLE) slot->hThread;
676
677
        /*
678
         * Using TerminateThread here may leave some resources leaked,
679
         * but it doesn't matter since we're about to end the whole
680
         * process.
681
         */
682
        if (hThread != INVALID_HANDLE_VALUE)
683
          TerminateThread(hThread, 0);
684
685
        if (AH != NULL && AH->connCancel != NULL)
686
          (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
687
      }
688
    }
689
690
    /*
691
     * Send QueryCancel to master connection, if enabled.  Ignore errors,
692
     * there's not much we can do about them anyway.
693
     */
694
    if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
695
      (void) PQcancel(signal_info.myAH->connCancel,
696
              errbuf, sizeof(errbuf));
697
698
    LeaveCriticalSection(&signal_info_lock);
699
700
    /*
701
     * Report we're quitting, using nothing more complicated than
702
     * write(2).  (We might be able to get away with using write_msg()
703
     * here, but since we terminated other threads uncleanly above, it
704
     * seems better to assume as little as possible.)
705
     */
706
    if (progname)
707
    {
708
      write_stderr(progname);
709
      write_stderr(": ");
710
    }
711
    write_stderr("terminated by user\n");
712
  }
713
714
  /* Always return FALSE to allow signal handling to continue */
715
  return FALSE;
716
}
717
718
/*
719
 * Enable cancel interrupt handler, if not already done.
720
 */
721
static void
722
setup_cancel_handler(void)
723
{
724
  if (!signal_info.handler_set)
725
  {
726
    signal_info.handler_set = true;
727
728
    InitializeCriticalSection(&signal_info_lock);
729
730
    SetConsoleCtrlHandler(consoleHandler, TRUE);
731
  }
732
}
733
734
#endif              /* WIN32 */
735
736
737
/*
738
 * set_archive_cancel_info
739
 *
740
 * Fill AH->connCancel with cancellation info for the specified database
741
 * connection; or clear it if conn is NULL.
742
 */
743
void
744
set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
745
0
{
746
0
  PGcancel   *oldConnCancel;
747
748
  /*
749
   * Activate the interrupt handler if we didn't yet in this process.  On
750
   * Windows, this also initializes signal_info_lock; therefore it's
751
   * important that this happen at least once before we fork off any
752
   * threads.
753
   */
754
0
  setup_cancel_handler();
755
756
  /*
757
   * On Unix, we assume that storing a pointer value is atomic with respect
758
   * to any possible signal interrupt.  On Windows, use a critical section.
759
   */
760
761
#ifdef WIN32
762
  EnterCriticalSection(&signal_info_lock);
763
#endif
764
765
  /* Free the old one if we have one */
766
0
  oldConnCancel = AH->connCancel;
767
  /* be sure interrupt handler doesn't use pointer while freeing */
768
0
  AH->connCancel = NULL;
769
770
0
  if (oldConnCancel != NULL)
771
0
    PQfreeCancel(oldConnCancel);
772
773
  /* Set the new one if specified */
774
0
  if (conn)
775
0
    AH->connCancel = PQgetCancel(conn);
776
777
  /*
778
   * On Unix, there's only ever one active ArchiveHandle per process, so we
779
   * can just set signal_info.myAH unconditionally.  On Windows, do that
780
   * only in the main thread; worker threads have to make sure their
781
   * ArchiveHandle appears in the pstate data, which is dealt with in
782
   * RunWorker().
783
   */
784
0
#ifndef WIN32
785
0
  signal_info.myAH = AH;
786
#else
787
  if (mainThreadId == GetCurrentThreadId())
788
    signal_info.myAH = AH;
789
#endif
790
791
#ifdef WIN32
792
  LeaveCriticalSection(&signal_info_lock);
793
#endif
794
0
}
795
796
/*
797
 * set_cancel_pstate
798
 *
799
 * Set signal_info.pstate to point to the specified ParallelState, if any.
800
 * We need this mainly to have an interlock against Windows signal thread.
801
 */
802
static void
803
set_cancel_pstate(ParallelState *pstate)
804
0
{
805
#ifdef WIN32
806
  EnterCriticalSection(&signal_info_lock);
807
#endif
808
809
0
  signal_info.pstate = pstate;
810
811
#ifdef WIN32
812
  LeaveCriticalSection(&signal_info_lock);
813
#endif
814
0
}
815
816
/*
817
 * set_cancel_slot_archive
818
 *
819
 * Set ParallelSlot's AH field to point to the specified archive, if any.
820
 * We need this mainly to have an interlock against Windows signal thread.
821
 */
822
static void
823
set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
824
0
{
825
#ifdef WIN32
826
  EnterCriticalSection(&signal_info_lock);
827
#endif
828
829
0
  slot->AH = AH;
830
831
#ifdef WIN32
832
  LeaveCriticalSection(&signal_info_lock);
833
#endif
834
0
}
835
836
837
/*
838
 * This function is called by both Unix and Windows variants to set up
839
 * and run a worker process.  Caller should exit the process (or thread)
840
 * upon return.
841
 */
842
static void
843
RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
844
0
{
845
0
  int     pipefd[2];
846
847
  /* fetch child ends of pipes */
848
0
  pipefd[PIPE_READ] = slot->pipeRevRead;
849
0
  pipefd[PIPE_WRITE] = slot->pipeRevWrite;
850
851
  /*
852
   * Clone the archive so that we have our own state to work with, and in
853
   * particular our own database connection.
854
   *
855
   * We clone on Unix as well as Windows, even though technically we don't
856
   * need to because fork() gives us a copy in our own address space
857
   * already.  But CloneArchive resets the state information and also clones
858
   * the database connection which both seem kinda helpful.
859
   */
860
0
  AH = CloneArchive(AH);
861
862
  /* Remember cloned archive where signal handler can find it */
863
0
  set_cancel_slot_archive(slot, AH);
864
865
  /*
866
   * Call the setup worker function that's defined in the ArchiveHandle.
867
   */
868
0
  (AH->SetupWorkerPtr) ((Archive *) AH);
869
870
  /*
871
   * Execute commands until done.
872
   */
873
0
  WaitForCommands(AH, pipefd);
874
875
  /*
876
   * Disconnect from database and clean up.
877
   */
878
0
  set_cancel_slot_archive(slot, NULL);
879
0
  DisconnectDatabase(&(AH->public));
880
0
  DeCloneArchive(AH);
881
0
}
882
883
/*
 * Thread entry point for Windows workers.
 *
 * wi is the heap-allocated WorkerInfo passed through _beginthreadex(); it
 * is freed here once its contents have been copied out.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
  ArchiveHandle *masterAH = wi->AH;
  ParallelSlot *mySlot = wi->slot;

  /* The WorkerInfo has served its purpose */
  free(wi);

  /* Do the actual work ... */
  RunWorker(masterAH, mySlot);

  /* ... and end the thread */
  _endthreadex(0);
  return 0;
}
#endif              /* WIN32 */
904
905
/*
906
 * This function starts a parallel dump or restore by spawning off the worker
907
 * processes.  For Windows, it creates a number of threads; on Unix the
908
 * workers are created with fork().
909
 */
910
ParallelState *
911
ParallelBackupStart(ArchiveHandle *AH)
912
0
{
913
0
  ParallelState *pstate;
914
0
  int     i;
915
916
0
  Assert(AH->public.numWorkers > 0);
917
918
0
  pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
919
920
0
  pstate->numWorkers = AH->public.numWorkers;
921
0
  pstate->te = NULL;
922
0
  pstate->parallelSlot = NULL;
923
924
0
  if (AH->public.numWorkers == 1)
925
0
    return pstate;
926
927
0
  pstate->te = (TocEntry **)
928
0
    pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
929
0
  pstate->parallelSlot = (ParallelSlot *)
930
0
    pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
931
932
#ifdef WIN32
933
  /* Make fmtId() and fmtQualifiedId() use thread-local storage */
934
  getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
935
#endif
936
937
  /*
938
   * Set the pstate in shutdown_info, to tell the exit handler that it must
939
   * clean up workers as well as the main database connection.  But we don't
940
   * set this in signal_info yet, because we don't want child processes to
941
   * inherit non-NULL signal_info.pstate.
942
   */
943
0
  shutdown_info.pstate = pstate;
944
945
  /*
946
   * Temporarily disable query cancellation on the master connection.  This
947
   * ensures that child processes won't inherit valid AH->connCancel
948
   * settings and thus won't try to issue cancels against the master's
949
   * connection.  No harm is done if we fail while it's disabled, because
950
   * the master connection is idle at this point anyway.
951
   */
952
0
  set_archive_cancel_info(AH, NULL);
953
954
  /* Ensure stdio state is quiesced before forking */
955
0
  fflush(NULL);
956
957
  /* Create desired number of workers */
958
0
  for (i = 0; i < pstate->numWorkers; i++)
959
0
  {
960
#ifdef WIN32
961
    WorkerInfo *wi;
962
    uintptr_t handle;
963
#else
964
0
    pid_t   pid;
965
0
#endif
966
0
    ParallelSlot *slot = &(pstate->parallelSlot[i]);
967
0
    int     pipeMW[2],
968
0
          pipeWM[2];
969
970
    /* Create communication pipes for this worker */
971
0
    if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
972
0
      exit_horribly(modulename,
973
0
              "could not create communication channels: %s\n",
974
0
              strerror(errno));
975
976
0
    pstate->te[i] = NULL; /* just for safety */
977
978
0
    slot->workerStatus = WRKR_IDLE;
979
0
    slot->AH = NULL;
980
0
    slot->callback = NULL;
981
0
    slot->callback_data = NULL;
982
983
    /* master's ends of the pipes */
984
0
    slot->pipeRead = pipeWM[PIPE_READ];
985
0
    slot->pipeWrite = pipeMW[PIPE_WRITE];
986
    /* child's ends of the pipes */
987
0
    slot->pipeRevRead = pipeMW[PIPE_READ];
988
0
    slot->pipeRevWrite = pipeWM[PIPE_WRITE];
989
990
#ifdef WIN32
991
    /* Create transient structure to pass args to worker function */
992
    wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
993
994
    wi->AH = AH;
995
    wi->slot = slot;
996
997
    handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
998
                wi, 0, &(slot->threadId));
999
    slot->hThread = handle;
1000
#else             /* !WIN32 */
1001
0
    pid = fork();
1002
0
    if (pid == 0)
1003
0
    {
1004
      /* we are the worker */
1005
0
      int     j;
1006
1007
      /* this is needed for GetMyPSlot() */
1008
0
      slot->pid = getpid();
1009
1010
      /* instruct signal handler that we're in a worker now */
1011
0
      signal_info.am_worker = true;
1012
1013
      /* close read end of Worker -> Master */
1014
0
      closesocket(pipeWM[PIPE_READ]);
1015
      /* close write end of Master -> Worker */
1016
0
      closesocket(pipeMW[PIPE_WRITE]);
1017
1018
      /*
1019
       * Close all inherited fds for communication of the master with
1020
       * previously-forked workers.
1021
       */
1022
0
      for (j = 0; j < i; j++)
1023
0
      {
1024
0
        closesocket(pstate->parallelSlot[j].pipeRead);
1025
0
        closesocket(pstate->parallelSlot[j].pipeWrite);
1026
0
      }
1027
1028
      /* Run the worker ... */
1029
0
      RunWorker(AH, slot);
1030
1031
      /* We can just exit(0) when done */
1032
0
      exit(0);
1033
0
    }
1034
0
    else if (pid < 0)
1035
0
    {
1036
      /* fork failed */
1037
0
      exit_horribly(modulename,
1038
0
              "could not create worker process: %s\n",
1039
0
              strerror(errno));
1040
0
    }
1041
1042
    /* In Master after successful fork */
1043
0
    slot->pid = pid;
1044
1045
    /* close read end of Master -> Worker */
1046
0
    closesocket(pipeMW[PIPE_READ]);
1047
    /* close write end of Worker -> Master */
1048
0
    closesocket(pipeWM[PIPE_WRITE]);
1049
0
#endif              /* WIN32 */
1050
0
  }
1051
1052
  /*
1053
   * Having forked off the workers, disable SIGPIPE so that master isn't
1054
   * killed if it tries to send a command to a dead worker.  We don't want
1055
   * the workers to inherit this setting, though.
1056
   */
1057
0
#ifndef WIN32
1058
0
  pqsignal(SIGPIPE, SIG_IGN);
1059
0
#endif
1060
1061
  /*
1062
   * Re-establish query cancellation on the master connection.
1063
   */
1064
0
  set_archive_cancel_info(AH, AH->connection);
1065
1066
  /*
1067
   * Tell the cancel signal handler to forward signals to worker processes,
1068
   * too.  (As with query cancel, we did not need this earlier because the
1069
   * workers have not yet been given anything to do; if we die before this
1070
   * point, any already-started workers will see EOF and quit promptly.)
1071
   */
1072
0
  set_cancel_pstate(pstate);
1073
1074
0
  return pstate;
1075
0
}
1076
1077
/*
1078
 * Close down a parallel dump or restore.
1079
 */
1080
void
1081
ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1082
0
{
1083
0
  int     i;
1084
1085
  /* No work if non-parallel */
1086
0
  if (pstate->numWorkers == 1)
1087
0
    return;
1088
1089
  /* There should not be any unfinished jobs */
1090
0
  Assert(IsEveryWorkerIdle(pstate));
1091
1092
  /* Close the sockets so that the workers know they can exit */
1093
0
  for (i = 0; i < pstate->numWorkers; i++)
1094
0
  {
1095
0
    closesocket(pstate->parallelSlot[i].pipeRead);
1096
0
    closesocket(pstate->parallelSlot[i].pipeWrite);
1097
0
  }
1098
1099
  /* Wait for them to exit */
1100
0
  WaitForTerminatingWorkers(pstate);
1101
1102
  /*
1103
   * Unlink pstate from shutdown_info, so the exit handler will not try to
1104
   * use it; and likewise unlink from signal_info.
1105
   */
1106
0
  shutdown_info.pstate = NULL;
1107
0
  set_cancel_pstate(NULL);
1108
1109
  /* Release state (mere neatnik-ism, since we're about to terminate) */
1110
0
  free(pstate->te);
1111
0
  free(pstate->parallelSlot);
1112
0
  free(pstate);
1113
0
}
1114
1115
/*
1116
 * These next four functions handle construction and parsing of the command
1117
 * strings and response strings for parallel workers.
1118
 *
1119
 * Currently, these can be the same regardless of which archive format we are
1120
 * processing.  In future, we might want to let format modules override these
1121
 * functions to add format-specific data to a command or response.
1122
 */
1123
1124
/*
1125
 * buildWorkerCommand: format a command string to send to a worker.
1126
 *
1127
 * The string is built in the caller-supplied buffer of size buflen.
1128
 */
1129
static void
1130
buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1131
           char *buf, int buflen)
1132
0
{
1133
0
  if (act == ACT_DUMP)
1134
0
    snprintf(buf, buflen, "DUMP %d", te->dumpId);
1135
0
  else if (act == ACT_RESTORE)
1136
0
    snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1137
0
  else
1138
0
    Assert(false);
1139
0
}
1140
1141
/*
1142
 * parseWorkerCommand: interpret a command string in a worker.
1143
 */
1144
static void
1145
parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1146
           const char *msg)
1147
0
{
1148
0
  DumpId    dumpId;
1149
0
  int     nBytes;
1150
1151
0
  if (messageStartsWith(msg, "DUMP "))
1152
0
  {
1153
0
    *act = ACT_DUMP;
1154
0
    sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1155
0
    Assert(nBytes == strlen(msg));
1156
0
    *te = getTocEntryByDumpId(AH, dumpId);
1157
0
    Assert(*te != NULL);
1158
0
  }
1159
0
  else if (messageStartsWith(msg, "RESTORE "))
1160
0
  {
1161
0
    *act = ACT_RESTORE;
1162
0
    sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1163
0
    Assert(nBytes == strlen(msg));
1164
0
    *te = getTocEntryByDumpId(AH, dumpId);
1165
0
    Assert(*te != NULL);
1166
0
  }
1167
0
  else
1168
0
    exit_horribly(modulename,
1169
0
            "unrecognized command received from master: \"%s\"\n",
1170
0
            msg);
1171
0
}
1172
1173
/*
1174
 * buildWorkerResponse: format a response string to send to the master.
1175
 *
1176
 * The string is built in the caller-supplied buffer of size buflen.
1177
 */
1178
static void
1179
buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1180
          char *buf, int buflen)
1181
0
{
1182
0
  snprintf(buf, buflen, "OK %d %d %d",
1183
0
       te->dumpId,
1184
0
       status,
1185
0
       status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1186
0
}
1187
1188
/*
1189
 * parseWorkerResponse: parse the status message returned by a worker.
1190
 *
1191
 * Returns the integer status code, and may update fields of AH and/or te.
1192
 */
1193
static int
1194
parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1195
          const char *msg)
1196
0
{
1197
0
  DumpId    dumpId;
1198
0
  int     nBytes,
1199
0
        n_errors;
1200
0
  int     status = 0;
1201
1202
0
  if (messageStartsWith(msg, "OK "))
1203
0
  {
1204
0
    sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1205
1206
0
    Assert(dumpId == te->dumpId);
1207
0
    Assert(nBytes == strlen(msg));
1208
1209
0
    AH->public.n_errors += n_errors;
1210
0
  }
1211
0
  else
1212
0
    exit_horribly(modulename,
1213
0
            "invalid message received from worker: \"%s\"\n",
1214
0
            msg);
1215
1216
0
  return status;
1217
0
}
1218
1219
/*
1220
 * Dispatch a job to some free worker.
1221
 *
1222
 * te is the TocEntry to be processed, act is the action to be taken on it.
1223
 * callback is the function to call on completion of the job.
1224
 *
1225
 * If no worker is currently available, this will block, and previously
1226
 * registered callback functions may be called.
1227
 */
1228
void
1229
DispatchJobForTocEntry(ArchiveHandle *AH,
1230
             ParallelState *pstate,
1231
             TocEntry *te,
1232
             T_Action act,
1233
             ParallelCompletionPtr callback,
1234
             void *callback_data)
1235
0
{
1236
0
  int     worker;
1237
0
  char    buf[256];
1238
1239
  /* Get a worker, waiting if none are idle */
1240
0
  while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1241
0
    WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1242
1243
  /* Construct and send command string */
1244
0
  buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1245
1246
0
  sendMessageToWorker(pstate, worker, buf);
1247
1248
  /* Remember worker is busy, and which TocEntry it's working on */
1249
0
  pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1250
0
  pstate->parallelSlot[worker].callback = callback;
1251
0
  pstate->parallelSlot[worker].callback_data = callback_data;
1252
0
  pstate->te[worker] = te;
1253
0
}
1254
1255
/*
1256
 * Find an idle worker and return its slot number.
1257
 * Return NO_SLOT if none are idle.
1258
 */
1259
static int
1260
GetIdleWorker(ParallelState *pstate)
1261
0
{
1262
0
  int     i;
1263
1264
0
  for (i = 0; i < pstate->numWorkers; i++)
1265
0
  {
1266
0
    if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1267
0
      return i;
1268
0
  }
1269
0
  return NO_SLOT;
1270
0
}
1271
1272
/*
1273
 * Return true iff every worker is in the WRKR_TERMINATED state.
1274
 */
1275
static bool
1276
HasEveryWorkerTerminated(ParallelState *pstate)
1277
0
{
1278
0
  int     i;
1279
1280
0
  for (i = 0; i < pstate->numWorkers; i++)
1281
0
  {
1282
0
    if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
1283
0
      return false;
1284
0
  }
1285
0
  return true;
1286
0
}
1287
1288
/*
1289
 * Return true iff every worker is in the WRKR_IDLE state.
1290
 */
1291
bool
1292
IsEveryWorkerIdle(ParallelState *pstate)
1293
0
{
1294
0
  int     i;
1295
1296
0
  for (i = 0; i < pstate->numWorkers; i++)
1297
0
  {
1298
0
    if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1299
0
      return false;
1300
0
  }
1301
0
  return true;
1302
0
}
1303
1304
/*
1305
 * Acquire lock on a table to be dumped by a worker process.
1306
 *
1307
 * The master process is already holding an ACCESS SHARE lock.  Ordinarily
1308
 * it's no problem for a worker to get one too, but if anything else besides
1309
 * pg_dump is running, there's a possible deadlock:
1310
 *
1311
 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1312
 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1313
 *    because the master holds a conflicting ACCESS SHARE lock).
1314
 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1315
 *    The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1316
 * 4) Now we have a deadlock, since the master is effectively waiting for
1317
 *    the worker.  The server cannot detect that, however.
1318
 *
1319
 * To prevent an infinite wait, prior to touching a table in a worker, request
1320
 * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
1321
 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1322
 * so we have a deadlock.  We must fail the backup in that case.
1323
 */
1324
static void
1325
lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1326
0
{
1327
0
  const char *qualId;
1328
0
  PQExpBuffer query;
1329
0
  PGresult   *res;
1330
1331
  /* Nothing to do for BLOBS */
1332
0
  if (strcmp(te->desc, "BLOBS") == 0)
1333
0
    return;
1334
1335
0
  query = createPQExpBuffer();
1336
1337
0
  qualId = fmtQualifiedId(te->namespace, te->tag);
1338
1339
0
  appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1340
0
            qualId);
1341
1342
0
  res = PQexec(AH->connection, query->data);
1343
1344
0
  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1345
0
    exit_horribly(modulename,
1346
0
            "could not obtain lock on relation \"%s\"\n"
1347
0
            "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1348
0
            "on the table after the ysql_dump parent process had gotten the "
1349
0
            "initial ACCESS SHARE lock on the table.\n", qualId);
1350
1351
0
  PQclear(res);
1352
0
  destroyPQExpBuffer(query);
1353
0
}
1354
1355
/*
1356
 * WaitForCommands: main routine for a worker process.
1357
 *
1358
 * Read and execute commands from the master until we see EOF on the pipe.
1359
 */
1360
static void
1361
WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1362
0
{
1363
0
  char     *command;
1364
0
  TocEntry   *te;
1365
0
  T_Action  act;
1366
0
  int     status = 0;
1367
0
  char    buf[256];
1368
1369
0
  for (;;)
1370
0
  {
1371
0
    if (!(command = getMessageFromMaster(pipefd)))
1372
0
    {
1373
      /* EOF, so done */
1374
0
      return;
1375
0
    }
1376
1377
    /* Decode the command */
1378
0
    parseWorkerCommand(AH, &te, &act, command);
1379
1380
0
    if (act == ACT_DUMP)
1381
0
    {
1382
      /* Acquire lock on this table within the worker's session */
1383
0
      lockTableForWorker(AH, te);
1384
1385
      /* Perform the dump command */
1386
0
      status = (AH->WorkerJobDumpPtr) (AH, te);
1387
0
    }
1388
0
    else if (act == ACT_RESTORE)
1389
0
    {
1390
      /* Perform the restore command */
1391
0
      status = (AH->WorkerJobRestorePtr) (AH, te);
1392
0
    }
1393
0
    else
1394
0
      Assert(false);
1395
1396
    /* Return status to master */
1397
0
    buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1398
1399
0
    sendMessageToMaster(pipefd, buf);
1400
1401
    /* command was pg_malloc'd and we are responsible for free()ing it. */
1402
0
    free(command);
1403
0
  }
1404
0
}
1405
1406
/*
1407
 * Check for status messages from workers.
1408
 *
1409
 * If do_wait is true, wait to get a status message; otherwise, just return
1410
 * immediately if there is none available.
1411
 *
1412
 * When we get a status message, we pass the status code to the callback
1413
 * function that was specified to DispatchJobForTocEntry, then reset the
1414
 * worker status to IDLE.
1415
 *
1416
 * Returns true if we collected a status message, else false.
1417
 *
1418
 * XXX is it worth checking for more than one status message per call?
1419
 * It seems somewhat unlikely that multiple workers would finish at exactly
1420
 * the same time.
1421
 */
1422
static bool
1423
ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1424
0
{
1425
0
  int     worker;
1426
0
  char     *msg;
1427
1428
  /* Try to collect a status message */
1429
0
  msg = getMessageFromWorker(pstate, do_wait, &worker);
1430
1431
0
  if (!msg)
1432
0
  {
1433
    /* If do_wait is true, we must have detected EOF on some socket */
1434
0
    if (do_wait)
1435
0
      exit_horribly(modulename, "a worker process died unexpectedly\n");
1436
0
    return false;
1437
0
  }
1438
1439
  /* Process it and update our idea of the worker's status */
1440
0
  if (messageStartsWith(msg, "OK "))
1441
0
  {
1442
0
    ParallelSlot *slot = &pstate->parallelSlot[worker];
1443
0
    TocEntry   *te = pstate->te[worker];
1444
0
    int     status;
1445
1446
0
    status = parseWorkerResponse(AH, te, msg);
1447
0
    slot->callback(AH, te, status, slot->callback_data);
1448
0
    slot->workerStatus = WRKR_IDLE;
1449
0
    pstate->te[worker] = NULL;
1450
0
  }
1451
0
  else
1452
0
    exit_horribly(modulename,
1453
0
            "invalid message received from worker: \"%s\"\n",
1454
0
            msg);
1455
1456
  /* Free the string returned from getMessageFromWorker */
1457
0
  free(msg);
1458
1459
0
  return true;
1460
0
}
1461
1462
/*
1463
 * Check for status results from workers, waiting if necessary.
1464
 *
1465
 * Available wait modes are:
1466
 * WFW_NO_WAIT: reap any available status, but don't block
1467
 * WFW_GOT_STATUS: wait for at least one more worker to finish
1468
 * WFW_ONE_IDLE: wait for at least one worker to be idle
1469
 * WFW_ALL_IDLE: wait for all workers to be idle
1470
 *
1471
 * Any received results are passed to the callback specified to
1472
 * DispatchJobForTocEntry.
1473
 *
1474
 * This function is executed in the master process.
1475
 */
1476
void
1477
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1478
0
{
1479
0
  bool    do_wait = false;
1480
1481
  /*
1482
   * In GOT_STATUS mode, always block waiting for a message, since we can't
1483
   * return till we get something.  In other modes, we don't block the first
1484
   * time through the loop.
1485
   */
1486
0
  if (mode == WFW_GOT_STATUS)
1487
0
  {
1488
    /* Assert that caller knows what it's doing */
1489
0
    Assert(!IsEveryWorkerIdle(pstate));
1490
0
    do_wait = true;
1491
0
  }
1492
1493
0
  for (;;)
1494
0
  {
1495
    /*
1496
     * Check for status messages, even if we don't need to block.  We do
1497
     * not try very hard to reap all available messages, though, since
1498
     * there's unlikely to be more than one.
1499
     */
1500
0
    if (ListenToWorkers(AH, pstate, do_wait))
1501
0
    {
1502
      /*
1503
       * If we got a message, we are done by definition for GOT_STATUS
1504
       * mode, and we can also be certain that there's at least one idle
1505
       * worker.  So we're done in all but ALL_IDLE mode.
1506
       */
1507
0
      if (mode != WFW_ALL_IDLE)
1508
0
        return;
1509
0
    }
1510
1511
    /* Check whether we must wait for new status messages */
1512
0
    switch (mode)
1513
0
    {
1514
0
      case WFW_NO_WAIT:
1515
0
        return;     /* never wait */
1516
0
      case WFW_GOT_STATUS:
1517
0
        Assert(false);  /* can't get here, because we waited */
1518
0
        break;
1519
0
      case WFW_ONE_IDLE:
1520
0
        if (GetIdleWorker(pstate) != NO_SLOT)
1521
0
          return;
1522
0
        break;
1523
0
      case WFW_ALL_IDLE:
1524
0
        if (IsEveryWorkerIdle(pstate))
1525
0
          return;
1526
0
        break;
1527
0
    }
1528
1529
    /* Loop back, and this time wait for something to happen */
1530
0
    do_wait = true;
1531
0
  }
1532
0
}
1533
1534
/*
1535
 * Read one command message from the master, blocking if necessary
1536
 * until one is available, and return it as a malloc'd string.
1537
 * On EOF, return NULL.
1538
 *
1539
 * This function is executed in worker processes.
1540
 */
1541
static char *
1542
getMessageFromMaster(int pipefd[2])
1543
0
{
1544
0
  return readMessageFromPipe(pipefd[PIPE_READ]);
1545
0
}
1546
1547
/*
1548
 * Send a status message to the master.
1549
 *
1550
 * This function is executed in worker processes.
1551
 */
1552
static void
1553
sendMessageToMaster(int pipefd[2], const char *str)
1554
0
{
1555
0
  int     len = strlen(str) + 1;
1556
1557
0
  if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1558
0
    exit_horribly(modulename,
1559
0
            "could not write to the communication channel: %s\n",
1560
0
            strerror(errno));
1561
0
}
1562
1563
/*
 * Block until at least one descriptor in "workerset" is readable, retrying
 * the select() call whenever it is interrupted.
 *
 * Returns the number of readable descriptors, or -1 on error.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
  fd_set    wanted = *workerset;
  int     nready;

  do
  {
    /* select() overwrites the set, so start each attempt from the copy */
    *workerset = wanted;
    nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
#ifndef WIN32
  } while (nready < 0 && errno == EINTR);
#else
  } while (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

  return nready;
}
1590
1591
1592
/*
1593
 * Check for messages from worker processes.
1594
 *
1595
 * If a message is available, return it as a malloc'd string, and put the
1596
 * index of the sending worker in *worker.
1597
 *
1598
 * If nothing is available, wait if "do_wait" is true, else return NULL.
1599
 *
1600
 * If we detect EOF on any socket, we'll return NULL.  It's not great that
1601
 * that's hard to distinguish from the no-data-available case, but for now
1602
 * our one caller is okay with that.
1603
 *
1604
 * This function is executed in the master process.
1605
 */
1606
static char *
1607
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1608
0
{
1609
0
  int     i;
1610
0
  fd_set    workerset;
1611
0
  int     maxFd = -1;
1612
0
  struct timeval nowait = {0, 0};
1613
1614
  /* construct bitmap of socket descriptors for select() */
1615
0
  FD_ZERO(&workerset);
1616
0
  for (i = 0; i < pstate->numWorkers; i++)
1617
0
  {
1618
0
    if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
1619
0
      continue;
1620
0
    FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1621
0
    if (pstate->parallelSlot[i].pipeRead > maxFd)
1622
0
      maxFd = pstate->parallelSlot[i].pipeRead;
1623
0
  }
1624
1625
0
  if (do_wait)
1626
0
  {
1627
0
    i = select_loop(maxFd, &workerset);
1628
0
    Assert(i != 0);
1629
0
  }
1630
0
  else
1631
0
  {
1632
0
    if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1633
0
      return NULL;
1634
0
  }
1635
1636
0
  if (i < 0)
1637
0
    exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1638
1639
0
  for (i = 0; i < pstate->numWorkers; i++)
1640
0
  {
1641
0
    char     *msg;
1642
1643
0
    if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1644
0
      continue;
1645
1646
    /*
1647
     * Read the message if any.  If the socket is ready because of EOF,
1648
     * we'll return NULL instead (and the socket will stay ready, so the
1649
     * condition will persist).
1650
     *
1651
     * Note: because this is a blocking read, we'll wait if only part of
1652
     * the message is available.  Waiting a long time would be bad, but
1653
     * since worker status messages are short and are always sent in one
1654
     * operation, it shouldn't be a problem in practice.
1655
     */
1656
0
    msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1657
0
    *worker = i;
1658
0
    return msg;
1659
0
  }
1660
0
  Assert(false);
1661
0
  return NULL;
1662
0
}
1663
1664
/*
1665
 * Send a command message to the specified worker process.
1666
 *
1667
 * This function is executed in the master process.
1668
 */
1669
static void
1670
sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1671
0
{
1672
0
  int     len = strlen(str) + 1;
1673
1674
0
  if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1675
0
  {
1676
0
    exit_horribly(modulename,
1677
0
            "could not write to the communication channel: %s\n",
1678
0
            strerror(errno));
1679
0
  }
1680
0
}
1681
1682
/*
 * Fetch one message from the pipe identified by fd.
 *
 * Blocks until a complete message has arrived and returns it as a malloc'd
 * string; returns NULL on EOF.
 *
 * On this channel a "message" is simply a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
  int     capacity = 64;  /* starting size; the value is arbitrary */
  int     used = 0;
  char     *buf = (char *) pg_malloc(capacity);

  /*
   * Letting piperead() fetch several bytes at once could, in theory, hand
   * back pieces of more than one message.  (It cannot really happen, as
   * neither side sends a second message before getting a reply, but that
   * is not assumed here.)  So the message is read one byte at a time up to
   * its terminating '\0'.  That is somewhat inefficient, but acceptable
   * for the short command and status strings exchanged here.
   */
  for (;;)
  {
    int     nread;

    Assert(used < capacity);
    nread = piperead(fd, &buf[used], 1);
    if (nread <= 0)
    {
      /* read error, or the other end closed the connection */
      pg_free(buf);
      return NULL;
    }

    Assert(nread == 1);

    /* the terminator completes the message */
    if (buf[used] == '\0')
      return buf;

    /* keep the byte, growing the buffer once it is full */
    if (++used == capacity)
    {
      capacity += 16;   /* growth step; the value is arbitrary */
      buf = (char *) pg_realloc(buf, capacity);
    }
  }
}
1733
1734
#ifdef WIN32
1735
1736
/*
1737
 * This is a replacement version of pipe(2) for Windows which allows the pipe
1738
 * handles to be used in select().
1739
 *
1740
 * Reads and writes on the pipe must go through piperead()/pipewrite().
1741
 *
1742
 * For consistency with Unix we declare the returned handles as "int".
1743
 * This is okay even on WIN64 because system handles are not more than
1744
 * 32 bits wide, but we do have to do some casting.
1745
 */
1746
static int
1747
pgpipe(int handles[2])
1748
{
1749
  pgsocket  s,
1750
        tmp_sock;
1751
  struct sockaddr_in serv_addr;
1752
  int     len = sizeof(serv_addr);
1753
1754
  /* We have to use the Unix socket invalid file descriptor value here. */
1755
  handles[0] = handles[1] = -1;
1756
1757
  /*
1758
   * setup listen socket
1759
   */
1760
  if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1761
  {
1762
    write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
1763
          WSAGetLastError());
1764
    return -1;
1765
  }
1766
1767
  memset((void *) &serv_addr, 0, sizeof(serv_addr));
1768
  serv_addr.sin_family = AF_INET;
1769
  serv_addr.sin_port = pg_hton16(0);
1770
  serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1771
  if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1772
  {
1773
    write_msg(modulename, "pgpipe: could not bind: error code %d\n",
1774
          WSAGetLastError());
1775
    closesocket(s);
1776
    return -1;
1777
  }
1778
  if (listen(s, 1) == SOCKET_ERROR)
1779
  {
1780
    write_msg(modulename, "pgpipe: could not listen: error code %d\n",
1781
          WSAGetLastError());
1782
    closesocket(s);
1783
    return -1;
1784
  }
1785
  if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1786
  {
1787
    write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
1788
          WSAGetLastError());
1789
    closesocket(s);
1790
    return -1;
1791
  }
1792
1793
  /*
1794
   * setup pipe handles
1795
   */
1796
  if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1797
  {
1798
    write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
1799
          WSAGetLastError());
1800
    closesocket(s);
1801
    return -1;
1802
  }
1803
  handles[1] = (int) tmp_sock;
1804
1805
  if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1806
  {
1807
    write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
1808
          WSAGetLastError());
1809
    closesocket(handles[1]);
1810
    handles[1] = -1;
1811
    closesocket(s);
1812
    return -1;
1813
  }
1814
  if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1815
  {
1816
    write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
1817
          WSAGetLastError());
1818
    closesocket(handles[1]);
1819
    handles[1] = -1;
1820
    closesocket(s);
1821
    return -1;
1822
  }
1823
  handles[0] = (int) tmp_sock;
1824
1825
  closesocket(s);
1826
  return 0;
1827
}
1828
1829
/*
1830
 * Windows implementation of reading from a pipe.
1831
 */
1832
static int
1833
piperead(int s, char *buf, int len)
1834
{
1835
  int     ret = recv(s, buf, len, 0);
1836
1837
  if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1838
  {
1839
    /* EOF on the pipe! */
1840
    ret = 0;
1841
  }
1842
  return ret;
1843
}
1844
1845
#endif              /* WIN32 */