YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/postgres/src/bin/pg_rewind/libpq_fetch.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * libpq_fetch.c
4
 *    Functions for fetching files from a remote server.
5
 *
6
 * Copyright (c) 2013-2018, PostgreSQL Global Development Group
7
 *
8
 *-------------------------------------------------------------------------
9
 */
10
#include "postgres_fe.h"
11
12
#include <sys/stat.h>
13
#include <dirent.h>
14
#include <fcntl.h>
15
#include <unistd.h>
16
17
#include "pg_rewind.h"
18
#include "datapagemap.h"
19
#include "fetch.h"
20
#include "file_ops.h"
21
#include "filemap.h"
22
#include "logging.h"
23
24
#include "libpq-fe.h"
25
#include "catalog/pg_type_d.h"
26
#include "fe_utils/connect.h"
27
#include "port/pg_bswap.h"
28
29
/* Connection to the source server; assigned in libpqConnect() and used by the query functions in this file. */
static PGconn *conn = NULL;
30
31
/*
32
 * Files are fetched max CHUNKSIZE bytes at a time.
33
 *
34
 * (This only applies to files that are copied in whole, or for truncated
35
 * files where we copy the tail. Relation files, where we know the individual
36
 * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
37
 */
38
0
#define CHUNKSIZE 1000000
39
40
/* Forward declarations of file-local helpers that are defined further down. */
static void receiveFileChunks(const char *sql);
41
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
42
static char *run_simple_query(const char *sql);
43
44
void
45
libpqConnect(const char *connstr)
46
0
{
47
0
  char     *str;
48
0
  PGresult   *res;
49
50
0
  conn = PQconnectdb(connstr);
51
0
  if (PQstatus(conn) == CONNECTION_BAD)
52
0
    pg_fatal("could not connect to server: %s",
53
0
         PQerrorMessage(conn));
54
55
0
  pg_log(PG_PROGRESS, "connected to server\n");
56
57
0
  res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
58
0
  if (PQresultStatus(res) != PGRES_TUPLES_OK)
59
0
    pg_fatal("could not clear search_path: %s",
60
0
         PQresultErrorMessage(res));
61
0
  PQclear(res);
62
63
  /*
64
   * Check that the server is not in hot standby mode. There is no
65
   * fundamental reason that couldn't be made to work, but it doesn't
66
   * currently because we use a temporary table. Better to check for it
67
   * explicitly than error out, for a better error message.
68
   */
69
0
  str = run_simple_query("SELECT pg_is_in_recovery()");
70
0
  if (strcmp(str, "f") != 0)
71
0
    pg_fatal("source server must not be in recovery mode\n");
72
0
  pg_free(str);
73
74
  /*
75
   * Also check that full_page_writes is enabled.  We can get torn pages if
76
   * a page is modified while we read it with pg_read_binary_file(), and we
77
   * rely on full page images to fix them.
78
   */
79
0
  str = run_simple_query("SHOW full_page_writes");
80
0
  if (strcmp(str, "on") != 0)
81
0
    pg_fatal("full_page_writes must be enabled in the source server\n");
82
0
  pg_free(str);
83
84
  /*
85
   * Although we don't do any "real" updates, we do work with a temporary
86
   * table. We don't care about synchronous commit for that. It doesn't
87
   * otherwise matter much, but if the server is using synchronous
88
   * replication, and replication isn't working for some reason, we don't
89
   * want to get stuck, waiting for it to start working again.
90
   */
91
0
  res = PQexec(conn, "SET synchronous_commit = off");
92
0
  if (PQresultStatus(res) != PGRES_COMMAND_OK)
93
0
    pg_fatal("could not set up connection context: %s",
94
0
         PQresultErrorMessage(res));
95
0
  PQclear(res);
96
0
}
97
98
/*
99
 * Runs a query that returns a single value.
100
 * The result should be pg_free'd after use.
101
 */
102
static char *
103
run_simple_query(const char *sql)
104
0
{
105
0
  PGresult   *res;
106
0
  char     *result;
107
108
0
  res = PQexec(conn, sql);
109
110
0
  if (PQresultStatus(res) != PGRES_TUPLES_OK)
111
0
    pg_fatal("error running query (%s) in source server: %s",
112
0
         sql, PQresultErrorMessage(res));
113
114
  /* sanity check the result set */
115
0
  if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
116
0
    pg_fatal("unexpected result set from query\n");
117
118
0
  result = pg_strdup(PQgetvalue(res, 0, 0));
119
120
0
  PQclear(res);
121
122
0
  return result;
123
0
}
124
125
/*
126
 * Calls pg_current_wal_insert_lsn() function
127
 */
