YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/commands/publicationcmds.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * publicationcmds.c
4
 *    publication manipulation
5
 *
6
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    publicationcmds.c
11
 *
12
 *-------------------------------------------------------------------------
13
 */
14
15
#include "postgres.h"
16
17
#include "funcapi.h"
18
#include "miscadmin.h"
19
20
#include "access/genam.h"
21
#include "access/hash.h"
22
#include "access/heapam.h"
23
#include "access/htup_details.h"
24
#include "access/xact.h"
25
26
#include "catalog/catalog.h"
27
#include "catalog/indexing.h"
28
#include "catalog/namespace.h"
29
#include "catalog/objectaccess.h"
30
#include "catalog/objectaddress.h"
31
#include "catalog/pg_inherits.h"
32
#include "catalog/pg_type.h"
33
#include "catalog/pg_publication.h"
34
#include "catalog/pg_publication_rel.h"
35
36
#include "commands/dbcommands.h"
37
#include "commands/defrem.h"
38
#include "commands/event_trigger.h"
39
#include "commands/publicationcmds.h"
40
41
#include "utils/array.h"
42
#include "utils/builtins.h"
43
#include "utils/catcache.h"
44
#include "utils/fmgroids.h"
45
#include "utils/inval.h"
46
#include "utils/lsyscache.h"
47
#include "utils/rel.h"
48
#include "utils/syscache.h"
49
#include "utils/varlena.h"
50
51
/* Same as MAXNUMMESSAGES in sinvaladt.c */
52
0
#define MAX_RELCACHE_INVAL_MSGS 4096
53
54
static List *OpenTableList(List *tables);
55
static void CloseTableList(List *rels);
56
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
57
           AlterPublicationStmt *stmt);
58
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
59
60
static void
61
parse_publication_options(List *options,
62
              bool *publish_given,
63
              bool *publish_insert,
64
              bool *publish_update,
65
              bool *publish_delete,
66
              bool *publish_truncate)
