YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/commands/dbcommands.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * dbcommands.c
4
 *    Database management commands (create/drop database).
5
 *
6
 * Note: database creation/destruction commands use exclusive locks on
7
 * the database objects (as expressed by LockSharedObject()) to avoid
8
 * stepping on each others' toes.  Formerly we used table-level locks
9
 * on pg_database, but that's too coarse-grained.
10
 *
11
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
12
 * Portions Copyright (c) 1994, Regents of the University of California
13
 *
14
 *
15
 * IDENTIFICATION
16
 *    src/backend/commands/dbcommands.c
17
 *
18
 * The following only applies to changes made to this file as part of
19
 * YugaByte development.
20
 *
21
 * Portions Copyright (c) YugaByte, Inc.
22
 *
23
 * Licensed under the Apache License, Version 2.0 (the "License"); you
24
 * may not use this file except in compliance with the License.
25
 * You may obtain a copy of the License at
26
 *
27
 * http://www.apache.org/licenses/LICENSE-2.0
28
 *
29
 * Unless required by applicable law or agreed to in writing, software
30
 * distributed under the License is distributed on an "AS IS" BASIS,
31
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
32
 * implied.  See the License for the specific language governing
33
 * permissions and limitations under the License.
34
 *-------------------------------------------------------------------------
35
 */
36
#include "postgres.h"
37
38
#include <fcntl.h>
39
#include <unistd.h>
40
#include <sys/stat.h>
41
42
#include "access/genam.h"
43
#include "access/heapam.h"
44
#include "access/htup_details.h"
45
#include "access/xact.h"
46
#include "access/xloginsert.h"
47
#include "access/xlogutils.h"
48
#include "catalog/catalog.h"
49
#include "catalog/dependency.h"
50
#include "catalog/indexing.h"
51
#include "catalog/objectaccess.h"
52
#include "catalog/pg_authid.h"
53
#include "catalog/pg_database.h"
54
#include "catalog/pg_db_role_setting.h"
55
#include "catalog/pg_subscription.h"
56
#include "catalog/pg_tablespace.h"
57
#include "commands/comment.h"
58
#include "commands/dbcommands.h"
59
#include "commands/dbcommands_xlog.h"
60
#include "commands/defrem.h"
61
#include "commands/seclabel.h"
62
#include "commands/tablespace.h"
63
#include "mb/pg_wchar.h"
64
#include "miscadmin.h"
65
#include "pgstat.h"
66
#include "postmaster/bgwriter.h"
67
#include "replication/slot.h"
68
#include "storage/copydir.h"
69
#include "storage/fd.h"
70
#include "storage/lmgr.h"
71
#include "storage/ipc.h"
72
#include "storage/procarray.h"
73
#include "storage/smgr.h"
74
#include "utils/acl.h"
75
#include "utils/builtins.h"
76
#include "utils/fmgroids.h"
77
#include "utils/pg_locale.h"
78
#include "utils/snapmgr.h"
79
#include "utils/syscache.h"
80
#include "utils/tqual.h"
81
82
/*  YB includes. */
83
#include "commands/ybccmds.h"
84
#include "pg_yb_utils.h"
85
86
typedef struct
87
{
88
  Oid     src_dboid;    /* source (template) DB */
89
  Oid     dest_dboid;   /* DB we are trying to create */
90
} createdb_failure_params;
91
92
typedef struct
93
{
94
  Oid     dest_dboid;   /* DB we are trying to move */
95
  Oid     dest_tsoid;   /* tablespace we are trying to move to */
96
} movedb_failure_params;
97
98
/* non-export function prototypes */
99
static void createdb_failure_callback(int code, Datum arg);
100
static void movedb(const char *dbname, const char *tblspcname);
101
static void movedb_failure_callback(int code, Datum arg);
102
static bool get_db_info(const char *name, LOCKMODE lockmode,
103
      Oid *dbIdP, Oid *ownerIdP,
104
      int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
105
      Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
106
      MultiXactId *dbMinMultiP,
107
      Oid *dbTablespace, char **dbCollate, char **dbCtype);
108
static bool have_createdb_privilege(void);
109
static void remove_dbtablespaces(Oid db_id);
110
static bool check_db_file_conflict(Oid db_id);
111
static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
112
113
/*
114
 * CREATE DATABASE
115
 */
116
Oid
117
createdb(ParseState *pstate, const CreatedbStmt *stmt)
118
27
{
119
27
  HeapScanDesc scan;
120
27
  Relation  rel;
121
27
  Oid     src_dboid;
122
27
  Oid     src_owner;
123
27
  int     src_encoding;
124
27
  char     *src_collate;
125
27
  char     *src_ctype;
126
27
  bool    src_istemplate;
127
27
  bool    src_allowconn;
128
27
  Oid     src_lastsysoid;
129
27
  TransactionId src_frozenxid;
130
27
  MultiXactId src_minmxid;
131
27
  Oid     src_deftablespace;
132
27
  volatile Oid dst_deftablespace;
133
27
  Relation  pg_database_rel;
134
27
  HeapTuple tuple;
135
27
  Datum   new_record[Natts_pg_database];
136
27
  bool    new_record_nulls[Natts_pg_database];
137
27
  Oid     dboid;
138
27
  Oid     datdba;
139
27
  ListCell   *option;
140
27
  DefElem    *dtablespacename = NULL;
141
27
  DefElem    *downer = NULL;
142
27
  DefElem    *dtemplate = NULL;
143
27
  DefElem    *dencoding = NULL;
144
27
  DefElem    *dcollate = NULL;
145
27
  DefElem    *dcolocated = NULL;
146
27
  DefElem    *dctype = NULL;
147
27
  DefElem    *distemplate = NULL;
148
27
  DefElem    *dallowconnections = NULL;
149
27
  DefElem    *dconnlimit = NULL;
150
27
  DefElem    **default_options[] = {&dtablespacename};
151
27
  char     *dbname = stmt->dbname;
152
27
  char     *dbowner = NULL;
153
27
  const char *dbtemplate = NULL;
154
27
  char     *dbcollate = NULL;
155
27
  char     *dbctype = NULL;
156
27
  char     *canonname;
157
27
  int     encoding = -1;
158
27
  bool    dbistemplate = false;
159
27
  bool    dballowconnections = true;
160
27
  bool    dbcolocated = false;
161
27
  int     dbconnlimit = -1;
162
27
  int     notherbackends;
163
27
  int     npreparedxacts;
164
27
  createdb_failure_params fparms;
165
166
  /*
167
   * We do insert into pg_database without explicit OID, which conflicts
168
   * with OID generation logic for YSQL upgrade.
169
   * This is mostly relevant as a sanity check for tests.
170
   */
171
27
  if (IsYsqlUpgrade)
172
0
    elog(ERROR, "CREATE DATABASE is disallowed in YSQL upgrade mode");
173
174
27
  if (dbname != NULL && (strcmp(dbname, "template0") == 0 ||
175
27
    strcmp(dbname, "template1") == 0))
176
0
  {
177
0
    YBSetPreparingTemplates();
178
0
  }
179
180
  /* Extract options from the statement node tree */
181
27
  foreach(option, stmt->options)
182
2
  {
183
2
    DefElem    *defel = (DefElem *) lfirst(option);
184
185
2
    if (strcmp(defel->defname, "tablespace") == 0)
186
0
    {
187
0
      if (dtablespacename)
188
0
        ereport(ERROR,
189
0
            (errcode(ERRCODE_SYNTAX_ERROR),
190
0
             errmsg("conflicting or redundant options"),
191
0
             parser_errposition(pstate, defel->location)));
192
0
      dtablespacename = defel;
193
0
    }
194
2
    else if (strcmp(defel->defname, "owner") == 0)
195
0
    {
196
0
      if (downer)
197
0
        ereport(ERROR,
198
0
            (errcode(ERRCODE_SYNTAX_ERROR),
199
0
             errmsg("conflicting or redundant options"),
200
0
             parser_errposition(pstate, defel->location)));
201
0
      downer = defel;
202
0
    }
203
2
    else if (strcmp(defel->defname, "template") == 0)
204
0
    {
205
0
      if (dtemplate)
206
0
        ereport(ERROR,
207
0
            (errcode(ERRCODE_SYNTAX_ERROR),
208
0
             errmsg("conflicting or redundant options"),
209
0
             parser_errposition(pstate, defel->location)));
210
0
      dtemplate = defel;
211
0
    }
212
2
    else if (strcmp(defel->defname, "encoding") == 0)
213
0
    {
214
0
      if (dencoding)
215
0
        ereport(ERROR,
216
0
            (errcode(ERRCODE_SYNTAX_ERROR),
217
0
             errmsg("conflicting or redundant options"),
218
0
             parser_errposition(pstate, defel->location)));
219
0
      dencoding = defel;
220
0
    }
221
2
    else if (strcmp(defel->defname, "lc_collate") == 0)
222
0
    {
223
0
      if (dcollate)
224
0
        ereport(ERROR,
225
0
            (errcode(ERRCODE_SYNTAX_ERROR),
226
0
             errmsg("conflicting or redundant options"),
227
0
             parser_errposition(pstate, defel->location)));
228
0
      dcollate = defel;
229
0
    }
230
2
    else if (strcmp(defel->defname, "lc_ctype") == 0)
231
0
    {
232
0
      if (dctype)
233
0
        ereport(ERROR,
234
0
            (errcode(ERRCODE_SYNTAX_ERROR),
235
0
             errmsg("conflicting or redundant options"),
236
0
             parser_errposition(pstate, defel->location)));
237
0
      dctype = defel;
238
0
    }
239
2
    else if (strcmp(defel->defname, "is_template") == 0)
240
0
    {
241
0
      if (distemplate)
242
0
        ereport(ERROR,
243
0
            (errcode(ERRCODE_SYNTAX_ERROR),
244
0
             errmsg("conflicting or redundant options"),
245
0
             parser_errposition(pstate, defel->location)));
246
0
      distemplate = defel;
247
0
    }
248
2
    else if (strcmp(defel->defname, "allow_connections") == 0)
249
0
    {
250
0
      if (dallowconnections)
251
0
        ereport(ERROR,
252
0
            (errcode(ERRCODE_SYNTAX_ERROR),
253
0
             errmsg("conflicting or redundant options"),
254
0
             parser_errposition(pstate, defel->location)));
255
0
      dallowconnections = defel;
256
0
    }
257
2
    else if (strcmp(defel->defname, "connection_limit") == 0)
258
0
    {
259
0
      if (dconnlimit)
260
0
        ereport(ERROR,
261
0
            (errcode(ERRCODE_SYNTAX_ERROR),
262
0
             errmsg("conflicting or redundant options"),
263
0
             parser_errposition(pstate, defel->location)));
264
0
      dconnlimit = defel;
265
0
    }
266
2
    else if (strcmp(defel->defname, "location") == 0)
267
0
    {
268
0
      ereport(WARNING,
269
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
270
0
           errmsg("LOCATION is not supported anymore"),
271
0
           errhint("Consider using tablespaces instead."),
272
0
           parser_errposition(pstate, defel->location)));
273
0
    }
274
2
    else if (strcmp(defel->defname, "colocated") == 0)
275
2
    {
276
2
      if (dcolocated)
277
2
        ereport(ERROR,
278
2
            (errcode(ERRCODE_SYNTAX_ERROR),
279
2
             errmsg("conflicting or redundant options"),
280
2
             parser_errposition(pstate, defel->location)));
281
2
      dcolocated = defel;
282
2
    }
283
2
    else
284
0
      ereport(ERROR,
285
2
          (errcode(ERRCODE_SYNTAX_ERROR),
286
2
           errmsg("option \"%s\" not recognized", defel->defname),
287
2
           parser_errposition(pstate, defel->location)));
288
2
  }
