YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/master/catalog_manager.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#ifndef ENT_SRC_YB_MASTER_CATALOG_MANAGER_H
14
#define ENT_SRC_YB_MASTER_CATALOG_MANAGER_H
15
16
#include "../../../../src/yb/master/catalog_manager.h"
17
#include "yb/master/master_snapshot_coordinator.h"
18
#include "yb/master/snapshot_coordinator_context.h"
19
20
namespace yb {
21
22
class UniverseKeyRegistryPB;
23
24
namespace master {
25
namespace enterprise {
26
27
YB_DEFINE_ENUM(CreateObjects, (kOnlyTables)(kOnlyIndexes));
28
29
class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorContext {
30
  typedef yb::master::CatalogManager super;
31
 public:
32
  // Forwards `master` to the base yb::master::CatalogManager constructor and passes `this` to
  // snapshot_coordinator_.
  // NOTE(review): `this` is presumably consumed as the SnapshotCoordinatorContext base of this
  // class - confirm against MasterSnapshotCoordinator's constructor.
  explicit CatalogManager(yb::master::Master* master)
33
8.07k
      : super(master), snapshot_coordinator_(this) {}
34
35
  virtual ~CatalogManager();
36
  void CompleteShutdown();
37
38
  CHECKED_STATUS RunLoaders(int64_t term) override REQUIRES(mutex_);
39
40
  // API to start a snapshot creation.
41
  CHECKED_STATUS CreateSnapshot(const CreateSnapshotRequestPB* req,
42
                                CreateSnapshotResponsePB* resp,
43
                                rpc::RpcContext* rpc);
44
45
  // API to list all available snapshots.
46
  CHECKED_STATUS ListSnapshots(const ListSnapshotsRequestPB* req,
47
                               ListSnapshotsResponsePB* resp);
48
49
  CHECKED_STATUS ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req,
50
                                          ListSnapshotRestorationsResponsePB* resp);
51
52
  // API to restore a snapshot.
53
  CHECKED_STATUS RestoreSnapshot(const RestoreSnapshotRequestPB* req,
54
                                 RestoreSnapshotResponsePB* resp);
55
56
  // API to delete a snapshot.
57
  CHECKED_STATUS DeleteSnapshot(const DeleteSnapshotRequestPB* req,
58
                                DeleteSnapshotResponsePB* resp,
59
                                rpc::RpcContext* rpc);
60
61
  CHECKED_STATUS ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req,
62
                                    ImportSnapshotMetaResponsePB* resp,
63
                                    rpc::RpcContext* rpc);
64
65
  CHECKED_STATUS CreateSnapshotSchedule(const CreateSnapshotScheduleRequestPB* req,
66
                                        CreateSnapshotScheduleResponsePB* resp,
67
                                        rpc::RpcContext* rpc);
68
69
  CHECKED_STATUS ListSnapshotSchedules(const ListSnapshotSchedulesRequestPB* req,
70
                                       ListSnapshotSchedulesResponsePB* resp,
71
                                       rpc::RpcContext* rpc);
72
73
  CHECKED_STATUS DeleteSnapshotSchedule(const DeleteSnapshotScheduleRequestPB* req,
74
                                        DeleteSnapshotScheduleResponsePB* resp,
75
                                        rpc::RpcContext* rpc);
76
77
  CHECKED_STATUS ChangeEncryptionInfo(const ChangeEncryptionInfoRequestPB* req,
78
                                      ChangeEncryptionInfoResponsePB* resp) override;
79
80
  CHECKED_STATUS UpdateXClusterConsumerOnTabletSplit(
81
      const TableId& consumer_table_id, const SplitTabletIds& split_tablet_ids) override;
82
83
  CHECKED_STATUS UpdateXClusterProducerOnTabletSplit(
84
      const TableId& producer_table_id, const SplitTabletIds& split_tablet_ids) override;
85
86
  CHECKED_STATUS InitCDCConsumer(const std::vector<CDCConsumerStreamInfo>& consumer_info,
87
                                 const std::string& master_addrs,
88
                                 const std::string& producer_universe_uuid,
89
                                 std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks);
90
91
  void HandleCreateTabletSnapshotResponse(TabletInfo *tablet, bool error) override;
92
93
  void HandleRestoreTabletSnapshotResponse(TabletInfo *tablet, bool error) override;
94
95
  void HandleDeleteTabletSnapshotResponse(
96
      const SnapshotId& snapshot_id, TabletInfo *tablet, bool error) override;
97
98
  void DumpState(std::ostream* out, bool on_disk_dump = false) const override;
99
100
  // Fills the heartbeat response with the decrypted universe key registry.
101
  CHECKED_STATUS FillHeartbeatResponse(const TSHeartbeatRequestPB* req,
102
                                       TSHeartbeatResponsePB* resp) override;
103
104
  // Is encryption at rest enabled for this cluster.
105
  CHECKED_STATUS IsEncryptionEnabled(const IsEncryptionEnabledRequestPB* req,
106
                                     IsEncryptionEnabledResponsePB* resp);
107
108
  // Create a new CDC stream with the specified attributes.
109
  CHECKED_STATUS CreateCDCStream(const CreateCDCStreamRequestPB* req,
110
                                 CreateCDCStreamResponsePB* resp,
111
                                 rpc::RpcContext* rpc);
112
113
  // Delete the specified CDCStream.
114
  CHECKED_STATUS DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
115
                                 DeleteCDCStreamResponsePB* resp,
116
                                 rpc::RpcContext* rpc);
117
118
  // List CDC streams (optionally, for a given table).
119
  CHECKED_STATUS ListCDCStreams(const ListCDCStreamsRequestPB* req,
120
                                ListCDCStreamsResponsePB* resp) override;
121
122
  // Fetch CDC stream info corresponding to a db stream id.
123
  CHECKED_STATUS GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
124
                                    GetCDCDBStreamInfoResponsePB* resp) override;