128
XLogRecPtr
129
libpqGetCurrentXlogInsertLocation(void)
130
0
{
131
0
  XLogRecPtr  result;
132
0
  uint32    hi;
133
0
  uint32    lo;
134
0
  char     *val;
135
136
0
  val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
137
138
0
  if (sscanf(val, "%X/%X", &hi, &lo) != 2)
139
0
    pg_fatal("unrecognized result \"%s\" for current WAL insert location\n", val);
140
141
0
  result = ((uint64) hi) << 32 | lo;
142
143
0
  pg_free(val);
144
145
0
  return result;
146
0
}
147
148
/*
149
 * Get a list of all files in the data directory.
150
 */
151
void
152
libpqProcessFileList(void)
153
0
{
154
0
  PGresult   *res;
155
0
  const char *sql;
156
0
  int     i;
157
158
  /*
159
   * Create a recursive directory listing of the whole data directory.
160
   *
161
   * The WITH RECURSIVE part does most of the work. The second part gets the
162
   * targets of the symlinks in pg_tblspc directory.
163
   *
164
   * XXX: There is no backend function to get a symbolic link's target in
165
   * general, so if the admin has put any custom symbolic links in the data
166
   * directory, they won't be copied correctly.
167
   */
168
0
  sql =
169
0
    "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
170
0
    "  SELECT '' AS path, filename, size, isdir FROM\n"
171
0
    "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
172
0
    "        pg_stat_file(fn.filename, true) AS this\n"
173
0
    "  UNION ALL\n"
174
0
    "  SELECT parent.path || parent.filename || '/' AS path,\n"
175
0
    "         fn, this.size, this.isdir\n"
176
0
    "  FROM files AS parent,\n"
177
0
    "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
178
0
    "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
179
0
    "       WHERE parent.isdir = 't'\n"
180
0
    ")\n"
181
0
    "SELECT path || filename, size, isdir,\n"
182
0
    "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
183
0
    "FROM files\n"
184
0
    "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
185
0
    "                             AND oid::text = files.filename\n";
186
0
  res = PQexec(conn, sql);
187
188
0
  if (PQresultStatus(res) != PGRES_TUPLES_OK)
189
0
    pg_fatal("could not fetch file list: %s",
190
0
         PQresultErrorMessage(res));
191
192
  /* sanity check the result set */
193
0
  if (PQnfields(res) != 4)
194
0
    pg_fatal("unexpected result set while fetching file list\n");
195
196
  /* Read result to local variables */
197
0
  for (i = 0; i < PQntuples(res); i++)
198
0
  {
199
0
    char     *path = PQgetvalue(res, i, 0);
200
0
    int64   filesize = atol(PQgetvalue(res, i, 1));
201
0
    bool    isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
202
0
    char     *link_target = PQgetvalue(res, i, 3);
203
0
    file_type_t type;
204
205
0
    if (PQgetisnull(res, 0, 1))
206
0
    {
207
      /*
208
       * The file was removed from the server while the query was
209
       * running. Ignore it.
210
       */
211
0
      continue;
212
0
    }
213
214
0
    if (link_target[0])
215
0
      type = FILE_TYPE_SYMLINK;
216
0
    else if (isdir)
217
0
      type = FILE_TYPE_DIRECTORY;
218
0
    else
219
0
      type = FILE_TYPE_REGULAR;
220
221
0
    process_source_file(path, type, filesize, link_target);
222
0
  }
223
0
  PQclear(res);
224
0
}
225
226
/*----
227
 * Runs a query, which returns pieces of files from the remote source data
228
 * directory, and overwrites the corresponding parts of target files with
229
 * the received parts. The result set is expected to be of format:
230
 *
231
 * path   text  -- path in the data directory, e.g "base/1/123"
232
 * begin  int8  -- offset within the file
233
 * chunk  bytea -- file content
234
 *----
235
 */