289
290
27
  if (downer && downer->arg)
291
0
    dbowner = defGetString(downer);
292
27
  if (dtemplate && dtemplate->arg)
293
0
    dbtemplate = defGetString(dtemplate);
294
27
  if (dencoding && dencoding->arg)
295
0
  {
296
0
    const char *encoding_name;
297
298
0
    if (IsA(dencoding->arg, Integer))
299
0
    {
300
0
      encoding = defGetInt32(dencoding);
301
0
      encoding_name = pg_encoding_to_char(encoding);
302
0
      if (strcmp(encoding_name, "") == 0 ||
303
0
        pg_valid_server_encoding(encoding_name) < 0)
304
0
        ereport(ERROR,
305
0
            (errcode(ERRCODE_UNDEFINED_OBJECT),
306
0
             errmsg("%d is not a valid encoding code",
307
0
                encoding),
308
0
             parser_errposition(pstate, dencoding->location)));
309
0
    }
310
0
    else
311
0
    {
312
0
      encoding_name = defGetString(dencoding);
313
0
      encoding = pg_valid_server_encoding(encoding_name);
314
0
      if (encoding < 0)
315
0
        ereport(ERROR,
316
0
            (errcode(ERRCODE_UNDEFINED_OBJECT),
317
0
             errmsg("%s is not a valid encoding name",
318
0
                encoding_name),
319
0
             parser_errposition(pstate, dencoding->location)));
320
0
    }
321
0
  }
322
27
  if (dcollate && dcollate->arg)
323
0
    dbcollate = defGetString(dcollate);
324
27
  if (dctype && dctype->arg)
325
0
    dbctype = defGetString(dctype);
326
27
  if (distemplate && distemplate->arg)
327
0
    dbistemplate = defGetBoolean(distemplate);
328
27
  if (dallowconnections && dallowconnections->arg)
329
0
    dballowconnections = defGetBoolean(dallowconnections);
330
27
  if (dconnlimit && dconnlimit->arg)
331
0
  {
332
0
    dbconnlimit = defGetInt32(dconnlimit);
333
0
    if (dbconnlimit < -1)
334
0
      ereport(ERROR,
335
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
336
0
           errmsg("invalid connection limit: %d", dbconnlimit)));
337
0
  }
338
27
  if (dcolocated && dcolocated->arg)
339
2
    dbcolocated = defGetBoolean(dcolocated);
340
341
  /* obtain OID of proposed owner */
342
27
  if (dbowner)
343
0
    datdba = get_role_oid(dbowner, false);
344
27
  else
345
27
    datdba = GetUserId();
346
347
  /*
348
   * To create a database, must have createdb privilege and must be able to
349
   * become the target role (this does not imply that the target role itself
350
   * must have createdb privilege).  The latter provision guards against
351
   * "giveaway" attacks.  Note that a superuser will always have both of
352
   * these privileges a fortiori.
353
   */
354
27
  if (!have_createdb_privilege())
355
27
    ereport(ERROR,
356
27
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
357
27
         errmsg("permission denied to create database")));
358
359
27
  check_is_member_of_role(GetUserId(), datdba);
360
361
  /*
362
   * Lookup database (template) to be cloned, and obtain share lock on it.
363
   * ShareLock allows two CREATE DATABASEs to work from the same template
364
   * concurrently, while ensuring no one is busy dropping it in parallel
365
   * (which would be Very Bad since we'd likely get an incomplete copy
366
   * without knowing it).  This also prevents any new connections from being
367
   * made to the source until we finish copying it, so we can be sure it
368
   * won't change underneath us.
369
   */
370
27
  if (!dbtemplate)
371
22
    dbtemplate = "template1"; /* Default template database name */
372
373
  /* Check YB options support */
374
27
  if (YBIsUsingYBParser())
375
22
  {
376
44
    for (int i = lengthof(default_options); i > 0; --i)
377
22
    {
378
22
      DefElem *option = *default_options[i - 1];
379
22
      if (option != NULL && option->arg != NULL)
380
22
        ereport(YBUnsupportedFeatureSignalLevel(),
381
22
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
382
22
             errmsg("Value other than default for %s option is "
383
22
                "not yet supported", option->defname),
384
22
             errhint("Please report the issue on "
385
22
                 "https://github.com/YugaByte/yugabyte-db"
386
22
                 "/issues"),
387
22
             parser_errposition(pstate, option->location)));
388
22
    }
389
390
22
    if (strcmp(dbtemplate, "template0") != 0 &&
391
22
      strcmp(dbtemplate, "template1") != 0)
392
22
      ereport(YBUnsupportedFeatureSignalLevel(),
393
22
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
394
22
           errmsg("Value other than default, template0 or template1 "
395
22
              "for template option is not yet supported"),
396
22
           errhint("Please report the issue on "
397
22
               "https://github.com/YugaByte/yugabyte-db/issues"),
398
22
           parser_errposition(pstate, dtemplate->location)));
399
400
22
    if (dbistemplate)
401
22
      ereport(YBUnsupportedFeatureSignalLevel(),
402
22
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
403
22
           errmsg("Value other than default or false for "
404
22
              "is_template option is not yet supported"),
405
22
           errhint("Please report the issue on "
406
22
               "https://github.com/YugaByte/yugabyte-db/issues"),
407
22
           parser_errposition(pstate, distemplate->location)));
408
409
22
    if (encoding >= 0 && encoding != PG_UTF8)
410
22
      ereport(YBUnsupportedFeatureSignalLevel(),
411
22
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
412
22
           errmsg("Value other than unicode or utf8 for encoding "
413
22
              "option is not yet supported"),
414
22
           errhint("Please report the issue on "
415
22
               "https://github.com/yugabyte/yugabyte-db/issues"),
416
22
           parser_errposition(pstate, dencoding->location)));
417
418
22
    if (!(YBIsCollationEnabled() && kTestOnlyUseOSDefaultCollation) && dcollate &&
419
0
      dbcollate && strcmp(dbcollate, "C") != 0)
420
22
      ereport(YBUnsupportedFeatureSignalLevel(),
421
22
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
422
22
           errmsg("Value other than 'C' for lc_collate "
423
22
              "option is not yet supported"),
424
22
           errhint("Please report the issue on "
425
22
               "https://github.com/YugaByte/yugabyte-db/issues"),
426
22
           parser_errposition(pstate, dcollate->location)));
427
428
22
    if (dctype && dbctype && strcmp(dbctype, "en_US.UTF-8") != 0)
429
22
      ereport(YBUnsupportedFeatureSignalLevel(),
430
22
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431
22
           errmsg("Value other than 'en_US.UTF-8' for lc_ctype "
432
22
              "option is not yet supported"),
433
22
           errhint("Please report the issue on "
434
22
               "https://github.com/YugaByte/yugabyte-db/issues"),
435
22
           parser_errposition(pstate, dctype->location)));
436
22
  }
437
438
27
  if (!get_db_info(dbtemplate, ShareLock,
439
27
           &src_dboid, &src_owner, &src_encoding,
440
27
           &src_istemplate, &src_allowconn, &src_lastsysoid,
441
27
           &src_frozenxid, &src_minmxid, &src_deftablespace,
442
27
           &src_collate, &src_ctype))
443
27
    ereport(ERROR,
444
27
        (errcode(ERRCODE_UNDEFINED_DATABASE),
445
27
         errmsg("template database \"%s\" does not exist",
446
27
            dbtemplate)));
447
448
  /*
449
   * Permission check: to copy a DB that's not marked datistemplate, you
450
   * must be superuser or the owner thereof.
451
   */
452
27
  if (!src_istemplate)
453
0
  {
454
0
    if (!pg_database_ownercheck(src_dboid, GetUserId()))
455
0
      ereport(ERROR,
456
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
457
0
           errmsg("permission denied to copy database \"%s\"",
458
0
              dbtemplate)));
459
0
  }
460
461
  /* If encoding or locales are defaulted, use source's setting */
462
27
  if (encoding < 0)
463
22
    encoding = src_encoding;
464
27
  if (dbcollate == NULL)
465
22
    dbcollate = src_collate;
466
27
  if (dbctype == NULL)
467
22
    dbctype = src_ctype;
468
469
  /* Some encodings are client only */
470
27
  if (!PG_VALID_BE_ENCODING(encoding))
471
27
    ereport(ERROR,
472
27
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
473
27
         errmsg("invalid server encoding %d", encoding)));
474
475
  /* Check that the chosen locales are valid, and get canonical spellings */
476
27
  if (!check_locale(LC_COLLATE, dbcollate, &canonname))
477
27
    ereport(ERROR,
478
27
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
479
27
         errmsg("invalid locale name: \"%s\"", dbcollate)));
480
27
  dbcollate = canonname;
481
27
  if (!check_locale(LC_CTYPE, dbctype, &canonname))
482
27
    ereport(ERROR,
483
27
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
484
27
         errmsg("invalid locale name: \"%s\"", dbctype)));
485
27
  dbctype = canonname;
486
487
27
  check_encoding_locale_matches(encoding, dbcollate, dbctype);
488
489
  /*
490
   * Check that the new encoding and locale settings match the source
491
   * database.  We insist on this because we simply copy the source data ---
492
   * any non-ASCII data would be wrongly encoded, and any indexes sorted
493
   * according to the source locale would be wrong.
494
   *
495
   * However, we assume that template0 doesn't contain any non-ASCII data
496
   * nor any indexes that depend on collation or ctype, so template0 can be
497
   * used as template for creating a database with any encoding or locale.
498
   */
499
27
  if (strcmp(dbtemplate, "template0") != 0)
500
22
  {
501
22
    if (encoding != src_encoding)
502
22
      ereport(ERROR,
503
22
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
504
22
           errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
505
22
              pg_encoding_to_char(encoding),
506
22
              pg_encoding_to_char(src_encoding)),
507
22
           errhint("Use the same encoding as in the template database, or use template0 as template.")));
508
509
22
    if (strcmp(dbcollate, src_collate) != 0)
510
22
      ereport(ERROR,
511
22
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
512
22
           errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
513
22
              dbcollate, src_collate),
514
22
           errhint("Use the same collation as in the template database, or use template0 as template.")));
515
516
22
    if (strcmp(dbctype, src_ctype) != 0)
517
22
      ereport(ERROR,
518
22
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
519
22
           errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
520
22
              dbctype, src_ctype),
521
22
           errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
522
22
  }
523
524
  /* Resolve default tablespace for new database */
525
27
  if (dtablespacename && dtablespacename->arg)