125
126
  // Get CDC stream.
127
  CHECKED_STATUS GetCDCStream(const GetCDCStreamRequestPB* req,
128
                              GetCDCStreamResponsePB* resp,
129
                              rpc::RpcContext* rpc);
130
131
  // Update a CDC stream.
132
  CHECKED_STATUS UpdateCDCStream(const UpdateCDCStreamRequestPB* req,
133
                                 UpdateCDCStreamResponsePB* resp,
134
                                 rpc::RpcContext* rpc);
135
136
  // Delete CDC streams for a single table, or for each table in a list of tables.
137
  CHECKED_STATUS DeleteCDCStreamsForTable(const TableId& table_id) override;
138
  CHECKED_STATUS DeleteCDCStreamsForTables(const std::vector<TableId>& table_ids) override;
139
140
  // Setup Universe Replication to consume data from another YB universe.
141
  CHECKED_STATUS SetupUniverseReplication(const SetupUniverseReplicationRequestPB* req,
142
                                          SetupUniverseReplicationResponsePB* resp,
143
                                          rpc::RpcContext* rpc);
144
145
  // Delete Universe Replication.
146
  CHECKED_STATUS DeleteUniverseReplication(const DeleteUniverseReplicationRequestPB* req,
147
                                           DeleteUniverseReplicationResponsePB* resp,
148
                                           rpc::RpcContext* rpc);
149
150
  // Alter Universe Replication.
151
  CHECKED_STATUS AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req,
152
                                          AlterUniverseReplicationResponsePB* resp,
153
                                          rpc::RpcContext* rpc);
154
155
  // Rename an existing Universe Replication.
156
  CHECKED_STATUS RenameUniverseReplication(scoped_refptr<UniverseReplicationInfo> universe,
157
                                           const AlterUniverseReplicationRequestPB* req,
158
                                           AlterUniverseReplicationResponsePB* resp,
159
                                           rpc::RpcContext* rpc);
160
161
  // Enable/Disable an Existing Universe Replication.
162
  CHECKED_STATUS SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req,
163
                                               SetUniverseReplicationEnabledResponsePB* resp,
164
                                               rpc::RpcContext* rpc);
165
166
  // Get Universe Replication.
167
  CHECKED_STATUS GetUniverseReplication(const GetUniverseReplicationRequestPB* req,
168
                                        GetUniverseReplicationResponsePB* resp,
169
                                        rpc::RpcContext* rpc);
170
171
  // Checks if the universe is in an active state or has failed during setup.
