/Users/deen/code/yugabyte-db/src/postgres/src/backend/commands/publicationcmds.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * publicationcmds.c |
4 | | * publication manipulation |
5 | | * |
6 | | * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * IDENTIFICATION |
10 | | * publicationcmds.c |
11 | | * |
12 | | *------------------------------------------------------------------------- |
13 | | */ |
14 | | |
15 | | #include "postgres.h" |
16 | | |
17 | | #include "funcapi.h" |
18 | | #include "miscadmin.h" |
19 | | |
20 | | #include "access/genam.h" |
21 | | #include "access/hash.h" |
22 | | #include "access/heapam.h" |
23 | | #include "access/htup_details.h" |
24 | | #include "access/xact.h" |
25 | | |
26 | | #include "catalog/catalog.h" |
27 | | #include "catalog/indexing.h" |
28 | | #include "catalog/namespace.h" |
29 | | #include "catalog/objectaccess.h" |
30 | | #include "catalog/objectaddress.h" |
31 | | #include "catalog/pg_inherits.h" |
32 | | #include "catalog/pg_type.h" |
33 | | #include "catalog/pg_publication.h" |
34 | | #include "catalog/pg_publication_rel.h" |
35 | | |
36 | | #include "commands/dbcommands.h" |
37 | | #include "commands/defrem.h" |
38 | | #include "commands/event_trigger.h" |
39 | | #include "commands/publicationcmds.h" |
40 | | |
41 | | #include "utils/array.h" |
42 | | #include "utils/builtins.h" |
43 | | #include "utils/catcache.h" |
44 | | #include "utils/fmgroids.h" |
45 | | #include "utils/inval.h" |
46 | | #include "utils/lsyscache.h" |
47 | | #include "utils/rel.h" |
48 | | #include "utils/syscache.h" |
49 | | #include "utils/varlena.h" |
50 | | |
51 | | /* Same as MAXNUMMESSAGES in sinvaladt.c */ |
52 | 0 | #define MAX_RELCACHE_INVAL_MSGS 4096 |
53 | | |
54 | | static List *OpenTableList(List *tables); |
55 | | static void CloseTableList(List *rels); |
56 | | static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, |
57 | | AlterPublicationStmt *stmt); |
58 | | static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); |
59 | | |
60 | | static void |
61 | | parse_publication_options(List *options, |
62 | | bool *publish_given, |
63 | | bool *publish_insert, |
64 | | bool *publish_update, |
65 | | bool *publish_delete, |
66 | | bool *publish_truncate) |
67 | 0 | { |
68 | 0 | ListCell *lc; |
69 | |
|
70 | 0 | *publish_given = false; |
71 | | |
72 | | /* Defaults are true */ |
73 | 0 | *publish_insert = true; |
74 | 0 | *publish_update = true; |
75 | 0 | *publish_delete = true; |
76 | 0 | *publish_truncate = true; |
77 | | |
78 | | /* Parse options */ |
79 | 0 | foreach(lc, options) |
80 | 0 | { |
81 | 0 | DefElem *defel = (DefElem *) lfirst(lc); |
82 | |
|
83 | 0 | if (strcmp(defel->defname, "publish") == 0) |
84 | 0 | { |
85 | 0 | char *publish; |
86 | 0 | List *publish_list; |
87 | 0 | ListCell *lc; |
88 | |
|
89 | 0 | if (*publish_given) |
90 | 0 | ereport(ERROR, |
91 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
92 | 0 | errmsg("conflicting or redundant options"))); |
93 | | |
94 | | /* |
95 | | * If publish option was given only the explicitly listed actions |
96 | | * should be published. |
97 | | */ |
98 | 0 | *publish_insert = false; |
99 | 0 | *publish_update = false; |
100 | 0 | *publish_delete = false; |
101 | 0 | *publish_truncate = false; |
102 | |
|
103 | 0 | *publish_given = true; |
104 | 0 | publish = defGetString(defel); |
105 | |
|
106 | 0 | if (!SplitIdentifierString(publish, ',', &publish_list)) |
107 | 0 | ereport(ERROR, |
108 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
109 | 0 | errmsg("invalid list syntax for \"publish\" option"))); |
110 | | |
111 | | /* Process the option list. */ |
112 | 0 | foreach(lc, publish_list) |
113 | 0 | { |
114 | 0 | char *publish_opt = (char *) lfirst(lc); |
115 | |
|
116 | 0 | if (strcmp(publish_opt, "insert") == 0) |
117 | 0 | *publish_insert = true; |
118 | 0 | else if (strcmp(publish_opt, "update") == 0) |
119 | 0 | *publish_update = true; |
120 | 0 | else if (strcmp(publish_opt, "delete") == 0) |
121 | 0 | *publish_delete = true; |
122 | 0 | else if (strcmp(publish_opt, "truncate") == 0) |
123 | 0 | *publish_truncate = true; |
124 | 0 | else |
125 | 0 | ereport(ERROR, |
126 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
127 | 0 | errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt))); |
128 | 0 | } |
129 | 0 | } |
130 | 0 | else |
131 | 0 | ereport(ERROR, |
132 | 0 | (errcode(ERRCODE_SYNTAX_ERROR), |
133 | 0 | errmsg("unrecognized publication parameter: %s", defel->defname))); |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | | /* |
138 | | * Create new publication. |
139 | | */ |
140 | | ObjectAddress |
141 | | CreatePublication(CreatePublicationStmt *stmt) |
142 | 0 | { |
143 | 0 | Relation rel; |
144 | 0 | ObjectAddress myself; |
145 | 0 | Oid puboid; |
146 | 0 | bool nulls[Natts_pg_publication]; |
147 | 0 | Datum values[Natts_pg_publication]; |
148 | 0 | HeapTuple tup; |
149 | 0 | bool publish_given; |
150 | 0 | bool publish_insert; |
151 | 0 | bool publish_update; |
152 | 0 | bool publish_delete; |
153 | 0 | bool publish_truncate; |
154 | 0 | AclResult aclresult; |
155 | | |
156 | | /* must have CREATE privilege on database */ |
157 | 0 | aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); |
158 | 0 | if (aclresult != ACLCHECK_OK) |
159 | 0 | aclcheck_error(aclresult, OBJECT_DATABASE, |
160 | 0 | get_database_name(MyDatabaseId)); |
161 | | |
162 | | /* FOR ALL TABLES requires superuser */ |
163 | 0 | if (stmt->for_all_tables && !superuser()) |
164 | 0 | ereport(ERROR, |
165 | 0 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
166 | 0 | (errmsg("must be superuser to create FOR ALL TABLES publication")))); |
167 | |
|
168 | 0 | rel = heap_open(PublicationRelationId, RowExclusiveLock); |
169 | | |
170 | | /* Check if name is used */ |
171 | 0 | puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname)); |
172 | 0 | if (OidIsValid(puboid)) |
173 | 0 | { |
174 | 0 | ereport(ERROR, |
175 | 0 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
176 | 0 | errmsg("publication \"%s\" already exists", |
177 | 0 | stmt->pubname))); |
178 | 0 | } |
179 | | |
180 | | /* Form a tuple. */ |
181 | 0 | memset(values, 0, sizeof(values)); |
182 | 0 | memset(nulls, false, sizeof(nulls)); |
183 | |
|
184 | 0 | values[Anum_pg_publication_pubname - 1] = |
185 | 0 | DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname)); |
186 | 0 | values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId()); |
187 | |
|
188 | 0 | parse_publication_options(stmt->options, |
189 | 0 | &publish_given, &publish_insert, |
190 | 0 | &publish_update, &publish_delete, |
191 | 0 | &publish_truncate); |
192 | |
|
193 | 0 | values[Anum_pg_publication_puballtables - 1] = |
194 | 0 | BoolGetDatum(stmt->for_all_tables); |
195 | 0 | values[Anum_pg_publication_pubinsert - 1] = |
196 | 0 | BoolGetDatum(publish_insert); |
197 | 0 | values[Anum_pg_publication_pubupdate - 1] = |
198 | 0 | BoolGetDatum(publish_update); |
199 | 0 | values[Anum_pg_publication_pubdelete - 1] = |
200 | 0 | BoolGetDatum(publish_delete); |
201 | 0 | values[Anum_pg_publication_pubtruncate - 1] = |
202 | 0 | BoolGetDatum(publish_truncate); |
203 | |
|
204 | 0 | tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
205 | | |
206 | | /* Insert tuple into catalog. */ |
207 | 0 | puboid = CatalogTupleInsert(rel, tup); |
208 | 0 | heap_freetuple(tup); |
209 | |
|
210 | 0 | recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId()); |
211 | |
|
212 | 0 | ObjectAddressSet(myself, PublicationRelationId, puboid); |
213 | | |
214 | | /* Make the changes visible. */ |
215 | 0 | CommandCounterIncrement(); |
216 | |
|
217 | 0 | if (stmt->tables) |
218 | 0 | { |
219 | 0 | List *rels; |
220 | |
|
221 | 0 | Assert(list_length(stmt->tables) > 0); |
222 | |
|
223 | 0 | rels = OpenTableList(stmt->tables); |
224 | 0 | PublicationAddTables(puboid, rels, true, NULL); |
225 | 0 | CloseTableList(rels); |
226 | 0 | } |
227 | |
|
228 | 0 | heap_close(rel, RowExclusiveLock); |
229 | |
|
230 | 0 | InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); |
231 | |
|
232 | 0 | return myself; |
233 | 0 | } |
234 | | |
235 | | /* |
236 | | * Change options of a publication. |
237 | | */ |
238 | | static void |
239 | | AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, |
240 | | HeapTuple tup) |
241 | 0 | { |
242 | 0 | bool nulls[Natts_pg_publication]; |
243 | 0 | bool replaces[Natts_pg_publication]; |
244 | 0 | Datum values[Natts_pg_publication]; |
245 | 0 | bool publish_given; |
246 | 0 | bool publish_insert; |
247 | 0 | bool publish_update; |
248 | 0 | bool publish_delete; |
249 | 0 | bool publish_truncate; |
250 | 0 | ObjectAddress obj; |
251 | |
|
252 | 0 | parse_publication_options(stmt->options, |
253 | 0 | &publish_given, &publish_insert, |
254 | 0 | &publish_update, &publish_delete, |
255 | 0 | &publish_truncate); |
256 | | |
257 | | /* Everything ok, form a new tuple. */ |
258 | 0 | memset(values, 0, sizeof(values)); |
259 | 0 | memset(nulls, false, sizeof(nulls)); |
260 | 0 | memset(replaces, false, sizeof(replaces)); |
261 | |
|
262 | 0 | if (publish_given) |
263 | 0 | { |
264 | 0 | values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert); |
265 | 0 | replaces[Anum_pg_publication_pubinsert - 1] = true; |
266 | |
|
267 | 0 | values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update); |
268 | 0 | replaces[Anum_pg_publication_pubupdate - 1] = true; |
269 | |
|
270 | 0 | values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); |
271 | 0 | replaces[Anum_pg_publication_pubdelete - 1] = true; |
272 | |
|
273 | 0 | values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); |
274 | 0 | replaces[Anum_pg_publication_pubtruncate - 1] = true; |
275 | 0 | } |
276 | |
|
277 | 0 | tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, |
278 | 0 | replaces); |
279 | | |
280 | | /* Update the catalog. */ |
281 | 0 | CatalogTupleUpdate(rel, &tup->t_self, tup); |
282 | |
|
283 | 0 | CommandCounterIncrement(); |
284 | | |
285 | | /* Invalidate the relcache. */ |
286 | 0 | if (((Form_pg_publication) GETSTRUCT(tup))->puballtables) |
287 | 0 | { |
288 | 0 | CacheInvalidateRelcacheAll(); |
289 | 0 | } |
290 | 0 | else |
291 | 0 | { |
292 | 0 | List *relids = GetPublicationRelations(HeapTupleGetOid(tup)); |
293 | | |
294 | | /* |
295 | | * We don't want to send too many individual messages, at some point |
296 | | * it's cheaper to just reset whole relcache. |
297 | | */ |
298 | 0 | if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS) |
299 | 0 | { |
300 | 0 | ListCell *lc; |
301 | |
|
302 | 0 | foreach(lc, relids) |
303 | 0 | { |
304 | 0 | Oid relid = lfirst_oid(lc); |
305 | |
|
306 | 0 | CacheInvalidateRelcacheByRelid(relid); |
307 | 0 | } |
308 | 0 | } |
309 | 0 | else |
310 | 0 | CacheInvalidateRelcacheAll(); |
311 | 0 | } |
312 | |
|
313 | 0 | ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup)); |
314 | 0 | EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, |
315 | 0 | (Node *) stmt); |
316 | |
|
317 | 0 | InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0); |
318 | 0 | } |
319 | | |
320 | | /* |
321 | | * Add or remove table to/from publication. |
322 | | */ |
323 | | static void |
324 | | AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, |
325 | | HeapTuple tup) |
326 | 0 | { |
327 | 0 | Oid pubid = HeapTupleGetOid(tup); |
328 | 0 | List *rels = NIL; |
329 | 0 | Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); |
330 | | |
331 | | /* Check that user is allowed to manipulate the publication tables. */ |
332 | 0 | if (pubform->puballtables) |
333 | 0 | ereport(ERROR, |
334 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
335 | 0 | errmsg("publication \"%s\" is defined as FOR ALL TABLES", |
336 | 0 | NameStr(pubform->pubname)), |
337 | 0 | errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); |
338 | |
|
339 | 0 | Assert(list_length(stmt->tables) > 0); |
340 | |
|
341 | 0 | rels = OpenTableList(stmt->tables); |
342 | |
|
343 | 0 | if (stmt->tableAction == DEFELEM_ADD) |
344 | 0 | PublicationAddTables(pubid, rels, false, stmt); |
345 | 0 | else if (stmt->tableAction == DEFELEM_DROP) |
346 | 0 | PublicationDropTables(pubid, rels, false); |
347 | 0 | else /* DEFELEM_SET */ |
348 | 0 | { |
349 | 0 | List *oldrelids = GetPublicationRelations(pubid); |
350 | 0 | List *delrels = NIL; |
351 | 0 | ListCell *oldlc; |
352 | | |
353 | | /* Calculate which relations to drop. */ |
354 | 0 | foreach(oldlc, oldrelids) |
355 | 0 | { |
356 | 0 | Oid oldrelid = lfirst_oid(oldlc); |
357 | 0 | ListCell *newlc; |
358 | 0 | bool found = false; |
359 | |
|
360 | 0 | foreach(newlc, rels) |
361 | 0 | { |
362 | 0 | Relation newrel = (Relation) lfirst(newlc); |
363 | |
|
364 | 0 | if (RelationGetRelid(newrel) == oldrelid) |
365 | 0 | { |
366 | 0 | found = true; |
367 | 0 | break; |
368 | 0 | } |
369 | 0 | } |
370 | |
|
371 | 0 | if (!found) |
372 | 0 | { |
373 | 0 | Relation oldrel = heap_open(oldrelid, |
374 | 0 | ShareUpdateExclusiveLock); |
375 | |
|
376 | 0 | delrels = lappend(delrels, oldrel); |
377 | 0 | } |
378 | 0 | } |
379 | | |
380 | | /* And drop them. */ |
381 | 0 | PublicationDropTables(pubid, delrels, true); |
382 | | |
383 | | /* |
384 | | * Don't bother calculating the difference for adding, we'll catch and |
385 | | * skip existing ones when doing catalog update. |
386 | | */ |
387 | 0 | PublicationAddTables(pubid, rels, true, stmt); |
388 | |
|
389 | 0 | CloseTableList(delrels); |
390 | 0 | } |
391 | |
|
392 | 0 | CloseTableList(rels); |
393 | 0 | } |
394 | | |
395 | | /* |
396 | | * Alter the existing publication. |
397 | | * |
398 | | * This is dispatcher function for AlterPublicationOptions and |
399 | | * AlterPublicationTables. |
400 | | */ |
401 | | void |
402 | | AlterPublication(AlterPublicationStmt *stmt) |
403 | 0 | { |
404 | 0 | Relation rel; |
405 | 0 | HeapTuple tup; |
406 | |
|
407 | 0 | rel = heap_open(PublicationRelationId, RowExclusiveLock); |
408 | |
|
409 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONNAME, |
410 | 0 | CStringGetDatum(stmt->pubname)); |
411 | |
|
412 | 0 | if (!HeapTupleIsValid(tup)) |
413 | 0 | ereport(ERROR, |
414 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
415 | 0 | errmsg("publication \"%s\" does not exist", |
416 | 0 | stmt->pubname))); |
417 | | |
418 | | /* must be owner */ |
419 | 0 | if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId())) |
420 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION, |
421 | 0 | stmt->pubname); |
422 | |
|
423 | 0 | if (stmt->options) |
424 | 0 | AlterPublicationOptions(stmt, rel, tup); |
425 | 0 | else |
426 | 0 | AlterPublicationTables(stmt, rel, tup); |
427 | | |
428 | | /* Cleanup. */ |
429 | 0 | heap_freetuple(tup); |
430 | 0 | heap_close(rel, RowExclusiveLock); |
431 | 0 | } |
432 | | |
433 | | /* |
434 | | * Drop publication by OID |
435 | | */ |
436 | | void |
437 | | RemovePublicationById(Oid pubid) |
438 | 0 | { |
439 | 0 | Relation rel; |
440 | 0 | HeapTuple tup; |
441 | |
|
442 | 0 | rel = heap_open(PublicationRelationId, RowExclusiveLock); |
443 | |
|
444 | 0 | tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
445 | |
|
446 | 0 | if (!HeapTupleIsValid(tup)) |
447 | 0 | elog(ERROR, "cache lookup failed for publication %u", pubid); |
448 | |
|
449 | 0 | CatalogTupleDelete(rel, tup); |
450 | |
|
451 | 0 | ReleaseSysCache(tup); |
452 | |
|
453 | 0 | heap_close(rel, RowExclusiveLock); |
454 | 0 | } |
455 | | |
456 | | /* |
457 | | * Remove relation from publication by mapping OID. |
458 | | */ |
459 | | void |
460 | | RemovePublicationRelById(Oid proid) |
461 | 0 | { |
462 | 0 | Relation rel; |
463 | 0 | HeapTuple tup; |
464 | 0 | Form_pg_publication_rel pubrel; |
465 | |
|
466 | 0 | rel = heap_open(PublicationRelRelationId, RowExclusiveLock); |
467 | |
|
468 | 0 | tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid)); |
469 | |
|
470 | 0 | if (!HeapTupleIsValid(tup)) |
471 | 0 | elog(ERROR, "cache lookup failed for publication table %u", |
472 | 0 | proid); |
473 | |
|
474 | 0 | pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); |
475 | | |
476 | | /* Invalidate relcache so that publication info is rebuilt. */ |
477 | 0 | CacheInvalidateRelcacheByRelid(pubrel->prrelid); |
478 | |
|
479 | 0 | CatalogTupleDelete(rel, tup); |
480 | |
|
481 | 0 | ReleaseSysCache(tup); |
482 | |
|
483 | 0 | heap_close(rel, RowExclusiveLock); |
484 | 0 | } |
485 | | |
486 | | /* |
487 | | * Open relations specified by a RangeVar list. |
488 | | * The returned tables are locked in ShareUpdateExclusiveLock mode. |
489 | | */ |
490 | | static List * |
491 | | OpenTableList(List *tables) |
492 | 0 | { |
493 | 0 | List *relids = NIL; |
494 | 0 | List *rels = NIL; |
495 | 0 | ListCell *lc; |
496 | | |
497 | | /* |
498 | | * Open, share-lock, and check all the explicitly-specified relations |
499 | | */ |
500 | 0 | foreach(lc, tables) |
501 | 0 | { |
502 | 0 | RangeVar *rv = castNode(RangeVar, lfirst(lc)); |
503 | 0 | bool recurse = rv->inh; |
504 | 0 | Relation rel; |
505 | 0 | Oid myrelid; |
506 | | |
507 | | /* Allow query cancel in case this takes a long time */ |
508 | 0 | CHECK_FOR_INTERRUPTS(); |
509 | |
|
510 | 0 | rel = heap_openrv(rv, ShareUpdateExclusiveLock); |
511 | 0 | myrelid = RelationGetRelid(rel); |
512 | | |
513 | | /* |
514 | | * Filter out duplicates if user specifies "foo, foo". |
515 | | * |
516 | | * Note that this algorithm is known to not be very efficient (O(N^2)) |
517 | | * but given that it only works on list of tables given to us by user |
518 | | * it's deemed acceptable. |
519 | | */ |
520 | 0 | if (list_member_oid(relids, myrelid)) |
521 | 0 | { |
522 | 0 | heap_close(rel, ShareUpdateExclusiveLock); |
523 | 0 | continue; |
524 | 0 | } |
525 | | |
526 | 0 | rels = lappend(rels, rel); |
527 | 0 | relids = lappend_oid(relids, myrelid); |
528 | | |
529 | | /* Add children of this rel, if requested */ |
530 | 0 | if (recurse) |
531 | 0 | { |
532 | 0 | List *children; |
533 | 0 | ListCell *child; |
534 | |
|
535 | 0 | children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock, |
536 | 0 | NULL); |
537 | |
|
538 | 0 | foreach(child, children) |
539 | 0 | { |
540 | 0 | Oid childrelid = lfirst_oid(child); |
541 | | |
542 | | /* Allow query cancel in case this takes a long time */ |
543 | 0 | CHECK_FOR_INTERRUPTS(); |
544 | | |
545 | | /* |
546 | | * Skip duplicates if user specified both parent and child |
547 | | * tables. |
548 | | */ |
549 | 0 | if (list_member_oid(relids, childrelid)) |
550 | 0 | continue; |
551 | | |
552 | | /* find_all_inheritors already got lock */ |
553 | 0 | rel = heap_open(childrelid, NoLock); |
554 | 0 | rels = lappend(rels, rel); |
555 | 0 | relids = lappend_oid(relids, childrelid); |
556 | 0 | } |
557 | 0 | } |
558 | 0 | } |
559 | |
|
560 | 0 | list_free(relids); |
561 | |
|
562 | 0 | return rels; |
563 | 0 | } |
564 | | |
565 | | /* |
566 | | * Close all relations in the list. |
567 | | */ |
568 | | static void |
569 | | CloseTableList(List *rels) |
570 | 0 | { |
571 | 0 | ListCell *lc; |
572 | |
|
573 | 0 | foreach(lc, rels) |
574 | 0 | { |
575 | 0 | Relation rel = (Relation) lfirst(lc); |
576 | |
|
577 | 0 | heap_close(rel, NoLock); |
578 | 0 | } |
579 | 0 | } |
580 | | |
581 | | /* |
582 | | * Add listed tables to the publication. |
583 | | */ |
584 | | static void |
585 | | PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, |
586 | | AlterPublicationStmt *stmt) |
587 | 0 | { |
588 | 0 | ListCell *lc; |
589 | |
|
590 | 0 | Assert(!stmt || !stmt->for_all_tables); |
591 | |
|
592 | 0 | foreach(lc, rels) |
593 | 0 | { |
594 | 0 | Relation rel = (Relation) lfirst(lc); |
595 | 0 | ObjectAddress obj; |
596 | | |
597 | | /* Must be owner of the table or superuser. */ |
598 | 0 | if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) |
599 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), |
600 | 0 | RelationGetRelationName(rel)); |
601 | |
|
602 | 0 | obj = publication_add_relation(pubid, rel, if_not_exists); |
603 | 0 | if (stmt) |
604 | 0 | { |
605 | 0 | EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, |
606 | 0 | (Node *) stmt); |
607 | |
|
608 | 0 | InvokeObjectPostCreateHook(PublicationRelRelationId, |
609 | 0 | obj.objectId, 0); |
610 | 0 | } |
611 | 0 | } |
612 | 0 | } |
613 | | |
614 | | /* |
615 | | * Remove listed tables from the publication. |
616 | | */ |
617 | | static void |
618 | | PublicationDropTables(Oid pubid, List *rels, bool missing_ok) |
619 | 0 | { |
620 | 0 | ObjectAddress obj; |
621 | 0 | ListCell *lc; |
622 | 0 | Oid prid; |
623 | |
|
624 | 0 | foreach(lc, rels) |
625 | 0 | { |
626 | 0 | Relation rel = (Relation) lfirst(lc); |
627 | 0 | Oid relid = RelationGetRelid(rel); |
628 | |
|
629 | 0 | prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), |
630 | 0 | ObjectIdGetDatum(pubid)); |
631 | 0 | if (!OidIsValid(prid)) |
632 | 0 | { |
633 | 0 | if (missing_ok) |
634 | 0 | continue; |
635 | | |
636 | 0 | ereport(ERROR, |
637 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
638 | 0 | errmsg("relation \"%s\" is not part of the publication", |
639 | 0 | RelationGetRelationName(rel)))); |
640 | 0 | } |
641 | |
|
642 | 0 | ObjectAddressSet(obj, PublicationRelRelationId, prid); |
643 | 0 | performDeletion(&obj, DROP_CASCADE, 0); |
644 | 0 | } |
645 | 0 | } |
646 | | |
647 | | /* |
648 | | * Internal workhorse for changing a publication owner |
649 | | */ |
650 | | static void |
651 | | AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) |
652 | 0 | { |
653 | 0 | Form_pg_publication form; |
654 | |
|
655 | 0 | form = (Form_pg_publication) GETSTRUCT(tup); |
656 | |
|
657 | 0 | if (form->pubowner == newOwnerId) |
658 | 0 | return; |
659 | | |
660 | 0 | if (!superuser()) |
661 | 0 | { |
662 | 0 | AclResult aclresult; |
663 | | |
664 | | /* Must be owner */ |
665 | 0 | if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId())) |
666 | 0 | aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION, |
667 | 0 | NameStr(form->pubname)); |
668 | | |
669 | | /* Must be able to become new owner */ |
670 | 0 | check_is_member_of_role(GetUserId(), newOwnerId); |
671 | | |
672 | | /* New owner must have CREATE privilege on database */ |
673 | 0 | aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE); |
674 | 0 | if (aclresult != ACLCHECK_OK) |
675 | 0 | aclcheck_error(aclresult, OBJECT_DATABASE, |
676 | 0 | get_database_name(MyDatabaseId)); |
677 | |
|
678 | 0 | if (form->puballtables && !superuser_arg(newOwnerId)) |
679 | 0 | ereport(ERROR, |
680 | 0 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
681 | 0 | errmsg("permission denied to change owner of publication \"%s\"", |
682 | 0 | NameStr(form->pubname)), |
683 | 0 | errhint("The owner of a FOR ALL TABLES publication must be a superuser."))); |
684 | 0 | } |
685 | |
|
686 | 0 | form->pubowner = newOwnerId; |
687 | 0 | CatalogTupleUpdate(rel, &tup->t_self, tup); |
688 | | |
689 | | /* Update owner dependency reference */ |
690 | 0 | changeDependencyOnOwner(PublicationRelationId, |
691 | 0 | HeapTupleGetOid(tup), |
692 | 0 | newOwnerId); |
693 | |
|
694 | 0 | InvokeObjectPostAlterHook(PublicationRelationId, |
695 | 0 | HeapTupleGetOid(tup), 0); |
696 | 0 | } |
697 | | |
698 | | /* |
699 | | * Change publication owner -- by name |
700 | | */ |
701 | | ObjectAddress |
702 | | AlterPublicationOwner(const char *name, Oid newOwnerId) |
703 | 0 | { |
704 | 0 | Oid subid; |
705 | 0 | HeapTuple tup; |
706 | 0 | Relation rel; |
707 | 0 | ObjectAddress address; |
708 | |
|
709 | 0 | rel = heap_open(PublicationRelationId, RowExclusiveLock); |
710 | |
|
711 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name)); |
712 | |
|
713 | 0 | if (!HeapTupleIsValid(tup)) |
714 | 0 | ereport(ERROR, |
715 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
716 | 0 | errmsg("publication \"%s\" does not exist", name))); |
717 | |
|
718 | 0 | subid = HeapTupleGetOid(tup); |
719 | |
|
720 | 0 | AlterPublicationOwner_internal(rel, tup, newOwnerId); |
721 | |
|
722 | 0 | ObjectAddressSet(address, PublicationRelationId, subid); |
723 | |
|
724 | 0 | heap_freetuple(tup); |
725 | |
|
726 | 0 | heap_close(rel, RowExclusiveLock); |
727 | |
|
728 | 0 | return address; |
729 | 0 | } |
730 | | |
731 | | /* |
732 | | * Change publication owner -- by OID |
733 | | */ |
734 | | void |
735 | | AlterPublicationOwner_oid(Oid subid, Oid newOwnerId) |
736 | 0 | { |
737 | 0 | HeapTuple tup; |
738 | 0 | Relation rel; |
739 | |
|
740 | 0 | rel = heap_open(PublicationRelationId, RowExclusiveLock); |
741 | |
|
742 | 0 | tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid)); |
743 | |
|
744 | 0 | if (!HeapTupleIsValid(tup)) |
745 | 0 | ereport(ERROR, |
746 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
747 | 0 | errmsg("publication with OID %u does not exist", subid))); |
748 | |
|
749 | 0 | AlterPublicationOwner_internal(rel, tup, newOwnerId); |
750 | |
|
751 | 0 | heap_freetuple(tup); |
752 | |
|
753 | 0 | heap_close(rel, RowExclusiveLock); |
754 | 0 | } |