YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/bin/scripts/vacuumdb.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * vacuumdb
4
 *
5
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
6
 * Portions Copyright (c) 1994, Regents of the University of California
7
 *
8
 * src/bin/scripts/vacuumdb.c
9
 *
10
 *-------------------------------------------------------------------------
11
 */
12
13
#include "postgres_fe.h"
14
15
#ifdef HAVE_SYS_SELECT_H
16
#include <sys/select.h>
17
#endif
18
19
#include "catalog/pg_class_d.h"
20
21
#include "common.h"
22
#include "fe_utils/simple_list.h"
23
#include "fe_utils/string_utils.h"
24
25
26
0
#define ERRCODE_UNDEFINED_TABLE  "42P01"
27
28
/* Parallel vacuuming stuff */
29
typedef struct ParallelSlot
30
{
31
  PGconn     *connection;   /* One connection */
32
  bool    isFree;     /* Is it known to be idle? */
33
} ParallelSlot;
34
35
/* vacuum options controlled by user flags */
36
typedef struct vacuumingOptions
37
{
38
  bool    analyze_only;
39
  bool    verbose;
40
  bool    and_analyze;
41
  bool    full;
42
  bool    freeze;
43
} vacuumingOptions;
44
45
46
static void vacuum_one_database(const ConnParams *cparams,
47
                vacuumingOptions *vacopts,
48
          int stage,
49
          SimpleStringList *tables,
50
          int concurrentCons,
51
          const char *progname, bool echo, bool quiet);
52
53
static void vacuum_all_databases(ConnParams *cparams,
54
                 vacuumingOptions *vacopts,
55
           bool analyze_in_stages,
56
           int concurrentCons,
57
           const char *progname, bool echo, bool quiet);
58
59
static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
60
             vacuumingOptions *vacopts, const char *table,
61
             bool table_pre_qualified,
62
             const char *progname, bool echo);
63
64
static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
65
           const char *table, const char *progname, bool async);
66
67
static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
68
      const char *progname);
69
70
static bool ProcessQueryResult(PGconn *conn, PGresult *result,
71
           const char *progname);