526
0
  {
527
0
    char     *tablespacename;
528
0
    AclResult aclresult;
529
530
0
    tablespacename = defGetString(dtablespacename);
531
0
    dst_deftablespace = get_tablespace_oid(tablespacename, false);
532
    /* check permissions */
533
0
    aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
534
0
                       ACL_CREATE);
535
0
    if (aclresult != ACLCHECK_OK)
536
0
      aclcheck_error(aclresult, OBJECT_TABLESPACE,
537
0
               tablespacename);
538
539
    /* pg_global must never be the default tablespace */
540
0
    if (dst_deftablespace == GLOBALTABLESPACE_OID)
541
0
      ereport(ERROR,
542
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
543
0
           errmsg("pg_global cannot be used as default tablespace")));
544
545
    /*
546
     * If we are trying to change the default tablespace of the template,
547
     * we require that the template not have any files in the new default
548
     * tablespace.  This is necessary because otherwise the copied
549
     * database would contain pg_class rows that refer to its default
550
     * tablespace both explicitly (by OID) and implicitly (as zero), which
551
     * would cause problems.  For example another CREATE DATABASE using
552
     * the copied database as template, and trying to change its default
553
     * tablespace again, would yield outright incorrect results (it would
554
     * improperly move tables to the new default tablespace that should
555
     * stay in the same tablespace).
556
     */
557
0
    if (dst_deftablespace != src_deftablespace)
558
0
    {
559
0
      char     *srcpath;
560
0
      struct stat st;
561
562
0
      srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
563
564
0
      if (stat(srcpath, &st) == 0 &&
565
0
        S_ISDIR(st.st_mode) &&
566
0
        !directory_is_empty(srcpath))
567
0
        ereport(ERROR,
568
0
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
569
0
             errmsg("cannot assign new default tablespace \"%s\"",
570
0
                tablespacename),
571
0
             errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
572
0
                   dbtemplate)));
573
0
      pfree(srcpath);
574
0
    }
575
0
  }
576
27
  else
577
27
  {
578
    /* Use template database's default tablespace */
579
27
    dst_deftablespace = src_deftablespace;
580
    /* Note there is no additional permission check in this path */
581
27
  }
582
583
  /*
584
   * Check for db name conflict.  This is just to give a more friendly error
585
   * message than "unique index violation".  There's a race condition but
586
   * we're willing to accept the less friendly message in that case.
587
   */
588
27
  if (OidIsValid(get_database_oid(dbname, true)))
589
27
    ereport(ERROR,
590
27
        (errcode(ERRCODE_DUPLICATE_DATABASE),
591
27
         errmsg("database \"%s\" already exists", dbname)));
592
593
  /*
594
   * The source DB can't have any active backends, except this one
595
   * (exception is to allow CREATE DB while connected to template1).
596
   * Otherwise we might copy inconsistent data.
597
   *
598
   * This should be last among the basic error checks, because it involves
599
   * potential waiting; we may as well throw an error first if we're gonna
600
   * throw one.
601
   */
602
27
  if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
603
27
    ereport(ERROR,
604
27
        (errcode(ERRCODE_OBJECT_IN_USE),
605
27
         errmsg("source database \"%s\" is being accessed by other users",
606
27
            dbtemplate),
607
27
         errdetail_busy_db(notherbackends, npreparedxacts)));
608
609
  /*
610
   * Select an OID for the new database, checking that it doesn't have a
611
   * filename conflict with anything already existing in the tablespace
612
   * directories.
613
   */
614
27
  pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
615
616
27
  do
617
27
  {
618
27
    dboid = GetNewOid(pg_database_rel);
619
27
  } while (check_db_file_conflict(dboid));
620
621
  /*
622
   * Insert a new tuple into pg_database.  This establishes our ownership of
623
   * the new database name (anyone else trying to insert the same name will
624
   * block on the unique index, and fail after we commit).
625
   */
626
627
  /* Form tuple */
628
27
  MemSet(new_record, 0, sizeof(new_record));
629
27
  MemSet(new_record_nulls, false, sizeof(new_record_nulls));
630
631
27
  new_record[Anum_pg_database_datname - 1] =
632
27
    DirectFunctionCall1(namein, CStringGetDatum(dbname));
633
27
  new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
634
27
  new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
635
27
  new_record[Anum_pg_database_datcollate - 1] =
636
27
    DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
637
27
  new_record[Anum_pg_database_datctype - 1] =
638
27
    DirectFunctionCall1(namein, CStringGetDatum(dbctype));
639
27
  new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
640
27
  new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
641
27
  new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
642
27
  new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
643
27
  new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
644
27
  new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
645
27
  new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
646
647
27
  if (IsYugaByteEnabled())
648
22
    YBCCreateDatabase(dboid, dbname, src_dboid, InvalidOid, dbcolocated);
649
650
  /*
651
   * We deliberately set datacl to default (NULL), rather than copying it
652
   * from the template database.  Copying it would be a bad idea when the
653
   * owner is not the same as the template's owner.
654
   */
655
27
  new_record_nulls[Anum_pg_database_datacl - 1] = true;
656
657
27
  tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
658
27
              new_record, new_record_nulls);
659
660
27
  HeapTupleSetOid(tuple, dboid);
661
662
27
  CatalogTupleInsert(pg_database_rel, tuple);
663
664
  /*
665
   * Now generate additional catalog entries associated with the new DB
666
   */
667
668
  /* Register owner dependency */
669
27
  recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
670
671
  /*
672
   * Register tablespace dependency to prevent dropping database default
673
   * tablespace.
674
   */
675
27
  recordDependencyOnTablespace(DatabaseRelationId, dboid, dst_deftablespace);
676
677
  /* Create pg_shdepend entries for objects within database */
678
27
  copyTemplateDependencies(src_dboid, dboid);
679
680
  /* Post creation hook for new database */
681
27
  InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
682
683
  /*
684
   * Force a checkpoint before starting the copy. This will force all dirty
685
   * buffers, including those of unlogged tables, out to disk, to ensure
686
   * source database is up-to-date on disk for the copy.
687
   * FlushDatabaseBuffers() would suffice for that, but we also want to
688
   * process any pending unlink requests. Otherwise, if a checkpoint
689
   * happened while we're copying files, a file might be deleted just when
690
   * we're about to copy it, causing the lstat() call in copydir() to fail
691
   * with ENOENT.
692
   */
693
27
  RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
694
27
            | CHECKPOINT_FLUSH_ALL);
695
696
  /*
697
   * Once we start copying subdirectories, we need to be able to clean 'em
698
   * up if we fail.  Use an ENSURE block to make sure this happens.  (This
699
   * is not a 100% solution, because of the possibility of failure during
700
   * transaction commit after we leave this routine, but it should handle
701
   * most scenarios.)
702
   */
703
27
  fparms.src_dboid = src_dboid;
704
27
  fparms.dest_dboid = dboid;
705
75
  PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
706
75
                          PointerGetDatum(&fparms));
707
75
  {
708
21
    if (!IsYugaByteEnabled())
709
0
    {
710
      /*
711
       * Iterate through all tablespaces of the template database, and copy
712
       * each one to the new database.
713
       */
714
0
      rel = heap_open(TableSpaceRelationId, AccessShareLock);
715
0
      scan = heap_beginscan_catalog(rel, 0, NULL);
716
0
      while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
717
0
      {
718
719
0
        Oid     srctablespace = HeapTupleGetOid(tuple);
720
0
        Oid     dsttablespace;
721
0
        char     *srcpath;
722
0
        char     *dstpath;
723
0
        struct stat st;
724
725
        /* No need to copy global tablespace */
726
0
        if (srctablespace == GLOBALTABLESPACE_OID)
727
0
          continue;
728
729
0
        srcpath = GetDatabasePath(src_dboid, srctablespace);
730
731
0
        if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
732
0
            directory_is_empty(srcpath))
733
0
        {
734
          /* Assume we can ignore it */
735
0
          pfree(srcpath);
736
0
          continue;
737
0
        }
738
739
0
        if (srctablespace == src_deftablespace)
740
0
          dsttablespace = dst_deftablespace;
741
0
        else
742
0
          dsttablespace = srctablespace;
743
744
0
        dstpath = GetDatabasePath(dboid, dsttablespace);
745
746
        /*
747
         * Copy this subdirectory to the new location
748
         *
749
         * We don't need to copy subdirectories
750
         */
751
0
        copydir(srcpath, dstpath, false);
752
753
        /* Record the filesystem change in XLOG */
754
0
        {
755
0
          xl_dbase_create_rec xlrec;
756
757
0
          xlrec.db_id = dboid;
758
0
          xlrec.tablespace_id = dsttablespace;
759
0
          xlrec.src_db_id = src_dboid;
760
0
          xlrec.src_tablespace_id = srctablespace;
761
762
0
          XLogBeginInsert();
763
0
          XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
764
765
0
          (void) XLogInsert(RM_DBASE_ID,
766
0
                            XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
767
0
        }
768
0
      }
769
0
      heap_endscan(scan);
770
0
      heap_close(rel, AccessShareLock);
771
0
    }
772
773
    /*
774
     * We force a checkpoint before committing.  This effectively means
775
     * that committed XLOG_DBASE_CREATE operations will never need to be
776
     * replayed (at least not in ordinary crash recovery; we still have to
777
     * make the XLOG entry for the benefit of PITR operations). This
778
     * avoids two nasty scenarios:
779
     *
780
     * #1: When PITR is off, we don't XLOG the contents of newly created
781
     * indexes; therefore the drop-and-recreate-whole-directory behavior
782
     * of DBASE_CREATE replay would lose such indexes.
783
     *
784
     * #2: Since we have to recopy the source database during DBASE_CREATE
785
     * replay, we run the risk of copying changes in it that were
786
     * committed after the original CREATE DATABASE command but before the
787
     * system crash that led to the replay.  This is at least unexpected
788
     * and at worst could lead to inconsistencies, eg duplicate table
789
     * names.
790
     *
791
     * (Both of these were real bugs in releases 8.0 through 8.0.3.)
792
     *
793
     * In PITR replay, the first of these isn't an issue, and the second
794
     * is only a risk if the CREATE DATABASE and subsequent template
795
     * database change both occur while a base backup is being taken.
796
     * There doesn't seem to be much we can do about that except document
797
     * it as a limitation.
798
     *
799
     * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
800
     * we can avoid this.
801
     */
802
21
    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
803
804
    /*
805
     * Close pg_database, but keep lock till commit.
806
     */
807
21
    heap_close(pg_database_rel, NoLock);
808
809
    /*
810
     * Force synchronous commit, thus minimizing the window between
811
     * creation of the database files and committal of the transaction. If
812
     * we crash before committing, we'll have a DB that's taking up disk
813
     * space but is not in pg_database, which is not good.
814
     */
815
75
    ForceSyncCommit();
816
75
  }
817
818
21
  PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
819
27
                              PointerGetDatum(&fparms));
820
21
  return dboid;