172
  CHECKED_STATUS IsSetupUniverseReplicationDone(const IsSetupUniverseReplicationDoneRequestPB* req,
173
                                                IsSetupUniverseReplicationDoneResponsePB* resp,
174
                                                rpc::RpcContext* rpc);
175
176
  // On a producer side split, creates new pollers on the consumer for the new tablet children.
177
  CHECKED_STATUS UpdateConsumerOnProducerSplit(const UpdateConsumerOnProducerSplitRequestPB* req,
178
                                               UpdateConsumerOnProducerSplitResponsePB* resp,
179
                                               rpc::RpcContext* rpc);
180
181
  // Find all the CDC streams that have been marked as DELETING.
182
  CHECKED_STATUS FindCDCStreamsMarkedAsDeleting(std::vector<scoped_refptr<CDCStreamInfo>>* streams);
183
184
  // Delete specified CDC streams.
185
  CHECKED_STATUS CleanUpDeletedCDCStreams(const std::vector<scoped_refptr<CDCStreamInfo>>& streams);
186
187
  bool IsCdcEnabled(const TableInfo& table_info) const override;
188
189
7.94k
  // Accessor for the snapshot_coordinator_ member (declared below as MasterSnapshotCoordinator),
  // returned by reference as a tablet::SnapshotCoordinator.
  tablet::SnapshotCoordinator& snapshot_coordinator() override {
190
7.94k
    return snapshot_coordinator_;
191
7.94k
  }
192
193
  Result<size_t> GetNumLiveTServersForActiveCluster() override;
194
195
 private:
196
  friend class SnapshotLoader;
197
  friend class yb::master::ClusterLoadBalancer;
198
  friend class CDCStreamLoader;
199
  friend class UniverseReplicationLoader;
200
201
  CHECKED_STATUS RestoreEntry(const SysRowEntry& entry, const SnapshotId& snapshot_id)
202
      REQUIRES(mutex_);
203
204
  // Per table structure for external cluster snapshot importing to this cluster.
205
  // Old IDs mean IDs on external cluster, new IDs - IDs on this cluster.
206
  struct ExternalTableSnapshotData {
207
0
    // True when table_entry_pb carries a non-empty indexed_table_id, i.e. the entry is an index.
    bool is_index() const {
208
0
      return !table_entry_pb.indexed_table_id().empty();
209
0
    }
210
211
    // "old_" fields hold IDs from the external cluster; "new_" fields hold IDs on this cluster.
    NamespaceId old_namespace_id;
212
    TableId old_table_id;
213
    TableId new_table_id;
214
    // Table metadata entry; is_index() reads its indexed_table_id field.
    SysTablesEntryPB table_entry_pb;
215
    // NOTE(review): presumably the PostgreSQL schema name of the table - confirm in the .cc file.
    std::string pg_schema_name;
216
    size_t num_tablets = 0;
217
    // NOTE(review): presumably the (start, end) partition key pair - confirm in the .cc file.
    typedef std::pair<std::string, std::string> PartitionKeys;
218
    typedef std::map<PartitionKeys, TabletId> PartitionToIdMap;
219
    typedef std::vector<PartitionPB> Partitions;
220
    Partitions partitions;
221
    // Lookup from a PartitionKeys pair to a tablet ID.
    PartitionToIdMap new_tablets_map;
222
    // Mapping: Old tablet ID -> New tablet ID.
    google::protobuf::RepeatedPtrField<IdPairPB>* tablet_id_map = nullptr;
224
225
    // NOTE(review): raw pointer; ownership is not visible here - presumably points into the
    // ImportSnapshotMeta response. Confirm before storing it beyond the request.
    ImportSnapshotMetaResponsePB_TableMetaPB* table_meta = nullptr;
226
  };
227
228
  // Map: old_namespace_id (key) -> new_namespace_id (value) + db_type.
229
  typedef std::pair<NamespaceId, YQLDatabase> NamespaceData;
230
  typedef std::map<NamespaceId, NamespaceData> NamespaceMap;
231
  typedef std::map<TableId, ExternalTableSnapshotData> ExternalTableSnapshotDataMap;
232
233
  CHECKED_STATUS ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb,
234
                                          ImportSnapshotMetaResponsePB* resp,