67
0
{
68
0
  ListCell   *lc;
69
70
0
  *publish_given = false;
71
72
  /* Defaults are true */
73
0
  *publish_insert = true;
74
0
  *publish_update = true;
75
0
  *publish_delete = true;
76
0
  *publish_truncate = true;
77
78
  /* Parse options */
79
0
  foreach(lc, options)
80
0
  {
81
0
    DefElem    *defel = (DefElem *) lfirst(lc);
82
83
0
    if (strcmp(defel->defname, "publish") == 0)
84
0
    {
85
0
      char     *publish;
86
0
      List     *publish_list;
87
0
      ListCell   *lc;
88
89
0
      if (*publish_given)
90
0
        ereport(ERROR,
91
0
            (errcode(ERRCODE_SYNTAX_ERROR),
92
0
             errmsg("conflicting or redundant options")));
93
94
      /*
95
       * If publish option was given only the explicitly listed actions
96
       * should be published.
97
       */
98
0
      *publish_insert = false;
99
0
      *publish_update = false;
100
0
      *publish_delete = false;
101
0
      *publish_truncate = false;
102
103
0
      *publish_given = true;
104
0
      publish = defGetString(defel);
105
106
0
      if (!SplitIdentifierString(publish, ',', &publish_list))
107
0
        ereport(ERROR,
108
0
            (errcode(ERRCODE_SYNTAX_ERROR),
109
0
             errmsg("invalid list syntax for \"publish\" option")));
110
111
      /* Process the option list. */
112
0
      foreach(lc, publish_list)
113
0
      {
114
0
        char     *publish_opt = (char *) lfirst(lc);
115
116
0
        if (strcmp(publish_opt, "insert") == 0)
117
0
          *publish_insert = true;
118
0
        else if (strcmp(publish_opt, "update") == 0)
119
0
          *publish_update = true;
120
0
        else if (strcmp(publish_opt, "delete") == 0)
121
0
          *publish_delete = true;
122
0
        else if (strcmp(publish_opt, "truncate") == 0)
123
0
          *publish_truncate = true;
124
0
        else
125
0
          ereport(ERROR,
126
0
              (errcode(ERRCODE_SYNTAX_ERROR),
127
0
               errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
128
0
      }
129
0
    }
130
0
    else
131
0
      ereport(ERROR,
132
0
          (errcode(ERRCODE_SYNTAX_ERROR),
133
0
           errmsg("unrecognized publication parameter: %s", defel->defname)));
134
0
  }
135
0
}
136
137
/*
138
 * Create new publication.
139
 */
140
ObjectAddress
141
CreatePublication(CreatePublicationStmt *stmt)
142
0
{
143
0
  Relation  rel;
144
0
  ObjectAddress myself;
145
0
  Oid     puboid;
146
0
  bool    nulls[Natts_pg_publication];
147
0
  Datum   values[Natts_pg_publication];
148
0
  HeapTuple tup;
149
0
  bool    publish_given;
150
0
  bool    publish_insert;
151
0
  bool    publish_update;
152
0
  bool    publish_delete;
153
0
  bool    publish_truncate;
154
0
  AclResult aclresult;
155
156
  /* must have CREATE privilege on database */
157
0
  aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
158
0
  if (aclresult != ACLCHECK_OK)
159
0
    aclcheck_error(aclresult, OBJECT_DATABASE,
160
0
             get_database_name(MyDatabaseId));
161
162
  /* FOR ALL TABLES requires superuser */
163
0
  if (stmt->for_all_tables && !superuser())
164
0
    ereport(ERROR,
165
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
166
0
         (errmsg("must be superuser to create FOR ALL TABLES publication"))));
167
168
0
  rel = heap_open(PublicationRelationId, RowExclusiveLock);
169
170
  /* Check if name is used */
171
0
  puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
172
0
  if (OidIsValid(puboid))
173
0
  {
174
0
    ereport(ERROR,
175
0
        (errcode(ERRCODE_DUPLICATE_OBJECT),
176
0
         errmsg("publication \"%s\" already exists",
177
0
            stmt->pubname)));
178
0
  }
179
180
  /* Form a tuple. */
181
0
  memset(values, 0, sizeof(values));
182
0
  memset(nulls, false, sizeof(nulls));
183
184
0
  values[Anum_pg_publication_pubname - 1] =
185
0
    DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
186
0
  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
187
188
0
  parse_publication_options(stmt->options,
189
0
                &publish_given, &publish_insert,
190
0
                &publish_update, &publish_delete,
191
0
                &publish_truncate);
192
193
0
  values[Anum_pg_publication_puballtables - 1] =
194
0
    BoolGetDatum(stmt->for_all_tables);
195
0
  values[Anum_pg_publication_pubinsert - 1] =
196
0
    BoolGetDatum(publish_insert);
197
0
  values[Anum_pg_publication_pubupdate - 1] =
198
0
    BoolGetDatum(publish_update);
199
0
  values[Anum_pg_publication_pubdelete - 1] =
200
0
    BoolGetDatum(publish_delete);
201
0
  values[Anum_pg_publication_pubtruncate - 1] =
202
0
    BoolGetDatum(publish_truncate);
203
204
0
  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
205
206
  /* Insert tuple into catalog. */
207
0
  puboid = CatalogTupleInsert(rel, tup);
208
0
  heap_freetuple(tup);
209
210
0
  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
211
212
0
  ObjectAddressSet(myself, PublicationRelationId, puboid);
213
214
  /* Make the changes visible. */
215
0
  CommandCounterIncrement();
216
217
0
  if (stmt->tables)
218
0
  {
219
0
    List     *rels;
220
221
0
    Assert(list_length(stmt->tables) > 0);
222
223
0
    rels = OpenTableList(stmt->tables);
224
0
    PublicationAddTables(puboid, rels, true, NULL);
225
0
    CloseTableList(rels);
226
0
  }
227
228
0
  heap_close(rel, RowExclusiveLock);
229
230
0
  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
231
232
0
  return myself;
233
0
}
234
235
/*
236
 * Change options of a publication.
237
 */
238
static void
239
AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
240
            HeapTuple tup)