821
27
}
822
823
/*
824
 * Check whether chosen encoding matches chosen locale settings.  This
825
 * restriction is necessary because libc's locale-specific code usually
826
 * fails when presented with data in an encoding it's not expecting. We
827
 * allow mismatch in four cases:
828
 *
829
 * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
830
 * which works with any encoding.
831
 *
832
 * 2. locale encoding = -1, which means that we couldn't determine the
833
 * locale's encoding and have to trust the user to get it right.
834
 *
835
 * 3. selected encoding is UTF8 and platform is win32. This is because
836
 * UTF8 is a pseudo codepage that is supported in all locales since it's
837
 * converted to UTF16 before being used.
838
 *
839
 * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
840
 * is risky but we have historically allowed it --- notably, the
841
 * regression tests require it.
842
 *
843
 * Note: if you change this policy, fix initdb to match.
844
 */
845
void
846
check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
847
22
{
848
22
  int     ctype_encoding = pg_get_encoding_from_locale(ctype, true);
849
22
  int     collate_encoding = pg_get_encoding_from_locale(collate, true);
850
851
22
  if (!(ctype_encoding == encoding ||
852
0
      ctype_encoding == PG_SQL_ASCII ||
853
0
      ctype_encoding == -1 ||
854
#ifdef WIN32
855
      encoding == PG_UTF8 ||
856
#endif
857
0
      (encoding == PG_SQL_ASCII && superuser())))
858
22
    ereport(ERROR,
859
22
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
860
22
         errmsg("encoding \"%s\" does not match locale \"%s\"",
861
22
            pg_encoding_to_char(encoding),
862
22
            ctype),
863
22
         errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
864
22
               pg_encoding_to_char(ctype_encoding))));
865
866
22
  if (!(collate_encoding == encoding ||
867
22
      collate_encoding == PG_SQL_ASCII ||
868
0
      collate_encoding == -1 ||
869
#ifdef WIN32
870
      encoding == PG_UTF8 ||
871
#endif
872
0
      (encoding == PG_SQL_ASCII && superuser())))
873
22
    ereport(ERROR,
874
22
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
875
22
         errmsg("encoding \"%s\" does not match locale \"%s\"",
876
22
            pg_encoding_to_char(encoding),
877
22
            collate),
878
22
         errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
879
22
               pg_encoding_to_char(collate_encoding))));
880
22
}
881
882
/* Error cleanup callback for createdb */
883
static void
884
createdb_failure_callback(int code, Datum arg)
885
0
{
886
0
  createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
887
888
  /*
889
   * Release lock on source database before doing recursive remove. This is
890
   * not essential but it seems desirable to release the lock as soon as
891
   * possible.
892
   */
893
0
  UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
894
895
  /* Throw away any successfully copied subdirectories */
896
0
  remove_dbtablespaces(fparms->dest_dboid);
897
0
}
898
899
900
/*
901
 * DROP DATABASE
902
 */
903
void
904
dropdb(const char *dbname, bool missing_ok, bool force)
905
25
{
906
25
  Oid     db_id;
907
25
  bool    db_istemplate;
908
25
  Relation  pgdbrel;
909
25
  HeapTuple tup;
910
25
  int     notherbackends;
911
25
  int     npreparedxacts;
912
25
  int     nslots,
913
25
        nslots_active;
914
25
  int     nsubscriptions;
915
916
  /*
917
   * Look up the target database's OID, and get exclusive lock on it. We
918
   * need this to ensure that no new backend starts up in the target
919
   * database while we are deleting it (see postinit.c), and that no one is
920
   * using it as a CREATE DATABASE template or trying to delete it for
921
   * themselves.
922
   */
923
25
  pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
924
925
25
  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
926
25
           &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
927
2
  {
928
2
    if (!missing_ok)
929
1
    {
930
1
      ereport(ERROR,
931
1
          (errcode(ERRCODE_UNDEFINED_DATABASE),
932
1
           errmsg("database \"%s\" does not exist", dbname)));
933
1
    }
934
1
    else
935
1
    {
936
      /* Close pg_database, release the lock, since we changed nothing */
937
1
      heap_close(pgdbrel, RowExclusiveLock);
938
1
      ereport(NOTICE,
939
1
          (errmsg("database \"%s\" does not exist, skipping",
940
1
              dbname)));
941
1
      return;
942
24
    }
943
2
  }
944
945
  /*
946
   * Permission checks
947
   */
948
24
  if (!pg_database_ownercheck(db_id, GetUserId()))
949
2
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
950
2
             dbname);
951
952
  /* DROP hook for the database being removed */
953
24
  InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
954
955
  /*
956
   * Disallow dropping a DB that is marked istemplate.  This is just to
957
   * prevent people from accidentally dropping template0 or template1; they
958
   * can do so if they're really determined ...
959
   */
960
24
  if (db_istemplate)
961
24
    ereport(ERROR,
962
24
        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
963
24
         errmsg("cannot drop a template database")));
964
965
  /* Obviously can't drop my own database */
966
24
  if (db_id == MyDatabaseId)
967
24
    ereport(ERROR,
968
24
        (errcode(ERRCODE_OBJECT_IN_USE),
969
24
         errmsg("cannot drop the currently open database")));
970
971
  /*
972
   * YugaByte allows dropping a database even when multiple sessions are dependent on that database.
973
   * Skip the following checks.
974
   */
975
24
  if (IsYugaByteEnabled())
976
21
    goto removing_database_from_system;
977
978
  /*
979
   * Check whether there are active logical slots that refer to the
980
   * to-be-dropped database. The database lock we are holding prevents the
981
   * creation of new slots using the database or existing slots becoming
982
   * active.
983
   */
984
3
  (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
985
3
  if (nslots_active)
986
0
  {
987
0
    ereport(ERROR,
988
0
        (errcode(ERRCODE_OBJECT_IN_USE),
989
0
         errmsg("database \"%s\" is used by an active logical replication slot",
990
0
            dbname),
991
0
         errdetail_plural("There is %d active slot.",
992
0
                  "There are %d active slots.",
993
0
                  nslots_active, nslots_active)));
994
0
  }
995
996
  /*
997
   * Check if there are subscriptions defined in the target database.
998
   *
999
   * We can't drop them automatically because they might be holding
1000
   * resources in other databases/instances.
1001
   */
1002
3
  if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1003
3
    ereport(ERROR,
1004
3
        (errcode(ERRCODE_OBJECT_IN_USE),
1005
3
         errmsg("database \"%s\" is being used by logical replication subscription",
1006
3
            dbname),
1007
3
         errdetail_plural("There is %d subscription.",
1008
3
                  "There are %d subscriptions.",
1009
3
                  nsubscriptions, nsubscriptions)));
1010
1011
21
removing_database_from_system:
1012
  /*
1013
   * Attempt to terminate all existing connections to the target database if
1014
   * the user has requested to do so.
1015
   */
1016
21
  if (force)
1017
0
    TerminateOtherDBBackends(db_id);
1018
1019
  /*
1020
   * Check for other backends in the target database.  (Because we hold the
1021
   * database lock, no new ones can start after this.)
1022
   *
1023
   * As in CREATE DATABASE, check this after other error conditions.
1024
   */
1025
21
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1026
21
    ereport(ERROR,
1027
21
        (errcode(ERRCODE_OBJECT_IN_USE),
1028
21
         errmsg("database \"%s\" is being accessed by other users",
1029
21
            dbname),
1030
21
         errdetail_busy_db(notherbackends, npreparedxacts)));
1031
1032
  /*
1033
   * Remove the database's tuple from pg_database.
1034
   */
1035
21
  tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
1036
21
  if (!HeapTupleIsValid(tup))
1037
0
    elog(ERROR, "cache lookup failed for database %u", db_id);
1038
1039
21
  CatalogTupleDelete(pgdbrel, tup);
1040
1041
21
  ReleaseSysCache(tup);
1042
1043
  /*
1044
   * Delete any comments or security labels associated with the database.
1045
   */
1046
21
  DeleteSharedComments(db_id, DatabaseRelationId);
1047
21
  DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1048
1049
  /*
1050
   * Remove settings associated with this database
1051
   */
1052
21
  DropSetting(db_id, InvalidOid);
1053
1054
  /*
1055
   * Remove shared dependency references for the database.
1056
   */
1057
21
  dropDatabaseDependencies(db_id);
1058
1059
  /*
1060
   * Drop db-specific replication slots.
1061
   */
1062
21
  ReplicationSlotsDropDBSlots(db_id);
1063
1064
  /*
1065
   * Drop pages for this database that are in the shared buffer cache. This
1066
   * is important to ensure that no remaining backend tries to write out a
1067
   * dirty buffer to the dead database later...
1068
   */
1069
21
  DropDatabaseBuffers(db_id);
1070
1071
  /*
1072
   * Tell the stats collector to forget it immediately, too.
1073
   */
1074
21
  pgstat_drop_database(db_id);
1075
1076
  /*
1077
   * Tell checkpointer to forget any pending fsync and unlink requests for
1078
   * files in the database; else the fsyncs will fail at next checkpoint, or
1079
   * worse, it will delete files that belong to a newly created database
1080
   * with the same OID.
1081
   */
1082
21
  ForgetDatabaseFsyncRequests(db_id);
1083
1084
  /*
1085
   * Force a checkpoint to make sure the checkpointer has received the
1086
   * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
1087
   * ensures that background procs don't hold any open files, which would
1088
   * cause rmdir() to fail.
1089
   */
1090
21
  RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1091
1092
  /*
1093
   * Remove all tablespace subdirs belonging to the database.
1094
   */
1095
21
  remove_dbtablespaces(db_id);
1096
1097
  /*
1098
   * Close pg_database, but keep lock till commit.
1099
   */
1100
21
  heap_close(pgdbrel, NoLock);
1101
1102
  /*
1103
   * Force synchronous commit, thus minimizing the window between removal of
1104
   * the database files and committal of the transaction. If we crash before
1105
   * committing, we'll have a DB that's gone on disk but still there
1106
   * according to pg_database, which is not good.
1107
   */
1108
21
  ForceSyncCommit();
1109
1110
  /*
1111
   * Call YugaByte to delete the entries ourselves.
1112
   */
1113
21
  if (IsYugaByteEnabled())
1114
21
  {
1115
21
    YBCDropDatabase(db_id, dbname);
1116
21
  }
1117
21
}
1118
1119
1120
/*
1121
 * Rename database
1122
 */
1123
ObjectAddress
1124
RenameDatabase(const char *oldname, const char *newname)
1125
0
{
1126
0
  Oid     db_id;
1127
0
  HeapTuple newtup;
1128
0
  Relation  rel;
1129
0
  int     notherbackends;
1130
0
  int     npreparedxacts;
1131
0
  ObjectAddress address;
1132
1133
  /*
1134
   * Look up the target database's OID, and get exclusive lock on it. We
1135
   * need this for the same reasons as DROP DATABASE.
1136
   */
1137
0
  rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1138
1139
0
  if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
1140
0
           NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1141
0
    ereport(ERROR,
1142
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1143
0
         errmsg("database \"%s\" does not exist", oldname)));
1144
1145
  /* must be owner */
1146
0
  if (!pg_database_ownercheck(db_id, GetUserId()))
1147
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1148
0
             oldname);
1149
1150
  /* must have createdb rights */
1151
0
  if (!have_createdb_privilege())