236
static void
237
receiveFileChunks(const char *sql)
238
0
{
239
0
  PGresult   *res;
240
241
0
  if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
242
0
    pg_fatal("could not send query: %s", PQerrorMessage(conn));
243
244
0
  pg_log(PG_DEBUG, "getting file chunks\n");
245
246
0
  if (PQsetSingleRowMode(conn) != 1)
247
0
    pg_fatal("could not set libpq connection to single row mode\n");
248
249
0
  while ((res = PQgetResult(conn)) != NULL)
250
0
  {
251
0
    char     *filename;
252
0
    int     filenamelen;
253
0
    int64   chunkoff;
254
0
    char    chunkoff_str[32];
255
0
    int     chunksize;
256
0
    char     *chunk;
257
258
0
    switch (PQresultStatus(res))
259
0
    {
260
0
      case PGRES_SINGLE_TUPLE:
261
0
        break;
262
263
0
      case PGRES_TUPLES_OK:
264
0
        PQclear(res);
265
0
        continue;   /* final zero-row result */
266
267
0
      default:
268
0
        pg_fatal("unexpected result while fetching remote files: %s",
269
0
             PQresultErrorMessage(res));
270
0
    }
271
272
    /* sanity check the result set */
273
0
    if (PQnfields(res) != 3 || PQntuples(res) != 1)
274
0
      pg_fatal("unexpected result set size while fetching remote files\n");
275
276
0
    if (PQftype(res, 0) != TEXTOID ||
277
0
      PQftype(res, 1) != INT8OID ||
278
0
      PQftype(res, 2) != BYTEAOID)
279
0
    {
280
0
      pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u\n",
281
0
           PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
282
0
    }
283
284
0
    if (PQfformat(res, 0) != 1 &&
285
0
      PQfformat(res, 1) != 1 &&
286
0
      PQfformat(res, 2) != 1)
287
0
    {
288
0
      pg_fatal("unexpected result format while fetching remote files\n");
289
0
    }
290
291
0
    if (PQgetisnull(res, 0, 0) ||
292
0
      PQgetisnull(res, 0, 1))
293
0
    {
294
0
      pg_fatal("unexpected null values in result while fetching remote files\n");
295
0
    }
296
297
0
    if (PQgetlength(res, 0, 1) != sizeof(int64))
298
0
      pg_fatal("unexpected result length while fetching remote files\n");
299
300
    /* Read result set to local variables */
301
0
    memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
302
0
    chunkoff = pg_ntoh64(chunkoff);
303
0
    chunksize = PQgetlength(res, 0, 2);
304
305
0
    filenamelen = PQgetlength(res, 0, 0);
306
0
    filename = pg_malloc(filenamelen + 1);
307
0
    memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
308
0
    filename[filenamelen] = '\0';
309
310
0
    chunk = PQgetvalue(res, 0, 2);
311
312
    /*
313
     * If a file has been deleted on the source, remove it on the target
314
     * as well.  Note that multiple unlink() calls may happen on the same
315
     * file if multiple data chunks are associated with it, hence ignore
316
     * unconditionally anything missing.  If this file is not a relation
317
     * data file, then it has been already truncated when creating the
318
     * file chunk list at the previous execution of the filemap.
319
     */
320
0
    if (PQgetisnull(res, 0, 2))
321
0
    {
322
0
      pg_log(PG_DEBUG,
323
0
           "received null value for chunk for file \"%s\", file has been deleted\n",
324
0
           filename);
325
0
      remove_target_file(filename, true);
326
0
      pg_free(filename);
327
0
      PQclear(res);
328
0
      continue;
329
0
    }
330
331
    /*
332
     * Separate step to keep platform-dependent format code out of
333
     * translatable strings.
334
     */
335
0
    snprintf(chunkoff_str, sizeof(chunkoff_str), INT64_FORMAT, chunkoff);
336
0
    pg_log(PG_DEBUG, "received chunk for file \"%s\", offset %s, size %d\n",
337
0
         filename, chunkoff_str, chunksize);
338
339
0
    open_target_file(filename, false);
340
341
0
    write_target_range(chunk, chunkoff, chunksize);
342
343
0
    pg_free(filename);
344
345
0
    PQclear(res);
346
0
  }
347
0
}
348
349
/*
350
 * Receive a single file as a malloc'd buffer.
351
 */
352
char *
353
libpqGetFile(const char *filename, size_t *filesize)
354
0
{
355
0
  PGresult   *res;
356
0
  char     *result;
357
0
  int     len;
358
0
  const char *paramValues[1];
359
360
0
  paramValues[0] = filename;
361
0
  res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
362
0
             1, NULL, paramValues, NULL, NULL, 1);
363
364
0
  if (PQresultStatus(res) != PGRES_TUPLES_OK)
365
0
    pg_fatal("could not fetch remote file \"%s\": %s",
366
0
         filename, PQresultErrorMessage(res));
367
368
  /* sanity check the result set */
369
0
  if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
370
0
    pg_fatal("unexpected result set while fetching remote file \"%s\"\n",
371
0
         filename);
372
373
  /* Read result to local variables */
374
0
  len = PQgetlength(res, 0, 0);
375
0
  result = pg_malloc(len + 1);
376
0
  memcpy(result, PQgetvalue(res, 0, 0), len);
377
0
  result[len] = '\0';
378
379
0
  PQclear(res);
380
381
0
  pg_log(PG_DEBUG, "fetched file \"%s\", length %d\n", filename, len);
382
383
0
  if (filesize)
384
0
    *filesize = len;