72
73
static bool GetQueryResult(PGconn *conn, const char *progname);
74
75
static void DisconnectDatabase(ParallelSlot *slot);
76
77
static int  select_loop(int maxFd, fd_set *workerset, bool *aborting);
78
79
static void init_slot(ParallelSlot *slot, PGconn *conn);
80
81
static void help(const char *progname);
82
83
/* For analyze-in-stages mode */
84
0
#define ANALYZE_NO_STAGE  -1
85
0
#define ANALYZE_NUM_STAGES  3
86
87
88
int
89
main(int argc, char *argv[])
90
{
91
  static struct option long_options[] = {
92
    {"host", required_argument, NULL, 'h'},
93
    {"port", required_argument, NULL, 'p'},
94
    {"username", required_argument, NULL, 'U'},
95
    {"no-password", no_argument, NULL, 'w'},
96
    {"password", no_argument, NULL, 'W'},
97
    {"echo", no_argument, NULL, 'e'},
98
    {"quiet", no_argument, NULL, 'q'},
99
    {"dbname", required_argument, NULL, 'd'},
100
    {"analyze", no_argument, NULL, 'z'},
101
    {"analyze-only", no_argument, NULL, 'Z'},
102
    {"freeze", no_argument, NULL, 'F'},
103
    {"all", no_argument, NULL, 'a'},
104
    {"table", required_argument, NULL, 't'},
105
    {"full", no_argument, NULL, 'f'},
106
    {"verbose", no_argument, NULL, 'v'},
107
    {"jobs", required_argument, NULL, 'j'},
108
    {"maintenance-db", required_argument, NULL, 2},
109
    {"analyze-in-stages", no_argument, NULL, 3},
110
    {NULL, 0, NULL, 0}
111
  };
112
113
  const char *progname;
114
  int     optindex;
115
  int     c;
116
  const char *dbname = NULL;
117
  const char *maintenance_db = NULL;
118
  char     *host = NULL;
119
  char     *port = NULL;
120
  char     *username = NULL;
121
  enum trivalue prompt_password = TRI_DEFAULT;
122
  ConnParams  cparams;
123
  bool    echo = false;
124
  bool    quiet = false;
125
  vacuumingOptions vacopts;
126
  bool    analyze_in_stages = false;
127
  bool    alldb = false;
128
  SimpleStringList tables = {NULL, NULL};
129
  int     concurrentCons = 1;
130
  int     tbl_count = 0;
131
132
  /* initialize options to all false */
133
  memset(&vacopts, 0, sizeof(vacopts));
134
135
  progname = get_progname(argv[0]);
136
137
  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
138
139
  handle_help_version_opts(argc, argv, "vacuumdb", help);
140
141
  while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
142
  {
143
    switch (c)
144
    {
145
      case 'h':
146
        host = pg_strdup(optarg);
147
        break;
148
      case 'p':
149
        port = pg_strdup(optarg);
150
        break;
151
      case 'U':
152
        username = pg_strdup(optarg);
153
        break;
154
      case 'w':
155
        prompt_password = TRI_NO;
156
        break;
157
      case 'W':
158
        prompt_password = TRI_YES;
159
        break;
160
      case 'e':
161
        echo = true;
162
        break;
163
      case 'q':
164
        quiet = true;
165
        break;
166
      case 'd':
167
        dbname = pg_strdup(optarg);
168
        break;
169
      case 'z':
170
        vacopts.and_analyze = true;
171
        break;
172
      case 'Z':
173
        vacopts.analyze_only = true;
174
        break;
175
      case 'F':
176
        vacopts.freeze = true;
177
        break;
178
      case 'a':
179
        alldb = true;
180
        break;
181
      case 't':
182
        {
183
          simple_string_list_append(&tables, optarg);
184
          tbl_count++;
185
          break;
186
        }
187
      case 'f':
188
        vacopts.full = true;
189
        break;
190
      case 'v':
191
        vacopts.verbose = true;
192
        break;
193
      case 'j':
194
        concurrentCons = atoi(optarg);
195
        if (concurrentCons <= 0)
196
        {
197
          fprintf(stderr, _("%s: number of parallel jobs must be at least 1\n"),
198
              progname);
199
          exit(1);
200
        }
201
        if (concurrentCons > FD_SETSIZE - 1)
202
        {
203
          fprintf(stderr, _("%s: too many parallel jobs requested (maximum: %d)\n"),
204
              progname, FD_SETSIZE - 1);
205
          exit(1);
206
        }
207
        break;
208
      case 2:
209
        maintenance_db = pg_strdup(optarg);
210
        break;
211
      case 3:
212
        analyze_in_stages = vacopts.analyze_only = true;
213
        break;
214
      default:
215
        fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
216
        exit(1);
217
    }
218
  }
219
220
  /*
221
   * Non-option argument specifies database name as long as it wasn't
222
   * already specified with -d / --dbname
223
   */
224
  if (optind < argc && dbname == NULL)
225
  {
226
    dbname = argv[optind];
227
    optind++;
228
  }
229
230
  if (optind < argc)
231
  {
232
    fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"),
233
        progname, argv[optind]);
234
    fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
235
    exit(1);
236
  }
237
238
  if (vacopts.analyze_only)
239
  {
240
    if (vacopts.full)
241
    {
242
      fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
243
          progname, "full");
244
      exit(1);
245
    }
246
    if (vacopts.freeze)
247
    {
248
      fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
249
          progname, "freeze");
250
      exit(1);
251
    }
252
    /* allow 'and_analyze' with 'analyze_only' */
253
  }
254
255
  /* fill cparams except for dbname, which is set below */
256
  cparams.pghost = host;
257
  cparams.pgport = port;
258
  cparams.pguser = username;
259
  cparams.prompt_password = prompt_password;
260
  cparams.override_dbname = NULL;
261
262
  setup_cancel_handler();
263
264
  /* Avoid opening extra connections. */
265
  if (tbl_count && (concurrentCons > tbl_count))
266
    concurrentCons = tbl_count;
267
268
  if (alldb)