241
0
{
242
0
  bool    nulls[Natts_pg_publication];
243
0
  bool    replaces[Natts_pg_publication];
244
0
  Datum   values[Natts_pg_publication];
245
0
  bool    publish_given;
246
0
  bool    publish_insert;
247
0
  bool    publish_update;
248
0
  bool    publish_delete;
249
0
  bool    publish_truncate;
250
0
  ObjectAddress obj;
251
252
0
  parse_publication_options(stmt->options,
253
0
                &publish_given, &publish_insert,
254
0
                &publish_update, &publish_delete,
255
0
                &publish_truncate);
256
257
  /* Everything ok, form a new tuple. */
258
0
  memset(values, 0, sizeof(values));
259
0
  memset(nulls, false, sizeof(nulls));
260
0
  memset(replaces, false, sizeof(replaces));
261
262
0
  if (publish_given)
263
0
  {
264
0
    values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
265
0
    replaces[Anum_pg_publication_pubinsert - 1] = true;
266
267
0
    values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
268
0
    replaces[Anum_pg_publication_pubupdate - 1] = true;
269
270
0
    values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
271
0
    replaces[Anum_pg_publication_pubdelete - 1] = true;
272
273
0
    values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
274
0
    replaces[Anum_pg_publication_pubtruncate - 1] = true;
275
0
  }
276
277
0
  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
278
0
              replaces);
279
280
  /* Update the catalog. */
281
0
  CatalogTupleUpdate(rel, &tup->t_self, tup);
282
283
0
  CommandCounterIncrement();
284
285
  /* Invalidate the relcache. */
286
0
  if (((Form_pg_publication) GETSTRUCT(tup))->puballtables)
287
0
  {
288
0
    CacheInvalidateRelcacheAll();
289
0
  }
290
0
  else
291
0
  {
292
0
    List     *relids = GetPublicationRelations(HeapTupleGetOid(tup));
293
294
    /*
295
     * We don't want to send too many individual messages, at some point
296
     * it's cheaper to just reset whole relcache.
297
     */
298
0
    if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
299
0
    {
300
0
      ListCell   *lc;
301
302
0
      foreach(lc, relids)
303
0
      {
304
0
        Oid     relid = lfirst_oid(lc);
305
306
0
        CacheInvalidateRelcacheByRelid(relid);
307
0
      }
308
0
    }
309
0
    else
310
0
      CacheInvalidateRelcacheAll();
311
0
  }
312
313
0
  ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup));
314
0
  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
315
0
                   (Node *) stmt);
316
317
0
  InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0);
318
0
}
319
320
/*
321
 * Add or remove table to/from publication.
322
 */
323
static void
324
AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
325
             HeapTuple tup)