1152
0
    ereport(ERROR,
1153
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1154
0
         errmsg("permission denied to rename database")));
1155
1156
  /*
1157
   * Make sure the new name doesn't exist.  See notes for same error in
1158
   * CREATE DATABASE.
1159
   */
1160
0
  if (OidIsValid(get_database_oid(newname, true)))
1161
0
    ereport(ERROR,
1162
0
        (errcode(ERRCODE_DUPLICATE_DATABASE),
1163
0
         errmsg("database \"%s\" already exists", newname)));
1164
1165
  /*
1166
   * XXX Client applications probably store the current database somewhere,
1167
   * so renaming it could cause confusion.  On the other hand, there may not
1168
   * be an actual problem besides a little confusion, so think about this
1169
   * and decide.
1170
   */
1171
0
  if (db_id == MyDatabaseId)
1172
0
    ereport(ERROR,
1173
0
        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1174
0
         errmsg("current database cannot be renamed")));
1175
1176
  /*
1177
   * Make sure the database does not have active sessions.  This is the same
1178
   * concern as above, but applied to other sessions.
1179
   *
1180
   * As in CREATE DATABASE, check this after other error conditions.
1181
   */
1182
0
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1183
0
    ereport(ERROR,
1184
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1185
0
         errmsg("database \"%s\" is being accessed by other users",
1186
0
            oldname),
1187
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
1188
1189
  /* rename */
1190
0
  newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1191
0
  if (!HeapTupleIsValid(newtup))
1192
0
    elog(ERROR, "cache lookup failed for database %u", db_id);
1193
0
  namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1194
0
  CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1195
1196
0
  if (IsYugaByteEnabled()) {
1197
0
    YBCPgStatement handle = NULL;
1198
0
    HandleYBStatus(YBCPgNewAlterDatabase(oldname, db_id, &handle));
1199
0
    HandleYBStatus(YBCPgAlterDatabaseRenameDatabase(handle, newname));
1200
0
    HandleYBStatus(YBCPgExecAlterDatabase(handle));
1201
0
  }
1202
1203
0
  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1204
1205
0
  ObjectAddressSet(address, DatabaseRelationId, db_id);
1206
1207
  /*
1208
   * Close pg_database, but keep lock till commit.
1209
   */
1210
0
  heap_close(rel, NoLock);
1211
1212
0
  return address;
1213
0
}
1214
1215
1216
/*
1217
 * ALTER DATABASE SET TABLESPACE
1218
 */
1219
static void
1220
movedb(const char *dbname, const char *tblspcname)
1221
0
{
1222
0
  Oid     db_id;
1223
0
  Relation  pgdbrel;
1224
0
  int     notherbackends;
1225
0
  int     npreparedxacts;
1226
0
  HeapTuple oldtuple,
1227
0
        newtuple;
1228
0
  Oid     src_tblspcoid,
1229
0
        dst_tblspcoid;
1230
0
  Datum   new_record[Natts_pg_database];
1231
0
  bool    new_record_nulls[Natts_pg_database];
1232
0
  bool    new_record_repl[Natts_pg_database];
1233
0
  ScanKeyData scankey;
1234
0
  SysScanDesc sysscan;
1235
0
  AclResult aclresult;
1236
0
  char     *src_dbpath;
1237
0
  char     *dst_dbpath;
1238
0
  DIR      *dstdir;
1239
0
  struct dirent *xlde;
1240
0
  movedb_failure_params fparms;
1241
1242
  /*
1243
   * Look up the target database's OID, and get exclusive lock on it. We
1244
   * need this to ensure that no new backend starts up in the database while
1245
   * we are moving it, and that no one is using it as a CREATE DATABASE
1246
   * template or trying to delete it.
1247
   */
1248
0
  pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1249
1250
0
  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1251
0
           NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1252
0
    ereport(ERROR,
1253
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1254
0
         errmsg("database \"%s\" does not exist", dbname)));
1255
1256
  /*
1257
   * We actually need a session lock, so that the lock will persist across
1258
   * the commit/restart below.  (We could almost get away with letting the
1259
   * lock be released at commit, except that someone could try to move
1260
   * relations of the DB back into the old directory while we rmtree() it.)
1261
   */
1262
0
  LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1263
0
                 AccessExclusiveLock);
1264
1265
  /*
1266
   * Permission checks
1267
   */
1268
0
  if (!pg_database_ownercheck(db_id, GetUserId()))
1269
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1270
0
             dbname);
1271
1272
  /*
1273
   * Obviously can't move the tables of my own database
1274
   */
1275
0
  if (db_id == MyDatabaseId)
1276
0
    ereport(ERROR,
1277
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1278
0
         errmsg("cannot change the tablespace of the currently open database")));
1279
1280
  /*
1281
   * Get tablespace's oid
1282
   */
1283
0
  dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1284
1285
  /*
1286
   * Permission checks
1287
   */
1288
0
  aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1289
0
                     ACL_CREATE);
1290
0
  if (aclresult != ACLCHECK_OK)
1291
0
    aclcheck_error(aclresult, OBJECT_TABLESPACE,
1292
0
             tblspcname);
1293
1294
  /*
1295
   * pg_global must never be the default tablespace
1296
   */
1297
0
  if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1298
0
    ereport(ERROR,
1299
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1300
0
         errmsg("pg_global cannot be used as default tablespace")));
1301
1302
  /*
1303
   * No-op if same tablespace
1304
   */
1305
0
  if (src_tblspcoid == dst_tblspcoid)
1306
0
  {
1307
0
    heap_close(pgdbrel, NoLock);
1308
0
    UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1309
0
                   AccessExclusiveLock);
1310
0
    return;
1311
0
  }
1312
1313
  /*
1314
   * Check for other backends in the target database.  (Because we hold the
1315
   * database lock, no new ones can start after this.)
1316
   *
1317
   * As in CREATE DATABASE, check this after other error conditions.
1318
   */
1319
0
  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1320
0
    ereport(ERROR,
1321
0
        (errcode(ERRCODE_OBJECT_IN_USE),
1322
0
         errmsg("database \"%s\" is being accessed by other users",
1323
0
            dbname),
1324
0
         errdetail_busy_db(notherbackends, npreparedxacts)));
1325
1326
  /*
1327
   * Get old and new database paths
1328
   */
1329
0
  src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1330
0
  dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1331
1332
  /*
1333
   * Force a checkpoint before proceeding. This will force all dirty
1334
   * buffers, including those of unlogged tables, out to disk, to ensure
1335
   * source database is up-to-date on disk for the copy.
1336
   * FlushDatabaseBuffers() would suffice for that, but we also want to
1337
   * process any pending unlink requests. Otherwise, the check for existing
1338
   * files in the target directory might fail unnecessarily, not to mention
1339
   * that the copy might fail due to source files getting deleted under it.
1340
   * On Windows, this also ensures that background procs don't hold any open
1341
   * files, which would cause rmdir() to fail.
1342
   */
1343
0
  RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1344
0
            | CHECKPOINT_FLUSH_ALL);
1345
1346
  /*
1347
   * Now drop all buffers holding data of the target database; they should
1348
   * no longer be dirty so DropDatabaseBuffers is safe.
1349
   *
1350
   * It might seem that we could just let these buffers age out of shared
1351
   * buffers naturally, since they should not get referenced anymore.  The
1352
   * problem with that is that if the user later moves the database back to
1353
   * its original tablespace, any still-surviving buffers would appear to
1354
   * contain valid data again --- but they'd be missing any changes made in
1355
   * the database while it was in the new tablespace.  In any case, freeing
1356
   * buffers that should never be used again seems worth the cycles.
1357
   *
1358
   * Note: it'd be sufficient to get rid of buffers matching db_id and
1359
   * src_tblspcoid, but bufmgr.c presently provides no API for that.
1360
   */
1361
0
  DropDatabaseBuffers(db_id);
1362
1363
  /*
1364
   * Check for existence of files in the target directory, i.e., objects of
1365
   * this database that are already in the target tablespace.  We can't
1366
   * allow the move in such a case, because we would need to change those
1367
   * relations' pg_class.reltablespace entries to zero, and we don't have
1368
   * access to the DB's pg_class to do so.
1369
   */
1370
0
  dstdir = AllocateDir(dst_dbpath);
1371
0
  if (dstdir != NULL)
1372
0
  {
1373
0
    while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1374
0
    {
1375
0
      if (strcmp(xlde->d_name, ".") == 0 ||
1376
0
        strcmp(xlde->d_name, "..") == 0)
1377
0
        continue;
1378
1379
0
      ereport(ERROR,
1380
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1381
0
           errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1382
0
              dbname, tblspcname),
1383
0
           errhint("You must move them back to the database's default tablespace before using this command.")));
1384
0
    }
1385
1386
0
    FreeDir(dstdir);
1387
1388
    /*
1389
     * The directory exists but is empty. We must remove it before using
1390
     * the copydir function.
1391
     */
1392
0
    if (rmdir(dst_dbpath) != 0)
1393
0
      elog(ERROR, "could not remove directory \"%s\": %m",
1394
0
         dst_dbpath);
1395
0
  }
1396
1397
  /*
1398
   * Use an ENSURE block to make sure we remove the debris if the copy fails
1399
   * (eg, due to out-of-disk-space).  This is not a 100% solution, because
1400
   * of the possibility of failure during transaction commit, but it should
1401
   * handle most scenarios.
1402
   */
1403
0
  fparms.dest_dboid = db_id;
1404
0
  fparms.dest_tsoid = dst_tblspcoid;
1405
0
  PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1406
0
              PointerGetDatum(&fparms));
1407
0
  {
1408
    /*
1409
     * Copy files from the old tablespace to the new one
1410
     */
1411
0
    copydir(src_dbpath, dst_dbpath, false);
1412
1413
    /*
1414
     * Record the filesystem change in XLOG
1415
     */
1416
0
    {
1417
0
      xl_dbase_create_rec xlrec;
1418
1419
0
      xlrec.db_id = db_id;
1420
0
      xlrec.tablespace_id = dst_tblspcoid;
1421
0
      xlrec.src_db_id = db_id;
1422
0
      xlrec.src_tablespace_id = src_tblspcoid;
1423
1424
0
      XLogBeginInsert();
1425
0
      XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1426
1427
0
      (void) XLogInsert(RM_DBASE_ID,
1428
0
                XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1429
0
    }
1430
1431
    /*
1432
     * Update the database's pg_database tuple
1433
     */
1434
0
    ScanKeyInit(&scankey,
1435
0
          Anum_pg_database_datname,
1436
0
          BTEqualStrategyNumber, F_NAMEEQ,
1437
0
          CStringGetDatum(dbname));
1438
0
    sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1439
0
                   NULL, 1, &scankey);
1440
0
    oldtuple = systable_getnext(sysscan);
1441
0
    if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
1442
0
      ereport(ERROR,
1443
0
          (errcode(ERRCODE_UNDEFINED_DATABASE),
1444
0
           errmsg("database \"%s\" does not exist", dbname)));
1445
1446
0
    MemSet(new_record, 0, sizeof(new_record));
1447
0
    MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1448
0
    MemSet(new_record_repl, false, sizeof(new_record_repl));
1449
1450
0
    new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1451
0
    new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1452
1453
0
    newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1454
0
                   new_record,
1455
0
                   new_record_nulls, new_record_repl);