269
  {
270
    if (dbname)
271
    {
272
      fprintf(stderr, _("%s: cannot vacuum all databases and a specific one at the same time\n"),
273
          progname);
274
      exit(1);
275
    }
276
    if (tables.head != NULL)
277
    {
278
      fprintf(stderr, _("%s: cannot vacuum specific table(s) in all databases\n"),
279
          progname);
280
      exit(1);
281
    }
282
283
    cparams.dbname = maintenance_db;
284
285
    vacuum_all_databases(&cparams, &vacopts,
286
               analyze_in_stages,
287
               concurrentCons,
288
               progname, echo, quiet);
289
  }
290
  else
291
  {
292
    if (dbname == NULL)
293
    {
294
      if (getenv("PGDATABASE"))
295
        dbname = getenv("PGDATABASE");
296
      else if (getenv("PGUSER"))
297
        dbname = getenv("PGUSER");
298
      else
299
        dbname = get_user_name_or_exit(progname);
300
    }
301
302
    cparams.dbname = dbname;
303
304
    if (analyze_in_stages)
305
    {
306
      int     stage;
307
308
      for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
309
      {
310
        vacuum_one_database(&cparams, &vacopts,
311
                  stage,
312
                  &tables,
313
                  concurrentCons,
314
                  progname, echo, quiet);
315
      }
316
    }
317
    else
318
      vacuum_one_database(&cparams, &vacopts,
319
                ANALYZE_NO_STAGE,
320
                &tables,
321
                concurrentCons,
322
                progname, echo, quiet);
323
  }
324
325
  exit(0);
326
}
327
328
/*
329
 * vacuum_one_database
330
 *
331
 * Process tables in the given database.  If the 'tables' list is empty,
332
 * process all tables in the database.
333
 *
334
 * Note that this function is only concerned with running exactly one stage
335
 * when in analyze-in-stages mode; caller must iterate on us if necessary.
336
 *
337
 * If concurrentCons is > 1, multiple connections are used to vacuum tables
338
 * in parallel.  In this case and if the table list is empty, we first obtain
339
 * a list of tables from the database.
340
 */
341
static void
342
vacuum_one_database(const ConnParams *cparams,
343
          vacuumingOptions *vacopts,
344
          int stage,
345
          SimpleStringList *tables,
346
          int concurrentCons,
347
          const char *progname, bool echo, bool quiet)