326
0
{
327
0
  Oid     pubid = HeapTupleGetOid(tup);
328
0
  List     *rels = NIL;
329
0
  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
330
331
  /* Check that user is allowed to manipulate the publication tables. */
332
0
  if (pubform->puballtables)
333
0
    ereport(ERROR,
334
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
335
0
         errmsg("publication \"%s\" is defined as FOR ALL TABLES",
336
0
            NameStr(pubform->pubname)),
337
0
         errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
338
339
0
  Assert(list_length(stmt->tables) > 0);
340
341
0
  rels = OpenTableList(stmt->tables);
342
343
0
  if (stmt->tableAction == DEFELEM_ADD)
344
0
    PublicationAddTables(pubid, rels, false, stmt);
345
0
  else if (stmt->tableAction == DEFELEM_DROP)
346
0
    PublicationDropTables(pubid, rels, false);
347
0
  else            /* DEFELEM_SET */
348
0
  {
349
0
    List     *oldrelids = GetPublicationRelations(pubid);
350
0
    List     *delrels = NIL;
351
0
    ListCell   *oldlc;
352
353
    /* Calculate which relations to drop. */
354
0
    foreach(oldlc, oldrelids)
355
0
    {
356
0
      Oid     oldrelid = lfirst_oid(oldlc);
357
0
      ListCell   *newlc;
358
0
      bool    found = false;
359
360
0
      foreach(newlc, rels)
361
0
      {
362
0
        Relation  newrel = (Relation) lfirst(newlc);
363
364
0
        if (RelationGetRelid(newrel) == oldrelid)
365
0
        {
366
0
          found = true;
367
0
          break;
368
0
        }
369
0
      }
370
371
0
      if (!found)
372
0
      {
373
0
        Relation  oldrel = heap_open(oldrelid,
374
0
                         ShareUpdateExclusiveLock);
375
376
0
        delrels = lappend(delrels, oldrel);
377
0
      }
378
0
    }
379
380
    /* And drop them. */
381
0
    PublicationDropTables(pubid, delrels, true);
382
383
    /*
384
     * Don't bother calculating the difference for adding, we'll catch and
385
     * skip existing ones when doing catalog update.
386
     */
387
0
    PublicationAddTables(pubid, rels, true, stmt);
388
389
0
    CloseTableList(delrels);
390
0
  }
391
392
0
  CloseTableList(rels);
393
0
}
394
395
/*
396
 * Alter the existing publication.
397
 *
398
 * This is dispatcher function for AlterPublicationOptions and
399
 * AlterPublicationTables.
400
 */
401
void
402
AlterPublication(AlterPublicationStmt *stmt)
403
0
{
404
0
  Relation  rel;
405
0
  HeapTuple tup;
406
407
0
  rel = heap_open(PublicationRelationId, RowExclusiveLock);
408
409
0
  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
410
0
                CStringGetDatum(stmt->pubname));
411
412
0
  if (!HeapTupleIsValid(tup))
413
0
    ereport(ERROR,
414
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
415
0
         errmsg("publication \"%s\" does not exist",
416
0
            stmt->pubname)));
417
418
  /* must be owner */
419
0
  if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
420
0
    aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
421
0
             stmt->pubname);
422
423
0
  if (stmt->options)
424
0
    AlterPublicationOptions(stmt, rel, tup);
425
0
  else
426
0
    AlterPublicationTables(stmt, rel, tup);
427
428
  /* Cleanup. */
429
0
  heap_freetuple(tup);
430
0
  heap_close(rel, RowExclusiveLock);
431
0
}
432
433
/*
434
 * Drop publication by OID
435
 */
436
void
437
RemovePublicationById(Oid pubid)
438
0
{
439
0
  Relation  rel;
440
0
  HeapTuple tup;
441
442
0
  rel = heap_open(PublicationRelationId, RowExclusiveLock);
443
444
0
  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
445
446
0
  if (!HeapTupleIsValid(tup))
447
0
    elog(ERROR, "cache lookup failed for publication %u", pubid);
448
449
0
  CatalogTupleDelete(rel, tup);
450
451
0
  ReleaseSysCache(tup);
452
453
0
  heap_close(rel, RowExclusiveLock);
454
0
}
455
456
/*
457
 * Remove relation from publication by mapping OID.
458
 */
459
void
460
RemovePublicationRelById(Oid proid)
461
0
{
462
0
  Relation  rel;
463
0
  HeapTuple tup;
464
0
  Form_pg_publication_rel pubrel;
465
466
0
  rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
467
468
0
  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
469
470
0
  if (!HeapTupleIsValid(tup))
471
0
    elog(ERROR, "cache lookup failed for publication table %u",
472
0
       proid);
473
474
0
  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
475
476
  /* Invalidate relcache so that publication info is rebuilt. */
477
0
  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
478
479
0
  CatalogTupleDelete(rel, tup);
480
481
0
  ReleaseSysCache(tup);
482
483
0
  heap_close(rel, RowExclusiveLock);
484
0
}
485
486
/*
487
 * Open relations specified by a RangeVar list.
488
 * The returned tables are locked in ShareUpdateExclusiveLock mode.
489
 */