1456
0
    CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
1457
1458
0
    InvokeObjectPostAlterHook(DatabaseRelationId,
1459
0
                  HeapTupleGetOid(newtuple), 0);
1460
1461
0
    systable_endscan(sysscan);
1462
1463
    /*
1464
     * Force another checkpoint here.  As in CREATE DATABASE, this is to
1465
     * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1466
     * operation, which would cause us to lose any unlogged operations
1467
     * done in the new DB tablespace before the next checkpoint.
1468
     */
1469
0
    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1470
1471
    /*
1472
     * Force synchronous commit, thus minimizing the window between
1473
     * copying the database files and committal of the transaction. If we
1474
     * crash before committing, we'll leave an orphaned set of files on
1475
     * disk, which is not fatal but not good either.
1476
     */
1477
0
    ForceSyncCommit();
1478
1479
    /*
1480
     * Close pg_database, but keep lock till commit.
1481
     */
1482
0
    heap_close(pgdbrel, NoLock);
1483
0
  }
1484
0
  PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1485
0
                PointerGetDatum(&fparms));
1486
1487
  /*
1488
   * Commit the transaction so that the pg_database update is committed. If
1489
   * we crash while removing files, the database won't be corrupt, we'll
1490
   * just leave some orphaned files in the old directory.
1491
   *
1492
   * (This is OK because we know we aren't inside a transaction block.)
1493
   *
1494
   * XXX would it be safe/better to do this inside the ensure block?  Not
1495
   * convinced it's a good idea; consider elog just after the transaction
1496
   * really commits.
1497
   */
1498
0
  PopActiveSnapshot();
1499
0
  CommitTransactionCommand();
1500
1501
  /* Start new transaction for the remaining work; don't need a snapshot */
1502
0
  StartTransactionCommand();
1503
1504
  /*
1505
   * Remove files from the old tablespace
1506
   */
1507
0
  if (!rmtree(src_dbpath, true))
1508
0
    ereport(WARNING,
1509
0
        (errmsg("some useless files may be left behind in old database directory \"%s\"",
1510
0
            src_dbpath)));
1511
1512
  /*
1513
   * Record the filesystem change in XLOG
1514
   */
1515
0
  {
1516
0
    xl_dbase_drop_rec xlrec;
1517
1518
0
    xlrec.db_id = db_id;
1519
0
    xlrec.tablespace_id = src_tblspcoid;
1520
1521
0
    XLogBeginInsert();
1522
0
    XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1523
1524
0
    (void) XLogInsert(RM_DBASE_ID,
1525
0
              XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1526
0
  }
1527
1528
  /* Now it's safe to release the database lock */
1529
0
  UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1530
0
                 AccessExclusiveLock);
1531
0
}
1532
1533
/* Error cleanup callback for movedb */
1534
static void
1535
movedb_failure_callback(int code, Datum arg)
1536
0
{
1537
0
  movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1538
0
  char     *dstpath;
1539
1540
  /* Get rid of anything we managed to copy to the target directory */
1541
0
  dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1542
1543
0
  (void) rmtree(dstpath, true);
1544
0
}
1545
1546
/*
1547
 * Process options and call dropdb function.
1548
 */
1549
void
1550
DropDatabase(ParseState *pstate, DropdbStmt *stmt)
1551
25
{
1552
25
  bool    force = false;
1553
25
  ListCell   *lc;
1554
1555
25
  foreach(lc, stmt->options)
1556
0
  {
1557
0
    DefElem    *opt = (DefElem *) lfirst(lc);
1558
1559
0
    if (strcmp(opt->defname, "force") == 0)
1560
0
      force = true;
1561
0
    else
1562
0
      ereport(ERROR,
1563
0
          (errcode(ERRCODE_SYNTAX_ERROR),
1564
0
           errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
1565
0
           parser_errposition(pstate, opt->location)));
1566
0
  }
1567
1568
25
  dropdb(stmt->dbname, stmt->missing_ok, force);
1569
25
}
1570
1571
/*
1572
 * ALTER DATABASE name ...
1573
 */
1574
Oid
1575
AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1576
0
{
1577
0
  Relation  rel;
1578
0
  Oid     dboid;
1579
0
  HeapTuple tuple,
1580
0
        newtuple;
1581
0
  ScanKeyData scankey;
1582
0
  SysScanDesc scan;
1583
0
  ListCell   *option;
1584
0
  bool    dbistemplate = false;
1585
0
  bool    dballowconnections = true;
1586
0
  int     dbconnlimit = -1;
1587
0
  DefElem    *distemplate = NULL;
1588
0
  DefElem    *dallowconnections = NULL;
1589
0
  DefElem    *dconnlimit = NULL;
1590
0
  DefElem    *dtablespace = NULL;
1591
0
  DefElem   **unsupported_options[] = {&distemplate, &dtablespace};
1592
0
  Datum   new_record[Natts_pg_database];
1593
0
  bool    new_record_nulls[Natts_pg_database];
1594
0
  bool    new_record_repl[Natts_pg_database];
1595
1596
  /* Extract options from the statement node tree */
1597
0
  foreach(option, stmt->options)
1598
0
  {
1599
0
    DefElem    *defel = (DefElem *) lfirst(option);
1600
1601
0
    if (strcmp(defel->defname, "is_template") == 0)
1602
0
    {
1603
0
      if (distemplate)
1604
0
        ereport(ERROR,
1605
0
            (errcode(ERRCODE_SYNTAX_ERROR),
1606
0
             errmsg("conflicting or redundant options"),
1607
0
             parser_errposition(pstate, defel->location)));
1608
0
      distemplate = defel;
1609
0
    }
1610
0
    else if (strcmp(defel->defname, "allow_connections") == 0)
1611
0
    {
1612
0
      if (dallowconnections)
1613
0
        ereport(ERROR,
1614
0
            (errcode(ERRCODE_SYNTAX_ERROR),
1615
0
             errmsg("conflicting or redundant options"),
1616
0
             parser_errposition(pstate, defel->location)));
1617
0
      dallowconnections = defel;
1618
0
    }
1619
0
    else if (strcmp(defel->defname, "connection_limit") == 0)
1620
0
    {
1621
0
      if (dconnlimit)
1622
0
        ereport(ERROR,
1623
0
            (errcode(ERRCODE_SYNTAX_ERROR),
1624
0
             errmsg("conflicting or redundant options"),
1625
0
             parser_errposition(pstate, defel->location)));
1626
0
      dconnlimit = defel;
1627
0
    }
1628
0
    else if (strcmp(defel->defname, "tablespace") == 0)
1629
0
    {
1630
0
      if (dtablespace)
1631
0
        ereport(ERROR,
1632
0
            (errcode(ERRCODE_SYNTAX_ERROR),
1633
0
             errmsg("conflicting or redundant options"),
1634
0
             parser_errposition(pstate, defel->location)));
1635
0
      dtablespace = defel;
1636
0
    }
1637
0
    else
1638
0
      ereport(ERROR,
1639
0
          (errcode(ERRCODE_SYNTAX_ERROR),
1640
0
           errmsg("option \"%s\" not recognized", defel->defname),
1641
0
           parser_errposition(pstate, defel->location)));
1642
0
  }
1643
1644
  /* Check YB options support */
1645
0
  if (YBIsUsingYBParser()) {
1646
0
    for (int i = lengthof(unsupported_options); i > 0; --i) {
1647
0
      DefElem *option = *unsupported_options[i - 1];
1648
0
      if (option != NULL && option->arg != NULL) {
1649
0
        ereport(YBUnsupportedFeatureSignalLevel(),
1650
0
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1651
0
             errmsg("Altering %s option is not yet supported",
1652
0
                option->defname),
1653
0
             errhint("Please report the issue on "
1654
0
                 "https://github.com/YugaByte/yugabyte-db"
1655
0
                 "/issues"),
1656
0
             parser_errposition(pstate, option->location)));
1657
0
      }
1658
0
    }
1659
0
  }
1660
1661
0
  if (dtablespace)
1662
0
  {
1663
    /*
1664
     * While the SET TABLESPACE syntax doesn't allow any other options,
1665
     * somebody could write "WITH TABLESPACE ...".  Forbid any other
1666
     * options from being specified in that case.
1667
     */
1668
0
    if (list_length(stmt->options) != 1)
1669
0
      ereport(ERROR,
1670
0
          (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1671
0
           errmsg("option \"%s\" cannot be specified with other options",
1672
0
              dtablespace->defname),
1673
0
           parser_errposition(pstate, dtablespace->location)));
1674
    /* this case isn't allowed within a transaction block */
1675
0
    PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1676
0
    movedb(stmt->dbname, defGetString(dtablespace));
1677
0
    return InvalidOid;
1678
0
  }
1679
1680
0
  if (distemplate && distemplate->arg)
1681
0
    dbistemplate = defGetBoolean(distemplate);
1682
0
  if (dallowconnections && dallowconnections->arg)
1683
0
    dballowconnections = defGetBoolean(dallowconnections);
1684
0
  if (dconnlimit && dconnlimit->arg)
1685
0
  {
1686
0
    dbconnlimit = defGetInt32(dconnlimit);
1687
0
    if (dbconnlimit < -1)
1688
0
      ereport(ERROR,
1689
0
          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1690
0
           errmsg("invalid connection limit: %d", dbconnlimit)));
1691
0
  }
1692
1693
  /*
1694
   * Get the old tuple.  We don't need a lock on the database per se,
1695
   * because we're not going to do anything that would mess up incoming
1696
   * connections.
1697
   */
1698
0
  rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1699
0
  ScanKeyInit(&scankey,
1700
0
        Anum_pg_database_datname,
1701
0
        BTEqualStrategyNumber, F_NAMEEQ,
1702
0
        CStringGetDatum(stmt->dbname));
1703
0
  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1704
0
                NULL, 1, &scankey);
1705
0
  tuple = systable_getnext(scan);
1706
0
  if (!HeapTupleIsValid(tuple))
1707
0
    ereport(ERROR,
1708
0
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1709
0
         errmsg("database \"%s\" does not exist", stmt->dbname)));
1710
1711
0
  dboid = HeapTupleGetOid(tuple);
1712
1713
0
  if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1714
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1715
0
             stmt->dbname);
1716
1717
  /*
1718
   * In order to avoid getting locked out and having to go through
1719
   * standalone mode, we refuse to disallow connections to the database
1720
   * we're currently connected to.  Lockout can still happen with concurrent
1721
   * sessions but the likeliness of that is not high enough to worry about.
1722
   */
1723
0
  if (!dballowconnections && dboid == MyDatabaseId)
1724
0
    ereport(ERROR,
1725
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1726
0
         errmsg("cannot disallow connections for current database")));
1727
1728
  /*
1729
   * Build an updated tuple, perusing the information just obtained
1730
   */
1731
0
  MemSet(new_record, 0, sizeof(new_record));
1732
0
  MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1733
0
  MemSet(new_record_repl, false, sizeof(new_record_repl));
1734
1735
0
  if (distemplate)