348
0
{
349
0
  PQExpBufferData sql;
350
0
  PGconn     *conn;
351
0
  SimpleStringListCell *cell;
352
0
  ParallelSlot *slots;
353
0
  SimpleStringList dbtables = {NULL, NULL};
354
0
  int     i;
355
0
  bool    failed = false;
356
0
  bool    parallel = concurrentCons > 1;
357
0
  const char *stage_commands[] = {
358
0
    "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
359
0
    "SET default_statistics_target=10; RESET vacuum_cost_delay;",
360
0
    "RESET default_statistics_target;"
361
0
  };
362
0
  const char *stage_messages[] = {
363
0
    gettext_noop("Generating minimal optimizer statistics (1 target)"),
364
0
    gettext_noop("Generating medium optimizer statistics (10 targets)"),
365
0
    gettext_noop("Generating default (full) optimizer statistics")
366
0
  };
367
368
0
  Assert(stage == ANALYZE_NO_STAGE ||
369
0
       (stage >= 0 && stage < ANALYZE_NUM_STAGES));
370
371
0
  conn = connectDatabase(cparams, progname, echo, false, true);
372
373
0
  if (!quiet)
374
0
  {
375
0
    if (stage != ANALYZE_NO_STAGE)
376
0
      printf(_("%s: processing database \"%s\": %s\n"),
377
0
           progname, PQdb(conn), _(stage_messages[stage]));
378
0
    else
379
0
      printf(_("%s: vacuuming database \"%s\"\n"),
380
0
           progname, PQdb(conn));
381
0
    fflush(stdout);
382
0
  }
383
384
0
  initPQExpBuffer(&sql);
385
386
  /*
387
   * If a table list is not provided and we're using multiple connections,
388
   * prepare the list of tables by querying the catalogs.
389
   */
390
0
  if (parallel && (!tables || !tables->head))
391
0
  {
392
0
    PQExpBufferData buf;
393
0
    PGresult   *res;
394
0
    int     ntups;
395
396
0
    initPQExpBuffer(&buf);
397
398
0
    res = executeQuery(conn,
399
0
               "SELECT c.relname, ns.nspname"
400
0
               " FROM pg_class c, pg_namespace ns\n"
401
0
               " WHERE relkind IN ("
402
0
               CppAsString2(RELKIND_RELATION) ", "
403
0
               CppAsString2(RELKIND_MATVIEW) ")"
404
0
               " AND c.relnamespace = ns.oid\n"
405
0
               " ORDER BY c.relpages DESC;",
406
0
               progname, echo);
407
408
0
    ntups = PQntuples(res);
409
0
    for (i = 0; i < ntups; i++)
410
0
    {
411
0
      appendPQExpBufferStr(&buf,
412
0
                 fmtQualifiedId(PQgetvalue(res, i, 1),
413
0
                        PQgetvalue(res, i, 0)));
414
415
0
      simple_string_list_append(&dbtables, buf.data);
416
0
      resetPQExpBuffer(&buf);
417
0
    }
418
419
0
    termPQExpBuffer(&buf);
420
0
    tables = &dbtables;
421
422
    /*
423
     * If there are more connections than vacuumable relations, we don't
424
     * need to use them all.
425
     */
426
0
    if (concurrentCons > ntups)
427
0
      concurrentCons = ntups;
428
0
    if (concurrentCons <= 1)
429
0
      parallel = false;
430
0
    PQclear(res);
431
0
  }
432
433
  /*
434
   * Setup the database connections. We reuse the connection we already have
435
   * for the first slot.  If not in parallel mode, the first slot in the
436
   * array contains the connection.
437
   */
438
0
  if (concurrentCons <= 0)
439
0
    concurrentCons = 1;
440
0
  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
441
0
  init_slot(slots, conn);
442
0
  if (parallel)
443
0
  {
444
0
    for (i = 1; i < concurrentCons; i++)
445
0
    {
446
0
      conn = connectDatabase(cparams, progname, echo, false, true);
447
0
      init_slot(slots + i, conn);
448
0
    }
449
0
  }
450
451
  /*
452
   * Prepare all the connections to run the appropriate analyze stage, if
453
   * caller requested that mode.
454
   */
455
0
  if (stage != ANALYZE_NO_STAGE)
456
0
  {
457
0
    int     j;
458
459
    /* We already emitted the message above */
460
461
0
    for (j = 0; j < concurrentCons; j++)
462
0
      executeCommand((slots + j)->connection,
463
0
               stage_commands[stage], progname, echo);
464
0
  }
465
466
0
  cell = tables ? tables->head : NULL;
467
0
  do
468
0
  {
469
0
    const char *tabname = cell ? cell->val : NULL;
470
0
    ParallelSlot *free_slot;
471
472
0
    if (CancelRequested)
473
0
    {
474
0
      failed = true;
475
0
      goto finish;
476
0
    }
477
478
    /*
479
     * Get the connection slot to use.  If in parallel mode, here we wait
480
     * for one connection to become available if none already is.  In
481
     * non-parallel mode we simply use the only slot we have, which we
482
     * know to be free.
483
     */
484
0
    if (parallel)
485
0
    {
486
      /*
487
       * Get a free slot, waiting until one becomes free if none
488
       * currently is.
489
       */
490
0
      free_slot = GetIdleSlot(slots, concurrentCons, progname);
491
0
      if (!free_slot)
492
0
      {
493
0
        failed = true;
494
0
        goto finish;
495
0
      }
496
497
0
      free_slot->isFree = false;
498
0
    }
499
0
    else
500
0
      free_slot = slots;
501
502
    /*
503
     * Prepare the vacuum command.  Note that in some cases this requires
504
     * query execution, so be sure to use the free connection.
505
     */
506
0
    prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
507
0
                 tables == &dbtables, progname, echo);
508
509
    /*
510
     * Execute the vacuum.  If not in parallel mode, this terminates the
511
     * program in case of an error.  (The parallel case handles query
512
     * errors in ProcessQueryResult through GetIdleSlot.)
513
     */
514
0
    run_vacuum_command(free_slot->connection, sql.data,
515
0
               echo, tabname, progname, parallel);
516
517
0
    if (cell)
518
0
      cell = cell->next;
519
0
  } while (cell != NULL);