490
static List *
491
OpenTableList(List *tables)
492
0
{
493
0
  List     *relids = NIL;
494
0
  List     *rels = NIL;
495
0
  ListCell   *lc;
496
497
  /*
498
   * Open, share-lock, and check all the explicitly-specified relations
499
   */
500
0
  foreach(lc, tables)
501
0
  {
502
0
    RangeVar   *rv = castNode(RangeVar, lfirst(lc));
503
0
    bool    recurse = rv->inh;
504
0
    Relation  rel;
505
0
    Oid     myrelid;
506
507
    /* Allow query cancel in case this takes a long time */
508
0
    CHECK_FOR_INTERRUPTS();
509
510
0
    rel = heap_openrv(rv, ShareUpdateExclusiveLock);
511
0
    myrelid = RelationGetRelid(rel);
512
513
    /*
514
     * Filter out duplicates if user specifies "foo, foo".
515
     *
516
     * Note that this algorithm is known to not be very efficient (O(N^2))
517
     * but given that it only works on list of tables given to us by user
518
     * it's deemed acceptable.
519
     */
520
0
    if (list_member_oid(relids, myrelid))
521
0
    {
522
0
      heap_close(rel, ShareUpdateExclusiveLock);
523
0
      continue;
524
0
    }
525
526
0
    rels = lappend(rels, rel);
527
0
    relids = lappend_oid(relids, myrelid);
528
529
    /* Add children of this rel, if requested */
530
0
    if (recurse)
531
0
    {
532
0
      List     *children;
533
0
      ListCell   *child;
534
535
0
      children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
536
0
                       NULL);
537
538
0
      foreach(child, children)
539
0
      {
540
0
        Oid     childrelid = lfirst_oid(child);
541
542
        /* Allow query cancel in case this takes a long time */
543
0
        CHECK_FOR_INTERRUPTS();
544
545
        /*
546
         * Skip duplicates if user specified both parent and child
547
         * tables.
548
         */
549
0
        if (list_member_oid(relids, childrelid))
550
0
          continue;
551
552
        /* find_all_inheritors already got lock */
553
0
        rel = heap_open(childrelid, NoLock);
554
0
        rels = lappend(rels, rel);
555
0
        relids = lappend_oid(relids, childrelid);
556
0
      }
557
0
    }
558
0
  }
559
560
0
  list_free(relids);
561
562
0
  return rels;
563
0
}
564
565
/*
566
 * Close all relations in the list.
567
 */
568
static void
569
CloseTableList(List *rels)
570
0
{
571
0
  ListCell   *lc;
572
573
0
  foreach(lc, rels)
574
0
  {
575
0
    Relation  rel = (Relation) lfirst(lc);
576
577
0
    heap_close(rel, NoLock);
578
0
  }
579
0
}
580
581
/*
582
 * Add listed tables to the publication.
583
 */
584
static void
585
PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
586
           AlterPublicationStmt *stmt)
587
0
{
588
0
  ListCell   *lc;
589
590
0
  Assert(!stmt || !stmt->for_all_tables);
591
592
0
  foreach(lc, rels)
593
0
  {
594
0
    Relation  rel = (Relation) lfirst(lc);
595
0
    ObjectAddress obj;
596
597
    /* Must be owner of the table or superuser. */
598
0
    if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
599
0
      aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
600
0
               RelationGetRelationName(rel));
601
602
0
    obj = publication_add_relation(pubid, rel, if_not_exists);
603
0
    if (stmt)
604
0
    {
605
0
      EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
606
0
                       (Node *) stmt);
607
608
0
      InvokeObjectPostCreateHook(PublicationRelRelationId,
609
0
                     obj.objectId, 0);
610
0
    }
611
0
  }
612
0
}
613
614
/*
615
 * Remove listed tables from the publication.
616
 */