235
                                          NamespaceMap* namespace_map,
236
                                          ExternalTableSnapshotDataMap* tables_data);
237
  CHECKED_STATUS ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb,
238
                                            ImportSnapshotMetaResponsePB* resp,
239
                                            NamespaceMap* namespace_map,
240
                                            ExternalTableSnapshotDataMap* tables_data,
241
                                            CreateObjects create_objects);
242
  CHECKED_STATUS ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb,
243
                                             ImportSnapshotMetaResponsePB* resp,
244
                                             ExternalTableSnapshotDataMap* tables_data,
245
                                             CoarseTimePoint deadline);
246
  CHECKED_STATUS ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb,
247
                                              ImportSnapshotMetaResponsePB* resp,
248
                                              ExternalTableSnapshotDataMap* tables_data);
249
  void DeleteNewSnapshotObjects(const NamespaceMap& namespace_map,
250
                                const ExternalTableSnapshotDataMap& tables_data);
251
252
  // Helper function for ImportTableEntry.
253
  Result<bool> CheckTableForImport(
254
      scoped_refptr<TableInfo> table,
255
      ExternalTableSnapshotData* snapshot_data) REQUIRES_SHARED(mutex_);
256
257
  CHECKED_STATUS ImportNamespaceEntry(const SysRowEntry& entry,
258
                                      NamespaceMap* namespace_map);
259
  CHECKED_STATUS RecreateTable(const NamespaceId& new_namespace_id,
260
                               const ExternalTableSnapshotDataMap& table_map,
261
                               ExternalTableSnapshotData* table_data);
262
  CHECKED_STATUS RepartitionTable(scoped_refptr<TableInfo> table,
263
                                  const ExternalTableSnapshotData* table_data);
264
  CHECKED_STATUS ImportTableEntry(const NamespaceMap& namespace_map,
265
                                  const ExternalTableSnapshotDataMap& table_map,
266
                                  ExternalTableSnapshotData* s_data);
267
  CHECKED_STATUS PreprocessTabletEntry(const SysRowEntry& entry,
268
                                       ExternalTableSnapshotDataMap* table_map);
269
  CHECKED_STATUS ImportTabletEntry(const SysRowEntry& entry,
270
                                   ExternalTableSnapshotDataMap* table_map);
271
272
  TabletInfos GetTabletInfos(const std::vector<TabletId>& ids) override;
273
274
  Result<SysRowEntries> CollectEntries(
275
      const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables,
276
      CollectFlags flags);
277
278
  Result<SysRowEntries> CollectEntriesForSnapshot(
279
      const google::protobuf::RepeatedPtrField<TableIdentifierPB>& tables) override;
280
281
  server::Clock* Clock() override;
282
283
  const Schema& schema() override;
284
285
  void Submit(std::unique_ptr<tablet::Operation> operation, int64_t leader_term) override;
286
287
  AsyncTabletSnapshotOpPtr CreateAsyncTabletSnapshotOp(
288
      const TabletInfoPtr& tablet, const std::string& snapshot_id,
289
      tserver::TabletSnapshotOpRequestPB::Operation operation,
290
      TabletSnapshotOperationCallback callback) override;
291
292
  void ScheduleTabletSnapshotOp(const AsyncTabletSnapshotOpPtr& operation) override;
293
294
  CHECKED_STATUS RestoreSysCatalog(
295
      SnapshotScheduleRestoration* restoration, tablet::Tablet* tablet,
296
      Status* complete_status) override;
297
298
  CHECKED_STATUS VerifyRestoredObjects(const SnapshotScheduleRestoration& restoration) override;
299
300
  void CleanupHiddenObjects(const ScheduleMinRestoreTime& schedule_min_restore_time) override;
301
  void CleanupHiddenTablets(
302
      const std::vector<TabletInfoPtr>& hidden_tablets,
303
      const ScheduleMinRestoreTime& schedule_min_restore_time);
304
  // Will filter tables content, so pass it by value here.
305
  void CleanupHiddenTables(std::vector<TableInfoPtr> tables);
306
307
  rpc::Scheduler& Scheduler() override;
308
309
  int64_t LeaderTerm() override;
310
311
  Result<bool> IsTablePartOfSomeSnapshotSchedule(const TableInfo& table_info) override;