385
0
  return result;
386
0
}
387
388
/*
389
 * Write a file range to a temporary table in the server.
390
 *
391
 * The range is sent to the server as a COPY formatted line, to be inserted
392
 * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
393
 * function to actually fetch the data.
394
 */
395
static void
396
fetch_file_range(const char *path, uint64 begin, uint64 end)
397
0
{
398
0
  char    linebuf[MAXPGPATH + 23];
399
400
  /* Split the range into CHUNKSIZE chunks */
401
0
  while (end - begin > 0)
402
0
  {
403
0
    unsigned int len;
404
405
    /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
406
0
    if (end - begin > CHUNKSIZE)
407
0
      len = CHUNKSIZE;
408
0
    else
409
0
      len = (unsigned int) (end - begin);
410
411
0
    snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
412
413
0
    if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
414
0
      pg_fatal("could not send COPY data: %s",
415
0
           PQerrorMessage(conn));
416
417
0
    begin += len;
418
0
  }
419
0
}
420
421
/*
422
 * Fetch all changed blocks from remote source data directory.
423
 */
424
void
425
libpq_executeFileMap(filemap_t *map)
426
0
{
427
0
  file_entry_t *entry;
428
0
  const char *sql;
429
0
  PGresult   *res;
430
0
  int     i;
431
432
  /*
433
   * First create a temporary table, and load it with the blocks that we
434
   * need to fetch.
435
   */
436
0
  sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
437
0
  res = PQexec(conn, sql);
438
439
0
  if (PQresultStatus(res) != PGRES_COMMAND_OK)
440
0
    pg_fatal("could not create temporary table: %s",
441
0
         PQresultErrorMessage(res));
442
0
  PQclear(res);
443
444
0
  sql = "COPY fetchchunks FROM STDIN";
445
0
  res = PQexec(conn, sql);
446
447
0
  if (PQresultStatus(res) != PGRES_COPY_IN)
448
0
    pg_fatal("could not send file list: %s",
449
0
         PQresultErrorMessage(res));
450
0
  PQclear(res);
451
452
0
  for (i = 0; i < map->narray; i++)
453
0
  {
454
0
    entry = map->array[i];
455
456
    /* If this is a relation file, copy the modified blocks */
457
0
    execute_pagemap(&entry->pagemap, entry->path);
458
459
0
    switch (entry->action)
460
0
    {
461
0
      case FILE_ACTION_NONE:
462
        /* nothing else to do */
463
0
        break;
464
465
0
      case FILE_ACTION_COPY:
466
        /* Truncate the old file out of the way, if any */
467
0
        open_target_file(entry->path, true);
468
0
        fetch_file_range(entry->path, 0, entry->newsize);
469
0
        break;
470
471
0
      case FILE_ACTION_TRUNCATE:
472
0
        truncate_target_file(entry->path, entry->newsize);
473
0
        break;
474
475
0
      case FILE_ACTION_COPY_TAIL:
476
0
        fetch_file_range(entry->path, entry->oldsize, entry->newsize);
477
0
        break;
478
479
0
      case FILE_ACTION_REMOVE:
480
0
        remove_target(entry);
481
0
        break;
482
483
0
      case FILE_ACTION_CREATE:
484
0
        create_target(entry);
485
0
        break;
486
0
    }
487
0
  }
488
489
0
  if (PQputCopyEnd(conn, NULL) != 1)
490
0
    pg_fatal("could not send end-of-COPY: %s",
491
0
         PQerrorMessage(conn));
492
493
0
  while ((res = PQgetResult(conn)) != NULL)
494
0
  {
495
0
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
496
0
      pg_fatal("unexpected result while sending file list: %s",
497
0
           PQresultErrorMessage(res));
498
0
    PQclear(res);
499
0
  }
500
501
  /*
502
   * We've now copied the list of file ranges that we need to fetch to the
503
   * temporary table. Now, actually fetch all of those ranges.
504
   */
505
0
  sql =
506
0
    "SELECT path, begin,\n"
507
0
    "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
508
0
    "FROM fetchchunks\n";
509
510
0
  receiveFileChunks(sql);
511
0
}
512
513
static void
514
execute_pagemap(datapagemap_t *pagemap, const char *path)
515
0
{
516
0
  datapagemap_iterator_t *iter;
517
0
  BlockNumber blkno;
518
0
  off_t   offset;
519
520
0
  iter = datapagemap_iterate(pagemap);
521
0
  while (datapagemap_next(iter, &blkno))
522
0
  {
523
0
    offset = blkno * BLCKSZ;
524
525
0
    fetch_file_range(path, offset, offset + BLCKSZ);
526
0
  }
527
0
  pg_free(iter);
528
0
}