520
521
0
  if (parallel)
522
0
  {
523
0
    int     j;
524
525
    /* wait for all connections to finish */
526
0
    for (j = 0; j < concurrentCons; j++)
527
0
    {
528
0
      if (!GetQueryResult((slots + j)->connection, progname))
529
0
        goto finish;
530
0
    }
531
0
  }
532
533
0
finish:
534
0
  for (i = 0; i < concurrentCons; i++)
535
0
    DisconnectDatabase(slots + i);
536
0
  pfree(slots);
537
538
0
  termPQExpBuffer(&sql);
539
540
0
  if (failed)
541
0
    exit(1);
542
0
}
543
544
/*
545
 * Vacuum/analyze all connectable databases.
546
 *
547
 * In analyze-in-stages mode, we process all databases in one stage before
548
 * moving on to the next stage.  That ensure minimal stats are available
549
 * quickly everywhere before generating more detailed ones.
550
 */
551
static void
552
vacuum_all_databases(ConnParams *cparams,
553
           vacuumingOptions *vacopts,
554
           bool analyze_in_stages,
555
           int concurrentCons,
556
           const char *progname, bool echo, bool quiet)
557
0
{
558
0
  PGconn     *conn;
559
0
  PGresult   *result;
560
0
  int     stage;
561
0
  int     i;
562
563
0
  conn = connectMaintenanceDatabase(cparams, progname, echo);
564
0
  result = executeQuery(conn,
565
0
              "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
566
0
              progname, echo);
567
0
  PQfinish(conn);
568
569
0
  if (analyze_in_stages)
570
0
  {
571
    /*
572
     * When analyzing all databases in stages, we analyze them all in the
573
     * fastest stage first, so that initial statistics become available
574
     * for all of them as soon as possible.
575
     *
576
     * This means we establish several times as many connections, but
577
     * that's a secondary consideration.
578
     */
579
0
    for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
580
0
    {
581
0
      for (i = 0; i < PQntuples(result); i++)
582
0
      {
583
0
        cparams->override_dbname = PQgetvalue(result, i, 0);
584
585
0
        vacuum_one_database(cparams, vacopts,
586
0
                  stage,
587
0
                  NULL,
588
0
                  concurrentCons,
589
0
                  progname, echo, quiet);
590
0
      }
591
0
    }
592
0
  }
593
0
  else
594
0
  {
595
0
    for (i = 0; i < PQntuples(result); i++)
596
0
    {
597
0
      cparams->override_dbname = PQgetvalue(result, i, 0);
598
599
0
      vacuum_one_database(cparams, vacopts,
600
0
                ANALYZE_NO_STAGE,
601
0
                NULL,
602
0
                concurrentCons,
603
0
                progname, echo, quiet);
604
0
    }
605
0
  }
606
607
0
  PQclear(result);
608
0
}
609
610
/*
611
 * Construct a vacuum/analyze command to run based on the given options, in the
612
 * given string buffer, which may contain previous garbage.
613
 *
614
 * An optional table name can be passed; this must be already be properly
615
 * quoted.  The command is semicolon-terminated.
616
 */
617
static void
618
prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
619
             vacuumingOptions *vacopts, const char *table,
620
             bool table_pre_qualified,
621
             const char *progname, bool echo)