312
313
  Result<SnapshotSchedulesToObjectIdsMap> MakeSnapshotSchedulesToObjectIdsMap(
314
      SysRowEntryType type) override;
315
316
  static void SetTabletSnapshotsState(SysSnapshotEntryPB::State state,
317
                                      SysSnapshotEntryPB* snapshot_pb);
318
319
  // Create the cdc_state table if needed (i.e. if it does not exist already).
320
  //
321
  // This is called at the end of CreateCDCStream.
322
  CHECKED_STATUS CreateCdcStateTableIfNeeded(rpc::RpcContext *rpc);
323
324
  // Check if cdc_state table creation is done.
325
  CHECKED_STATUS IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp);
326
327
  // Return all CDC streams.
328
  void GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams);
329
330
  // Mark specified CDC streams as DELETING so they can be removed later.
331
  CHECKED_STATUS MarkCDCStreamsAsDeleting(const std::vector<scoped_refptr<CDCStreamInfo>>& streams);
332
333
  // Find CDC streams for a table.
334
  std::vector<scoped_refptr<CDCStreamInfo>> FindCDCStreamsForTable(const TableId& table_id) const;
335
336
  bool CDCStreamExistsUnlocked(const CDCStreamId& stream_id) override REQUIRES_SHARED(mutex_);
337
338
  CHECKED_STATUS FillHeartbeatResponseEncryption(const SysClusterConfigEntryPB& cluster_config,
339
                                                 const TSHeartbeatRequestPB* req,
340
                                                 TSHeartbeatResponsePB* resp);
341
342
  CHECKED_STATUS FillHeartbeatResponseCDC(const SysClusterConfigEntryPB& cluster_config,
343
                                          const TSHeartbeatRequestPB* req,
344
                                          TSHeartbeatResponsePB* resp);
345
346
  // Helper functions for GetTableSchemaCallback, GetTablegroupSchemaCallback
347
  // and GetColocatedTabletSchemaCallback.
348
349
  // Validates a single table's schema with the corresponding table on the consumer side, and
350
  // updates consumer_table_id with the new table id. Return the consumer table schema if the
351
  // validation is successful.
352
  CHECKED_STATUS ValidateTableSchema(
353
      const std::shared_ptr<client::YBTableInfo>& info,
354
      const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
355
      GetTableSchemaResponsePB* resp);
356
  // Adds a validated table to the sys catalog table map for the given universe, and if all tables
357
  // have been validated, creates a CDC stream for each table.
358
  CHECKED_STATUS AddValidatedTableAndCreateCdcStreams(
359
      scoped_refptr<UniverseReplicationInfo> universe,
360
      const std::unordered_map<TableId, std::string>& table_bootstrap_ids,
361
      const TableId& producer_table,
362
      const TableId& consumer_table);
363
364
  void GetTableSchemaCallback(
365
      const std::string& universe_id, const std::shared_ptr<client::YBTableInfo>& info,
366
      const std::unordered_map<TableId, std::string>& producer_bootstrap_ids, const Status& s);
367
  void GetTablegroupSchemaCallback(
368
      const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& info,
369
      const TablegroupId& producer_tablegroup_id,
370
      const std::unordered_map<TableId, std::string>& producer_bootstrap_ids, const Status& s);
371
  void GetColocatedTabletSchemaCallback(
372
      const std::string& universe_id, const std::shared_ptr<std::vector<client::YBTableInfo>>& info,
373
      const std::unordered_map<TableId, std::string>& producer_bootstrap_ids, const Status& s);
374
  void GetCDCStreamCallback(const CDCStreamId& bootstrap_id,
375
                            std::shared_ptr<TableId> table_id,
376
                            std::shared_ptr<std::unordered_map<std::string, std::string>> options,
377
                            const std::string& universe_id,
378
                            const TableId& table,
379
                            std::shared_ptr<CDCRpcTasks> cdc_rpc,
380
                            const Status& s);
381
  void AddCDCStreamToUniverseAndInitConsumer(const std::string& universe_id, const TableId& table,
382
                                             const Result<CDCStreamId>& stream_id,
383
                                             std::function<void()> on_success_cb = nullptr);
384
385
  void MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> info);