617
static void
618
PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
619
0
{
620
0
  ObjectAddress obj;
621
0
  ListCell   *lc;
622
0
  Oid     prid;
623
624
0
  foreach(lc, rels)
625
0
  {
626
0
    Relation  rel = (Relation) lfirst(lc);
627
0
    Oid     relid = RelationGetRelid(rel);
628
629
0
    prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
630
0
                 ObjectIdGetDatum(pubid));
631
0
    if (!OidIsValid(prid))
632
0
    {
633
0
      if (missing_ok)
634
0
        continue;
635
636
0
      ereport(ERROR,
637
0
          (errcode(ERRCODE_UNDEFINED_OBJECT),
638
0
           errmsg("relation \"%s\" is not part of the publication",
639
0
              RelationGetRelationName(rel))));
640
0
    }
641
642
0
    ObjectAddressSet(obj, PublicationRelRelationId, prid);
643
0
    performDeletion(&obj, DROP_CASCADE, 0);
644
0
  }
645
0
}
646
647
/*
648
 * Internal workhorse for changing a publication owner
649
 */
650
static void
651
AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
652
0
{
653
0
  Form_pg_publication form;
654
655
0
  form = (Form_pg_publication) GETSTRUCT(tup);
656
657
0
  if (form->pubowner == newOwnerId)
658
0
    return;
659
660
0
  if (!superuser())
661
0
  {
662
0
    AclResult aclresult;
663
664
    /* Must be owner */
665
0
    if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
666
0
      aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
667
0
               NameStr(form->pubname));
668
669
    /* Must be able to become new owner */
670
0
    check_is_member_of_role(GetUserId(), newOwnerId);
671
672
    /* New owner must have CREATE privilege on database */
673
0
    aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
674
0
    if (aclresult != ACLCHECK_OK)
675
0
      aclcheck_error(aclresult, OBJECT_DATABASE,
676
0
               get_database_name(MyDatabaseId));
677
678
0
    if (form->puballtables && !superuser_arg(newOwnerId))
679
0
      ereport(ERROR,
680
0
          (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
681
0
           errmsg("permission denied to change owner of publication \"%s\"",
682
0
              NameStr(form->pubname)),
683
0
           errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
684
0
  }
685
686
0
  form->pubowner = newOwnerId;
687
0
  CatalogTupleUpdate(rel, &tup->t_self, tup);
688
689
  /* Update owner dependency reference */
690
0
  changeDependencyOnOwner(PublicationRelationId,
691
0
              HeapTupleGetOid(tup),
692
0
              newOwnerId);
693
694
0
  InvokeObjectPostAlterHook(PublicationRelationId,
695
0
                HeapTupleGetOid(tup), 0);
696
0
}
697
698
/*
699
 * Change publication owner -- by name
700
 */
701
ObjectAddress
702
AlterPublicationOwner(const char *name, Oid newOwnerId)
703
0
{
704
0
  Oid     subid;
705
0
  HeapTuple tup;
706
0
  Relation  rel;
707
0
  ObjectAddress address;
708
709
0
  rel = heap_open(PublicationRelationId, RowExclusiveLock);
710
711
0
  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
712
713
0
  if (!HeapTupleIsValid(tup))
714
0
    ereport(ERROR,
715
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
716
0
         errmsg("publication \"%s\" does not exist", name)));
717
718
0
  subid = HeapTupleGetOid(tup);
719
720
0
  AlterPublicationOwner_internal(rel, tup, newOwnerId);
721
722
0
  ObjectAddressSet(address, PublicationRelationId, subid);
723
724
0
  heap_freetuple(tup);
725
726
0
  heap_close(rel, RowExclusiveLock);
727
728
0
  return address;
729
0
}
730
731
/*
732
 * Change publication owner -- by OID
733
 */
734
void
735
AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
736
0
{
737
0
  HeapTuple tup;
738
0
  Relation  rel;
739
740
0
  rel = heap_open(PublicationRelationId, RowExclusiveLock);
741
742
0
  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
743
744
0
  if (!HeapTupleIsValid(tup))
745
0
    ereport(ERROR,
746
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
747
0
         errmsg("publication with OID %u does not exist", subid)));
748
749
0
  AlterPublicationOwner_internal(rel, tup, newOwnerId);
750
751
0
  heap_freetuple(tup);
752
753
0
  heap_close(rel, RowExclusiveLock);
754
0
}