622
0
{
623
0
  resetPQExpBuffer(sql);
624
625
0
  if (vacopts->analyze_only)
626
0
  {
627
0
    appendPQExpBufferStr(sql, "ANALYZE");
628
0
    if (vacopts->verbose)
629
0
      appendPQExpBufferStr(sql, " VERBOSE");
630
0
  }
631
0
  else
632
0
  {
633
0
    appendPQExpBufferStr(sql, "VACUUM");
634
0
    if (PQserverVersion(conn) >= 90000)
635
0
    {
636
0
      const char *paren = " (";
637
0
      const char *comma = ", ";
638
0
      const char *sep = paren;
639
640
0
      if (vacopts->full)
641
0
      {
642
0
        appendPQExpBuffer(sql, "%sFULL", sep);
643
0
        sep = comma;
644
0
      }
645
0
      if (vacopts->freeze)
646
0
      {
647
0
        appendPQExpBuffer(sql, "%sFREEZE", sep);
648
0
        sep = comma;
649
0
      }
650
0
      if (vacopts->verbose)
651
0
      {
652
0
        appendPQExpBuffer(sql, "%sVERBOSE", sep);
653
0
        sep = comma;
654
0
      }
655
0
      if (vacopts->and_analyze)
656
0
      {
657
0
        appendPQExpBuffer(sql, "%sANALYZE", sep);
658
0
        sep = comma;
659
0
      }
660
0
      if (sep != paren)
661
0
        appendPQExpBufferChar(sql, ')');
662
0
    }
663
0
    else
664
0
    {
665
0
      if (vacopts->full)
666
0
        appendPQExpBufferStr(sql, " FULL");
667
0
      if (vacopts->freeze)
668
0
        appendPQExpBufferStr(sql, " FREEZE");
669
0
      if (vacopts->verbose)
670
0
        appendPQExpBufferStr(sql, " VERBOSE");
671
0
      if (vacopts->and_analyze)
672
0
        appendPQExpBufferStr(sql, " ANALYZE");
673
0
    }
674
0
  }
675
676
0
  if (table)
677
0
  {
678
0
    appendPQExpBufferChar(sql, ' ');
679
0
    if (table_pre_qualified)
680
0
      appendPQExpBufferStr(sql, table);
681
0
    else
682
0
      appendQualifiedRelation(sql, table, conn, progname, echo);
683
0
  }
684
0
  appendPQExpBufferChar(sql, ';');
685
0
}
686
687
/*
688
 * Send a vacuum/analyze command to the server.  In async mode, return after
689
 * sending the command; else, wait for it to finish.
690
 *
691
 * Any errors during command execution are reported to stderr.  If async is
692
 * false, this function exits the program after reporting the error.
693
 */
694
static void
695
run_vacuum_command(PGconn *conn, const char *sql, bool echo,
696
           const char *table, const char *progname, bool async)
697
0
{
698
0
  bool    status;
699
700
0
  if (async)
701
0
  {
702
0
    if (echo)
703
0
      printf("%s\n", sql);
704
705
0
    status = PQsendQuery(conn, sql) == 1;
706
0
  }
707
0
  else
708
0
    status = executeMaintenanceCommand(conn, sql, echo);
709
710
0
  if (!status)
711
0
  {
712
0
    if (table)
713
0
      fprintf(stderr,
714
0
          _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
715
0
          progname, table, PQdb(conn), PQerrorMessage(conn));
716
0
    else
717
0
      fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
718
0
          progname, PQdb(conn), PQerrorMessage(conn));
719
720
0
    if (!async)
721
0
    {
722
0
      PQfinish(conn);
723
0
      exit(1);
724
0
    }
725
0
  }
726
0
}
727
728
/*
729
 * GetIdleSlot
730
 *    Return a connection slot that is ready to execute a command.
731
 *
732
 * We return the first slot we find that is marked isFree, if one is;
733
 * otherwise, we loop on select() until one socket becomes available.  When
734
 * this happens, we read the whole set and mark as free all sockets that become
735
 * available.
736
 *
737
 * If an error occurs, NULL is returned.
738
 */
739
static ParallelSlot *
740
GetIdleSlot(ParallelSlot slots[], int numslots,
741
      const char *progname)