1736
0
  {
1737
0
    new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1738
0
    new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1739
0
  }
1740
0
  if (dallowconnections)
1741
0
  {
1742
0
    new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1743
0
    new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1744
0
  }
1745
0
  if (dconnlimit)
1746
0
  {
1747
0
    new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1748
0
    new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1749
0
  }
1750
1751
0
  newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1752
0
                 new_record_nulls, new_record_repl);
1753
0
  CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
1754
1755
0
  InvokeObjectPostAlterHook(DatabaseRelationId,
1756
0
                HeapTupleGetOid(newtuple), 0);
1757
1758
0
  systable_endscan(scan);
1759
1760
  /* Close pg_database, but keep lock till commit */
1761
0
  heap_close(rel, NoLock);
1762
1763
0
  return dboid;
1764
0
}
1765
1766
1767
/*
1768
 * ALTER DATABASE name SET ...
1769
 */
1770
Oid
1771
AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1772
0
{
1773
0
  Oid     datid = get_database_oid(stmt->dbname, false);
1774
1775
  /*
1776
   * Obtain a lock on the database and make sure it didn't go away in the
1777
   * meantime.
1778
   */
1779
0
  shdepLockAndCheckObject(DatabaseRelationId, datid);
1780
1781
0
  if (!pg_database_ownercheck(datid, GetUserId()))
1782
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1783
0
             stmt->dbname);
1784
1785
0
  AlterSetting(datid, InvalidOid, stmt->setstmt);
1786
1787
0
  UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1788
1789
0
  return datid;
1790
0
}
1791
1792
1793
/*
1794
 * ALTER DATABASE name OWNER TO newowner
1795
 */
1796
ObjectAddress
1797
AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1798
3
{
1799
3
  Oid     db_id;
1800
3
  HeapTuple tuple;
1801
3
  Relation  rel;
1802
3
  ScanKeyData scankey;
1803
3
  SysScanDesc scan;
1804
3
  Form_pg_database datForm;
1805
3
  ObjectAddress address;
1806
1807
  /*
1808
   * Get the old tuple.  We don't need a lock on the database per se,
1809
   * because we're not going to do anything that would mess up incoming
1810
   * connections.
1811
   */
1812
3
  rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1813
3
  ScanKeyInit(&scankey,
1814
3
        Anum_pg_database_datname,
1815
3
        BTEqualStrategyNumber, F_NAMEEQ,
1816
3
        CStringGetDatum(dbname));
1817
3
  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1818
3
                NULL, 1, &scankey);
1819
3
  tuple = systable_getnext(scan);
1820
3
  if (!HeapTupleIsValid(tuple))
1821
3
    ereport(ERROR,
1822
3
        (errcode(ERRCODE_UNDEFINED_DATABASE),
1823
3
         errmsg("database \"%s\" does not exist", dbname)));
1824
1825
3
  db_id = HeapTupleGetOid(tuple);
1826
3
  datForm = (Form_pg_database) GETSTRUCT(tuple);
1827
1828
  /*
1829
   * If the new owner is the same as the existing owner, consider the
1830
   * command to have succeeded.  This is to be consistent with other
1831
   * objects.
1832
   */
1833
3
  if (datForm->datdba != newOwnerId)
1834
3
  {
1835
3
    Datum   repl_val[Natts_pg_database];
1836
3
    bool    repl_null[Natts_pg_database];
1837
3
    bool    repl_repl[Natts_pg_database];
1838
3
    Acl      *newAcl;
1839
3
    Datum   aclDatum;
1840
3
    bool    isNull;
1841
3
    HeapTuple newtuple;
1842
1843
    /* Otherwise, must be owner of the existing object */
1844
3
    if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1845
0
      aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1846
0
               dbname);
1847
1848
    /* Must be able to become new owner */
1849
3
    check_is_member_of_role(GetUserId(), newOwnerId);
1850
1851
    /*
1852
     * must have createdb rights
1853
     *
1854
     * NOTE: This is different from other alter-owner checks in that the
1855
     * current user is checked for createdb privileges instead of the
1856
     * destination owner.  This is consistent with the CREATE case for
1857
     * databases.  Because superusers will always have this right, we need
1858
     * no special case for them.
1859
     */
1860
3
    if (!have_createdb_privilege())
1861
3
      ereport(ERROR,
1862
3
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1863
3
           errmsg("permission denied to change owner of database")));
1864
1865
3
    memset(repl_null, false, sizeof(repl_null));
1866
3
    memset(repl_repl, false, sizeof(repl_repl));
1867
1868
3
    repl_repl[Anum_pg_database_datdba - 1] = true;
1869
3
    repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1870
1871
    /*
1872
     * Determine the modified ACL for the new owner.  This is only
1873
     * necessary when the ACL is non-null.
1874
     */
1875
3
    aclDatum = heap_getattr(tuple,
1876
3
                Anum_pg_database_datacl,
1877
3
                RelationGetDescr(rel),
1878
3
                &isNull);
1879
3
    if (!isNull)
1880
0
    {
1881
0
      newAcl = aclnewowner(DatumGetAclP(aclDatum),
1882
0
                 datForm->datdba, newOwnerId);
1883
0
      repl_repl[Anum_pg_database_datacl - 1] = true;
1884
0
      repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1885
0
    }
1886
1887
3
    newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1888
3
    CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1889
1890
3
    heap_freetuple(newtuple);
1891
1892
    /* Update owner dependency reference */
1893
3
    changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1894
3
                newOwnerId);
1895
3
  }
1896
1897
3
  InvokeObjectPostAlterHook(DatabaseRelationId, HeapTupleGetOid(tuple), 0);
1898
1899
3
  ObjectAddressSet(address, DatabaseRelationId, db_id);
1900
1901
3
  systable_endscan(scan);
1902
1903
  /* Close pg_database, but keep lock till commit */
1904
3
  heap_close(rel, NoLock);
1905
1906
3
  return address;
1907
3
}
1908
1909
1910
/*
1911
 * Helper functions
1912
 */
1913
1914
/*
1915
 * Look up info about the database named "name".  If the database exists,
1916
 * obtain the specified lock type on it, fill in any of the remaining
1917
 * parameters that aren't NULL, and return true.  If no such database,
1918
 * return false.
1919
 */
1920
static bool
1921
get_db_info(const char *name, LOCKMODE lockmode,
1922
      Oid *dbIdP, Oid *ownerIdP,
1923
      int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1924
      Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1925
      MultiXactId *dbMinMultiP,
1926
      Oid *dbTablespace, char **dbCollate, char **dbCtype)
1927
47
{
1928
47
  bool    result = false;
1929
47
  Relation  relation;
1930
1931
47
  AssertArg(name);
1932
1933
  /* Caller may wish to grab a better lock on pg_database beforehand... */
1934
47
  relation = heap_open(DatabaseRelationId, AccessShareLock);
1935
1936
  /*
1937
   * Loop covers the rare case where the database is renamed before we can
1938
   * lock it.  We try again just in case we can find a new one of the same
1939
   * name.
1940
   */
1941
47
  for (;;)
1942
47
  {
1943
47
    ScanKeyData scanKey;
1944
47
    SysScanDesc scan;
1945
47
    HeapTuple tuple;
1946
47
    Oid     dbOid;
1947
1948
    /*
1949
     * there's no syscache for database-indexed-by-name, so must do it the
1950
     * hard way
1951
     */
1952
47
    ScanKeyInit(&scanKey,
1953
47
          Anum_pg_database_datname,
1954
47
          BTEqualStrategyNumber, F_NAMEEQ,
1955
47
          CStringGetDatum(name));
1956
1957
47
    scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1958
47
                  NULL, 1, &scanKey);
1959
1960
47
    tuple = systable_getnext(scan);
1961
1962
47
    if (!HeapTupleIsValid(tuple))
1963
2
    {
1964
      /* definitely no database of that name */
1965
2
      systable_endscan(scan);
1966
2
      break;
1967
2
    }
1968
1969
45
    dbOid = HeapTupleGetOid(tuple);
1970
1971
45
    systable_endscan(scan);
1972
1973
    /*
1974
     * Now that we have a database OID, we can try to lock the DB.
1975
     */
1976
45
    if (lockmode != NoLock)
1977
45
      LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1978
1979
    /*
1980
     * And now, re-fetch the tuple by OID.  If it's still there and still
1981
     * the same name, we win; else, drop the lock and loop back to try
1982
     * again.
1983
     */
1984
45
    tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1985
45
    if (HeapTupleIsValid(tuple))
1986
45
    {
1987
45
      Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1988
1989
45
      if (strcmp(name, NameStr(dbform->datname)) == 0)
1990
45
      {
1991
        /* oid of the database */
1992
45
        if (dbIdP)
1993
45
          *dbIdP = dbOid;
1994
        /* oid of the owner */
1995
45
        if (ownerIdP)
1996
22
          *ownerIdP = dbform->datdba;
1997
        /* character encoding */
1998
45
        if (encodingP)
1999
22
          *encodingP = dbform->encoding;
2000
        /* allowed as template? */
2001
45
        if (dbIsTemplateP)
2002
45
          *dbIsTemplateP = dbform->datistemplate;
2003
        /* allowing connections? */
2004
45
        if (dbAllowConnP)
2005
22
          *dbAllowConnP = dbform->datallowconn;
2006
        /* last system OID used in database */
2007
45
        if (dbLastSysOidP)
2008
22
          *dbLastSysOidP = dbform->datlastsysoid;
2009
        /* limit of frozen XIDs */
2010
45
        if (dbFrozenXidP)
2011
22
          *dbFrozenXidP = dbform->datfrozenxid;
2012
        /* minimum MultixactId */
2013
45
        if (dbMinMultiP)
2014
22
          *dbMinMultiP = dbform->datminmxid;
2015
        /* default tablespace for this database */
2016
45
        if (dbTablespace)
2017
22
          *dbTablespace = dbform->dattablespace;
2018
        /* default locale settings for this database */
2019
45
        if (dbCollate)
2020
22
          *dbCollate = pstrdup(NameStr(dbform->datcollate));
2021
45
        if (dbCtype)
2022
22
          *dbCtype = pstrdup(NameStr(dbform->datctype));
2023
45
        ReleaseSysCache(tuple);
2024
45
        result = true;
2025
45
        break;
2026
45
      }
2027
      /* can only get here if it was just renamed */
2028
0
      ReleaseSysCache(tuple);
2029
0
    }
2030
2031
0
    if (lockmode != NoLock)
2032
0
      UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2033
0
  }
2034
2035
47
  heap_close(relation, AccessShareLock);
2036
2037
47
  return result;
2038
47
}
2039
2040
/* Check if current user has createdb privileges */
2041
static bool
2042
have_createdb_privilege(void)
2043
30
{
2044
30
  bool    result = false;
2045
30
  HeapTuple utup;
2046
2047
  /* Superusers can always do everything */
2048
30
  if (superuser())
2049
19
    return true;
2050
2051
11
  utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2052
11
  if (HeapTupleIsValid(utup))
2053
11
  {
2054
11
    result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2055
11
    ReleaseSysCache(utup);
2056
11
  }
2057
11
  return result;
2058
11
}
2059
2060
/*
2061
 * Remove tablespace directories
2062
 *
2063
 * We don't know what tablespaces db_id is using, so iterate through all
2064
 * tablespaces removing <tablespace>/db_id
2065
 */