386
  CHECKED_STATUS DeleteUniverseReplicationUnlocked(scoped_refptr<UniverseReplicationInfo> info);
387
  void MarkUniverseReplicationFailed(scoped_refptr<UniverseReplicationInfo> universe,
388
                                     const Status& failure_status);
389
390
  // Checks if table has at least one cdc stream (includes producers for xCluster replication).
391
  bool IsTableCdcProducer(const TableInfo& table_info) const override REQUIRES_SHARED(mutex_);
392
393
  // Checks if the table is a consumer in an xCluster replication universe.
394
  bool IsTableCdcConsumer(const TableInfo& table_info) const REQUIRES_SHARED(mutex_);
395
396
  // Maps producer universe id to the corresponding cdc stream for that table.
397
  typedef std::unordered_map<std::string, CDCStreamId> XClusterConsumerTableStreamInfoMap;
398
399
  // Gets the set of CDC stream info for an xCluster consumer table.
400
  XClusterConsumerTableStreamInfoMap GetXClusterStreamInfoForConsumerTable(const TableId& table_id)
401
      const;
402
403
  CHECKED_STATUS CreateTransactionAwareSnapshot(
404
      const CreateSnapshotRequestPB& req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc);
405
406
  CHECKED_STATUS CreateNonTransactionAwareSnapshot(
407
      const CreateSnapshotRequestPB* req, CreateSnapshotResponsePB* resp, rpc::RpcContext* rpc);
408
409
  CHECKED_STATUS RestoreNonTransactionAwareSnapshot(const SnapshotId& snapshot_id);
410
411
  CHECKED_STATUS DeleteNonTransactionAwareSnapshot(const SnapshotId& snapshot_id);
412
413
  void Started() override;
414
415
  void SysCatalogLoaded(int64_t term) override;
416
417
  // Snapshot map: snapshot-id -> SnapshotInfo.
418
  typedef std::unordered_map<SnapshotId, scoped_refptr<SnapshotInfo>> SnapshotInfoMap;
419
  SnapshotInfoMap non_txn_snapshot_ids_map_;
420
  SnapshotId current_snapshot_id_;
421
422
  // Mutex protecting should_send_universe_key_registry_.
423
  mutable simple_spinlock should_send_universe_key_registry_mutex_;
424
  // Should catalog manager resend latest universe key registry to tserver.
425
  std::unordered_map<TabletServerId, bool> should_send_universe_key_registry_
426
  GUARDED_BY(should_send_universe_key_registry_mutex_);
427
428
  // CDC Stream map: CDCStreamId -> CDCStreamInfo.
429
  typedef std::unordered_map<CDCStreamId, scoped_refptr<CDCStreamInfo>> CDCStreamInfoMap;
430
  CDCStreamInfoMap cdc_stream_map_ GUARDED_BY(mutex_);
431
432
  // Map of tables -> number of cdc streams they are producers for.
433
  std::unordered_map<TableId, int> cdc_stream_tables_count_map_ GUARDED_BY(mutex_);
434
435
  // Map of all consumer tables that are part of xcluster replication, to a map of the stream infos.
436
  std::unordered_map<TableId, XClusterConsumerTableStreamInfoMap>
437
      xcluster_consumer_tables_to_stream_map_ GUARDED_BY(mutex_);
438
439
  typedef std::unordered_map<std::string, scoped_refptr<UniverseReplicationInfo>>
440
      UniverseReplicationInfoMap;
441
  UniverseReplicationInfoMap universe_replication_map_ GUARDED_BY(mutex_);
442
443
  // Mutex protecting should_send_consumer_registry_.
444
  mutable simple_spinlock should_send_consumer_registry_mutex_;
445
  // Should catalog manager resend latest consumer registry to tserver.
446
  std::unordered_map<TabletServerId, bool> should_send_consumer_registry_
447
  GUARDED_BY(should_send_consumer_registry_mutex_);
448
449
  MasterSnapshotCoordinator snapshot_coordinator_;
450
451
  DISALLOW_COPY_AND_ASSIGN(CatalogManager);
452
};
453
454
} // namespace enterprise
455
} // namespace master
456
} // namespace yb
457
458
#endif // ENT_SRC_YB_MASTER_CATALOG_MANAGER_H