742
0
{
743
0
  int     i;
744
0
  int     firstFree = -1;
745
746
  /* Any connection already known free? */
747
0
  for (i = 0; i < numslots; i++)
748
0
  {
749
0
    if (slots[i].isFree)
750
0
      return slots + i;
751
0
  }
752
753
  /*
754
   * No free slot found, so wait until one of the connections has finished
755
   * its task and return the available slot.
756
   */
757
0
  while (firstFree < 0)
758
0
  {
759
0
    fd_set    slotset;
760
0
    int     maxFd = 0;
761
0
    bool    aborting;
762
763
    /* We must reconstruct the fd_set for each call to select_loop */
764
0
    FD_ZERO(&slotset);
765
766
0
    for (i = 0; i < numslots; i++)
767
0
    {
768
0
      int     sock = PQsocket(slots[i].connection);
769
770
      /*
771
       * We don't really expect any connections to lose their sockets
772
       * after startup, but just in case, cope by ignoring them.
773
       */
774
0
      if (sock < 0)
775
0
        continue;
776
777
0
      FD_SET(sock, &slotset);
778
0
      if (sock > maxFd)
779
0
        maxFd = sock;
780
0
    }
781
782
0
    SetCancelConn(slots->connection);
783
0
    i = select_loop(maxFd, &slotset, &aborting);
784
0
    ResetCancelConn();
785
786
0
    if (aborting)
787
0
    {
788
      /*
789
       * We set the cancel-receiving connection to the one in the zeroth
790
       * slot above, so fetch the error from there.
791
       */
792
0
      GetQueryResult(slots->connection, progname);
793
0
      return NULL;
794
0
    }
795
0
    Assert(i != 0);
796
797
0
    for (i = 0; i < numslots; i++)
798
0
    {
799
0
      int     sock = PQsocket(slots[i].connection);
800
801
0
      if (sock >= 0 && FD_ISSET(sock, &slotset))
802
0
      {
803
        /* select() says input is available, so consume it */
804
0
        PQconsumeInput(slots[i].connection);
805
0
      }
806
807
      /* Collect result(s) as long as any are available */
808
0
      while (!PQisBusy(slots[i].connection))
809
0
      {
810
0
        PGresult   *result = PQgetResult(slots[i].connection);
811
812
0
        if (result != NULL)
813
0
        {
814
          /* Check and discard the command result */
815
0
          if (!ProcessQueryResult(slots[i].connection, result,
816
0
                      progname))
817
0
            return NULL;
818
0
        }
819
0
        else
820
0
        {
821
          /* This connection has become idle */
822
0
          slots[i].isFree = true;
823
0
          if (firstFree < 0)
824
0
            firstFree = i;
825
0
          break;
826
0
        }
827
0
      }
828
0
    }
829
0
  }
830
831
0
  return slots + firstFree;
832
0
}
833
834
/*
835
 * ProcessQueryResult
836
 *
837
 * Process (and delete) a query result.  Returns true if there's no error,
838
 * false otherwise -- but errors about trying to vacuum a missing relation
839
 * are reported and subsequently ignored.
840
 */
841
static bool
842
ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
843
0
{
844
  /*
845
   * If it's an error, report it.  Errors about a missing table are harmless
846
   * so we continue processing; but die for other errors.
847
   */
848
0
  if (PQresultStatus(result) != PGRES_COMMAND_OK)
849
0
  {
850
0
    char     *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
851
852
0
    fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
853
0
        progname, PQdb(conn), PQerrorMessage(conn));
854
855
0
    if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
856
0
    {
857
0
      PQclear(result);
858
0
      return false;
859
0
    }
860
0
  }
861
862
0
  PQclear(result);
863
0
  return true;
864
0
}
865
866
/*
867
 * GetQueryResult
868
 *
869
 * Pump the conn till it's dry of results; return false if any are errors.
870
 * Note that this will block if the conn is busy.
871
 */
872
static bool
873
GetQueryResult(PGconn *conn, const char *progname)
874
0
{
875
0
  bool    ok = true;
876
0
  PGresult   *result;
877
878
0
  SetCancelConn(conn);
879
0
  while ((result = PQgetResult(conn)) != NULL)
880
0
  {
881
0
    if (!ProcessQueryResult(conn, result, progname))
882
0
      ok = false;
883
0
  }
884
0
  ResetCancelConn();
885
0
  return ok;
886
0
}
887
888
/*
889
 * DisconnectDatabase
890
 *    Disconnect the connection associated with the given slot
891
 */
892
static void
893
DisconnectDatabase(ParallelSlot *slot)
894
0
{
895
0
  char    errbuf[256];
896
897
0
  if (!slot->connection)
898
0
    return;
899
900
0
  if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
901
0
  {
902
0
    PGcancel   *cancel;
903
904
0
    if ((cancel = PQgetCancel(slot->connection)))
905
0
    {
906
0
      (void) PQcancel(cancel, errbuf, sizeof(errbuf));
907
0
      PQfreeCancel(cancel);
908
0
    }
909
0
  }
910
911
0
  PQfinish(slot->connection);
912
0
  slot->connection = NULL;
913
0
}
914
915
/*
916
 * Loop on select() until a descriptor from the given set becomes readable.
917
 *
918
 * If we get a cancel request while we're waiting, we forego all further
919
 * processing and set the *aborting flag to true.  The return value must be
920
 * ignored in this case.  Otherwise, *aborting is set to false.
921
 */