2066
static void
2067
remove_dbtablespaces(Oid db_id)
2068
21
{
2069
21
  Relation  rel;
2070
21
  HeapScanDesc scan;
2071
21
  HeapTuple tuple;
2072
2073
21
  rel = heap_open(TableSpaceRelationId, AccessShareLock);
2074
21
  scan = heap_beginscan_catalog(rel, 0, NULL);
2075
63
  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2076
42
  {
2077
42
    Oid     dsttablespace = HeapTupleGetOid(tuple);
2078
42
    char     *dstpath;
2079
42
    struct stat st;
2080
2081
    /* Don't mess with the global tablespace */
2082
42
    if (dsttablespace == GLOBALTABLESPACE_OID)
2083
21
      continue;
2084
2085
21
    dstpath = GetDatabasePath(db_id, dsttablespace);
2086
2087
21
    if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
2088
21
    {
2089
      /* Assume we can ignore it */
2090
21
      pfree(dstpath);
2091
21
      continue;
2092
21
    }
2093
2094
0
    if (!rmtree(dstpath, true))
2095
0
      ereport(WARNING,
2096
0
          (errmsg("some useless files may be left behind in old database directory \"%s\"",
2097
0
              dstpath)));
2098
2099
    /* Record the filesystem change in XLOG */
2100
0
    {
2101
0
      xl_dbase_drop_rec xlrec;
2102
2103
0
      xlrec.db_id = db_id;
2104
0
      xlrec.tablespace_id = dsttablespace;
2105
2106
0
      XLogBeginInsert();
2107
0
      XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
2108
2109
0
      (void) XLogInsert(RM_DBASE_ID,
2110
0
                XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2111
0
    }
2112
2113
0
    pfree(dstpath);
2114
0
  }
2115
2116
21
  heap_endscan(scan);
2117
21
  heap_close(rel, AccessShareLock);
2118
21
}
2119
2120
/*
2121
 * Check for existing files that conflict with a proposed new DB OID;
2122
 * return true if there are any
2123
 *
2124
 * If there were a subdirectory in any tablespace matching the proposed new
2125
 * OID, we'd get a create failure due to the duplicate name ... and then we'd
2126
 * try to remove that already-existing subdirectory during the cleanup in
2127
 * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
2128
 * instead we make this extra check before settling on the OID of the new
2129
 * database.  This exactly parallels what GetNewRelFileNode() does for table
2130
 * relfilenode values.
2131
 */
2132
static bool
2133
check_db_file_conflict(Oid db_id)
2134
22
{
2135
22
  bool    result = false;
2136
22
  Relation  rel;
2137
22
  HeapScanDesc scan;
2138
22
  HeapTuple tuple;
2139
2140
22
  rel = heap_open(TableSpaceRelationId, AccessShareLock);
2141
22
  scan = heap_beginscan_catalog(rel, 0, NULL);
2142
66
  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2143
44
  {
2144
44
    Oid     dsttablespace = HeapTupleGetOid(tuple);
2145
44
    char     *dstpath;
2146
44
    struct stat st;
2147
2148
    /* Don't mess with the global tablespace */
2149
44
    if (dsttablespace == GLOBALTABLESPACE_OID)
2150
22
      continue;
2151
2152
22
    dstpath = GetDatabasePath(db_id, dsttablespace);
2153
2154
22
    if (lstat(dstpath, &st) == 0)
2155
0
    {
2156
      /* Found a conflicting file (or directory, whatever) */
2157
0
      pfree(dstpath);
2158
0
      result = true;
2159
0
      break;
2160
0
    }
2161
2162
22
    pfree(dstpath);
2163
22
  }
2164
2165
22
  heap_endscan(scan);
2166
22
  heap_close(rel, AccessShareLock);
2167
2168
22
  return result;
2169
22
}
2170
2171
/*
 * Issue a suitable errdetail message for a busy database
 *
 * Picks the message from the counts of other sessions and prepared
 * transactions using the database.  The return value carries no meaning.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
  if (notherbackends > 0 && npreparedxacts > 0)
  {
    /*
     * Singular versus plural is not handled for this combined message:
     * gettext doesn't support multiple plurals in one string.
     */
    errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
          notherbackends, npreparedxacts);
  }
  else if (notherbackends > 0)
  {
    errdetail_plural("There is %d other session using the database.",
             "There are %d other sessions using the database.",
             notherbackends,
             notherbackends);
  }
  else
  {
    errdetail_plural("There is %d prepared transaction using the database.",
             "There are %d prepared transactions using the database.",
             npreparedxacts,
             npreparedxacts);
  }

  return 0;         /* just to keep ereport macro happy */
}
2197
2198
/*
2199
 * get_database_oid - given a database name, look up the OID
2200
 *
2201
 * If missing_ok is false, throw an error if database name not found.  If
2202
 * true, just return InvalidOid.
2203
 */
2204
Oid
2205
get_database_oid(const char *dbname, bool missing_ok)
2206
1.78k
{
2207
1.78k
  Relation  pg_database;
2208
1.78k
  ScanKeyData entry[1];
2209
1.78k
  SysScanDesc scan;
2210
1.78k
  HeapTuple dbtuple;
2211
1.78k
  Oid     oid;
2212
2213
  /*
2214
   * There's no syscache for pg_database indexed by name, so we must look
2215
   * the hard way.
2216
   */
2217
1.78k
  pg_database = heap_open(DatabaseRelationId, AccessShareLock);
2218
1.78k
  ScanKeyInit(&entry[0],
2219
1.78k
        Anum_pg_database_datname,
2220
1.78k
        BTEqualStrategyNumber, F_NAMEEQ,
2221
1.78k
        CStringGetDatum(dbname));
2222
1.78k
  scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2223
1.78k
                NULL, 1, entry);
2224
2225
1.78k
  dbtuple = systable_getnext(scan);
2226
2227
  /* We assume that there can be at most one matching tuple */
2228
1.78k
  if (HeapTupleIsValid(dbtuple))
2229
1.76k
    oid = HeapTupleGetOid(dbtuple);
2230
1.78k
  else
2231
21
    oid = InvalidOid;
2232
2233
1.78k
  systable_endscan(scan);
2234
1.78k
  heap_close(pg_database, AccessShareLock);
2235
2236
1.78k
  if (!OidIsValid(oid) && !missing_ok)
2237
1.78k
    ereport(ERROR,
2238
1.78k
        (errcode(ERRCODE_UNDEFINED_DATABASE),
2239
1.78k
         errmsg("database \"%s\" does not exist",
2240
1.78k
            dbname)));
2241
2242
1.78k
  return oid;
2243
1.78k
}
2244
2245
2246
/*
2247
 * get_database_name - given a database OID, look up the name
2248
 *
2249
 * Returns a palloc'd string, or NULL if no such database.
2250
 */
2251
char *
2252
get_database_name(Oid dbid)
2253
3.23k
{
2254
3.23k
  HeapTuple dbtuple;
2255
3.23k
  char     *result;
2256
2257
3.23k
  dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2258
3.23k
  if (HeapTupleIsValid(dbtuple))
2259
3.23k
  {
2260
3.23k
    result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2261
3.23k
    ReleaseSysCache(dbtuple);
2262
3.23k
  }
2263
2
  else
2264
2
    result = NULL;
2265
2266
3.23k
  return result;
2267
3.23k
}
2268
2269
/*
2270
 * DATABASE resource manager's routines
2271
 */
2272
void
2273
dbase_redo(XLogReaderState *record)
2274
0
{
2275
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2276
2277
  /* Backup blocks are not used in dbase records */
2278
0
  Assert(!XLogRecHasAnyBlockRefs(record));
2279
2280
0
  if (info == XLOG_DBASE_CREATE)
2281
0
  {
2282
0
    xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2283
0
    char     *src_path;
2284
0
    char     *dst_path;
2285
0
    struct stat st;
2286
2287
0
    src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2288
0
    dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2289
2290
    /*
2291
     * Our theory for replaying a CREATE is to forcibly drop the target
2292
     * subdirectory if present, then re-copy the source data. This may be
2293
     * more work than needed, but it is simple to implement.
2294
     */
2295
0
    if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2296
0
    {
2297
0
      if (!rmtree(dst_path, true))
2298
        /* If this failed, copydir() below is going to error. */
2299
0
        ereport(WARNING,
2300
0
            (errmsg("some useless files may be left behind in old database directory \"%s\"",
2301
0
                dst_path)));
2302
0
    }
2303
2304
    /*
2305
     * Force dirty buffers out to disk, to ensure source database is
2306
     * up-to-date for the copy.
2307
     */
2308
0
    FlushDatabaseBuffers(xlrec->src_db_id);
2309
2310
    /*
2311
     * Copy this subdirectory to the new location
2312
     *
2313
     * We don't need to copy subdirectories
2314
     */
2315
0
    copydir(src_path, dst_path, false);
2316
0
  }
2317
0
  else if (info == XLOG_DBASE_DROP)
2318
0
  {
2319
0
    xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2320
0
    char     *dst_path;
2321
2322
0
    dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2323
2324
0
    if (InHotStandby)
2325
0
    {
2326
      /*
2327
       * Lock database while we resolve conflicts to ensure that
2328
       * InitPostgres() cannot fully re-execute concurrently. This
2329
       * avoids backends re-connecting automatically to same database,
2330
       * which can happen in some cases.
2331
       *
2332
       * This will lock out walsenders trying to connect to db-specific
2333
       * slots for logical decoding too, so it's safe for us to drop
2334
       * slots.
2335
       */
2336
0
      LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2337
0
      ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2338
0
    }
2339
2340
    /* Drop any database-specific replication slots */
2341
0
    ReplicationSlotsDropDBSlots(xlrec->db_id);
2342
2343
    /* Drop pages for this database that are in the shared buffer cache */
2344
0
    DropDatabaseBuffers(xlrec->db_id);
2345
2346
    /* Also, clean out any fsync requests that might be pending in md.c */
2347
0
    ForgetDatabaseFsyncRequests(xlrec->db_id);
2348
2349
    /* Clean out the xlog relcache too */
2350
0
    XLogDropDatabase(xlrec->db_id);
2351
2352
    /* And remove the physical files */
2353
0
    if (!rmtree(dst_path, true))
2354
0
      ereport(WARNING,
2355
0
          (errmsg("some useless files may be left behind in old database directory \"%s\"",
2356
0
              dst_path)));
2357
2358
0
    if (InHotStandby)
2359
0
    {
2360
      /*
2361
       * Release locks prior to commit. XXX There is a race condition
2362
       * here that may allow backends to reconnect, but the window for
2363
       * this is small because the gap between here and commit is mostly
2364
       * fairly small and it is unlikely that people will be dropping
2365
       * databases that we are trying to connect to anyway.
2366
       */
2367
0
      UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2368
0
    }
2369
0
  }
2370
0
  else
2371
0
    elog(PANIC, "dbase_redo: unknown op code %u", info);
2372
0
}