922
static int
923
select_loop(int maxFd, fd_set *workerset, bool *aborting)
924
0
{
925
0
  int     i;
926
0
  fd_set    saveSet = *workerset;
927
928
0
  if (CancelRequested)
929
0
  {
930
0
    *aborting = true;
931
0
    return -1;
932
0
  }
933
0
  else
934
0
    *aborting = false;
935
936
0
  for (;;)
937
0
  {
938
    /*
939
     * On Windows, we need to check once in a while for cancel requests;
940
     * on other platforms we rely on select() returning when interrupted.
941
     */
942
0
    struct timeval *tvp;
943
#ifdef WIN32
944
    struct timeval tv = {0, 1000000};
945
946
    tvp = &tv;
947
#else
948
0
    tvp = NULL;
949
0
#endif
950
951
0
    *workerset = saveSet;
952
0
    i = select(maxFd + 1, workerset, NULL, NULL, tvp);
953
954
#ifdef WIN32
955
    if (i == SOCKET_ERROR)
956
    {
957
      i = -1;
958
959
      if (WSAGetLastError() == WSAEINTR)
960
        errno = EINTR;
961
    }
962
#endif
963
964
0
    if (i < 0 && errno == EINTR)
965
0
      continue;     /* ignore this */
966
0
    if (i < 0 || CancelRequested)
967
0
      *aborting = true; /* but not this */
968
0
    if (i == 0)
969
0
      continue;     /* timeout (Win32 only) */
970
0
    break;
971
0
  }
972
973
0
  return i;
974
0
}
975
976
static void
977
init_slot(ParallelSlot *slot, PGconn *conn)
978
0
{
979
0
  slot->connection = conn;
980
  /* Initially assume connection is idle */
981
0
  slot->isFree = true;
982
0
}
983
984
static void
985
help(const char *progname)
986
0
{
987
0
  printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
988
0
  printf(_("Usage:\n"));
989
0
  printf(_("  %s [OPTION]... [DBNAME]\n"), progname);
990
0
  printf(_("\nOptions:\n"));
991
0
  printf(_("  -a, --all                       vacuum all databases\n"));
992
0
  printf(_("  -d, --dbname=DBNAME             database to vacuum\n"));
993
0
  printf(_("  -e, --echo                      show the commands being sent to the server\n"));
994
0
  printf(_("  -f, --full                      do full vacuuming\n"));
995
0
  printf(_("  -F, --freeze                    freeze row transaction information\n"));
996
0
  printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
997
0
  printf(_("  -q, --quiet                     don't write any messages\n"));
998
0
  printf(_("  -t, --table='TABLE[(COLUMNS)]'  vacuum specific table(s) only\n"));
999
0
  printf(_("  -v, --verbose                   write a lot of output\n"));
1000
0
  printf(_("  -V, --version                   output version information, then exit\n"));
1001
0
  printf(_("  -z, --analyze                   update optimizer statistics\n"));
1002
0
  printf(_("  -Z, --analyze-only              only update optimizer statistics; no vacuum\n"));
1003
0
  printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
1004
0
       "                                  stages for faster results; no vacuum\n"));
1005
0
  printf(_("  -?, --help                      show this help, then exit\n"));
1006
0
  printf(_("\nConnection options:\n"));
1007
0
  printf(_("  -h, --host=HOSTNAME       database server host or socket directory\n"));
1008
0
  printf(_("  -p, --port=PORT           database server port\n"));
1009
0
  printf(_("  -U, --username=USERNAME   user name to connect as\n"));
1010
0
  printf(_("  -w, --no-password         never prompt for password\n"));
1011
0
  printf(_("  -W, --password            force password prompt\n"));
1012
0
  printf(_("  --maintenance-db=DBNAME   alternate maintenance database\n"));
1013
0
  printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
1014
0
  printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
1015
0
}