YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/sys_catalog-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
34
#include <memory>
35
#include <vector>
36
37
#include "yb/common/schema.h"
38
#include "yb/common/wire_protocol.h"
39
40
#include "yb/gutil/stl_util.h"
41
42
#include "yb/master/async_rpc_tasks.h"
43
#include "yb/master/catalog_manager.h"
44
#include "yb/master/master_cluster.pb.h"
45
#include "yb/master/sys_catalog-test_base.h"
46
#include "yb/master/sys_catalog.h"
47
48
#include "yb/util/net/sockaddr.h"
49
#include "yb/util/status.h"
50
51
using std::make_shared;
52
using std::string;
53
using std::shared_ptr;
54
using std::unique_ptr;
55
using yb::rpc::Messenger;
56
using yb::rpc::MessengerBuilder;
57
using yb::rpc::RpcController;
58
59
DECLARE_string(cluster_uuid);
60
61
namespace yb {
62
namespace master {
63
64
// Sys-catalog visitor used by the tests in this file: every table entry passed to Visit() is
// materialized as a TableInfo and stored in `tables`, keyed by table id.
//
// Each stored TableInfo carries one reference added manually in Visit(); Reset() (also invoked
// by the destructor) releases those references again.
class TestTableLoader : public Visitor<PersistentTableInfo> {
 public:
  TestTableLoader() {}
  ~TestTableLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestTableLoader(const TestTableLoader&) = delete;
  TestTableLoader& operator=(const TestTableLoader&) = delete;

  // Releases the reference held on every collected table and empties `tables`.
  void Reset() {
    for (const auto& id_and_table : tables) {
      id_and_table.second->Release();
    }
    tables.clear();
  }

  // Creates a TableInfo for `table_id`, copies `metadata` into it under a write lock, commits,
  // and records the object in `tables`. Always returns Status::OK().
  Status Visit(const std::string& table_id, const SysTablesEntryPB& metadata) override {
    TableInfo* const info = new TableInfo(table_id);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive for as long as it sits in `tables`; paired with Release() in Reset().
    info->AddRef();
    tables[info->id()] = info;
    return Status::OK();
  }

  // Collected tables, keyed by table id. Entries are owned via the manual reference above.
  std::map<std::string, TableInfo*> tables;
};
89
90
1
// Covers the handling of the cluster_uuid flag:
//  - an invalid flag value makes PrepareDefaultClusterConfig() fail,
//  - a valid flag value ends up in the cluster config of a newly started master,
//  - an empty flag value results in a non-empty, parseable uuid in the cluster config.
TEST_F(SysCatalogTest, TestPrepareDefaultClusterConfig) {
  // Case 1: invalid uuid in the flag.
  FLAGS_cluster_uuid = "invalid_uuid";

  enterprise::CatalogManager catalog_manager(nullptr);
  {
    // PrepareDefaultClusterConfig() is invoked with the catalog manager mutex held.
    CatalogManager::LockGuard guard(catalog_manager.mutex_);
    ASSERT_NOK(catalog_manager.PrepareDefaultClusterConfig(0));
  }

  // Case 2: config.cluster_uuid gets set to the value that we specify through flag cluster_uuid.
  auto master_dir = GetTestPath("Master") + "valid_cluster_uuid_test";
  ASSERT_OK(Env::Default()->CreateDir(master_dir));
  std::unique_ptr<MiniMaster> mini_master = std::make_unique<MiniMaster>(
      Env::Default(), master_dir, AllocateFreePort(), AllocateFreePort(), 0);

  FLAGS_cluster_uuid = Uuid::Generate().ToString();
  ASSERT_OK(mini_master->Start());
  auto master = mini_master->master();
  ASSERT_OK(master->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

  SysClusterConfigEntryPB cluster_config;
  ASSERT_OK(master->catalog_manager()->GetClusterConfig(&cluster_config));

  // The uuid from the flag must be the one stored in the config.
  ASSERT_EQ(FLAGS_cluster_uuid, cluster_config.cluster_uuid());

  mini_master->Shutdown();

  // Case 3: config.cluster_uuid gets set to a valid uuid when cluster_uuid flag is empty.
  master_dir = GetTestPath("Master") + "empty_cluster_uuid_test";
  ASSERT_OK(Env::Default()->CreateDir(master_dir));
  mini_master = std::make_unique<MiniMaster>(
      Env::Default(), master_dir, AllocateFreePort(), AllocateFreePort(), 0);
  FLAGS_cluster_uuid = "";
  ASSERT_OK(mini_master->Start());
  master = mini_master->master();
  ASSERT_OK(master->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

  ASSERT_OK(master->catalog_manager()->GetClusterConfig(&cluster_config));

  // A uuid must have been filled in ...
  ASSERT_FALSE(cluster_config.cluster_uuid().empty());

  // ... and it must parse as a uuid.
  ASSERT_OK(Uuid::FromString(cluster_config.cluster_uuid()));

  mini_master->Shutdown();
}
140
141
// Test the sys-catalog tables basic operations (add, update, delete,
142
// visit)
143
1
TEST_F(SysCatalogTest, TestSysCatalogTablesOperations) {
144
1
  SysCatalogTable* sys_catalog = master_->catalog_manager()->sys_catalog();
145
146
1
  unique_ptr<TestTableLoader> loader(new TestTableLoader());
147
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
148
1
  ASSERT_EQ(kNumSystemTables, loader->tables.size());
149
150
  // Create new table.
151
1
  const std::string table_id = "abc";
152
1
  scoped_refptr<TableInfo> table = master_->catalog_manager()->NewTableInfo(table_id);
153
1
  {
154
1
    auto l = table->LockForWrite();
155
1
    l.mutable_data()->pb.set_name("testtb");
156
1
    l.mutable_data()->pb.set_version(0);
157
1
    l.mutable_data()->pb.mutable_replication_info()->mutable_live_replicas()->set_num_replicas(1);
158
1
    l.mutable_data()->pb.set_state(SysTablesEntryPB::PREPARING);
159
1
    SchemaToPB(Schema(), l.mutable_data()->pb.mutable_schema());
160
    // Add the table
161
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, table));
162
163
1
    l.Commit();
164
1
  }
165
166
  // Verify it showed up.
167
1
  loader->Reset();
168
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
169
170
1
  ASSERT_EQ(1 + kNumSystemTables, loader->tables.size());
171
1
  ASSERT_METADATA_EQ(table.get(), loader->tables[table_id]);
172
173
  // Update the table
174
1
  {
175
1
    auto l = table->LockForWrite();
176
1
    l.mutable_data()->pb.set_version(1);
177
1
    l.mutable_data()->pb.set_state(SysTablesEntryPB::DELETING);
178
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, table));
179
1
    l.Commit();
180
1
  }
181
182
1
  loader->Reset();
183
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
184
1
  ASSERT_EQ(1 + kNumSystemTables, loader->tables.size());
185
1
  ASSERT_METADATA_EQ(table.get(), loader->tables[table_id]);
186
187
  // Delete the table
188
1
  loader->Reset();
189
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, table));
190
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
191
1
  ASSERT_EQ(kNumSystemTables, loader->tables.size());
192
1
}
193
194
// Verify that data mutations are not available from metadata() until commit.
195
1
TEST_F(SysCatalogTest, TestTableInfoCommit) {
196
1
  scoped_refptr<TableInfo> table(master_->catalog_manager()->NewTableInfo("123"));
197
198
  // Mutate the table, under the write lock.
199
1
  auto writer_lock = table->LockForWrite();
200
1
  writer_lock.mutable_data()->pb.set_name("foo");
201
202
  // Changes should not be visible to a reader.
203
  // The reader can still lock for read, since readers don't block
204
  // writers in the RWC lock.
205
1
  {
206
1
    auto reader_lock = table->LockForRead();
207
1
    ASSERT_NE("foo", reader_lock->name());
208
1
  }
209
1
  writer_lock.mutable_data()->set_state(SysTablesEntryPB::RUNNING, "running");
210
211
212
1
  {
213
1
    auto reader_lock = table->LockForRead();
214
1
    ASSERT_NE("foo", reader_lock->pb.name());
215
1
    ASSERT_NE("running", reader_lock->pb.state_msg());
216
1
    ASSERT_NE(SysTablesEntryPB::RUNNING, reader_lock->pb.state());
217
1
  }
218
219
  // Commit the changes
220
1
  writer_lock.Commit();
221
222
  // Verify that the data is visible
223
1
  {
224
1
    auto reader_lock = table->LockForRead();
225
1
    ASSERT_EQ("foo", reader_lock->pb.name());
226
1
    ASSERT_EQ("running", reader_lock->pb.state_msg());
227
1
    ASSERT_EQ(SysTablesEntryPB::RUNNING, reader_lock->pb.state());
228
1
  }
229
1
}
230
231
// Sys-catalog visitor used by the tests in this file: every tablet entry passed to Visit() is
// materialized as a TabletInfo (with no owning table) and stored in `tablets`, keyed by
// tablet id.
//
// Each stored TabletInfo carries one reference added manually in Visit(); Reset() (also invoked
// by the destructor) releases those references again.
class TestTabletLoader : public Visitor<PersistentTabletInfo> {
 public:
  TestTabletLoader() {}
  ~TestTabletLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestTabletLoader(const TestTabletLoader&) = delete;
  TestTabletLoader& operator=(const TestTabletLoader&) = delete;

  // Releases the reference held on every collected tablet and empties `tablets`.
  void Reset() {
    for (const auto& id_and_tablet : tablets) {
      id_and_tablet.second->Release();
    }
    tablets.clear();
  }

  // Creates a TabletInfo for `tablet_id`, copies `metadata` into it under a write lock, commits,
  // and records the object in `tablets`. Always returns Status::OK().
  Status Visit(const std::string& tablet_id, const SysTabletsEntryPB& metadata) override {
    // The table pointer is left null: only the tablet metadata is needed here.
    TabletInfo* const info = new TabletInfo(nullptr, tablet_id);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive for as long as it sits in `tablets`; paired with Release() in Reset().
    info->AddRef();
    tablets[info->id()] = info;
    return Status::OK();
  }

  // Collected tablets, keyed by tablet id. Entries are owned via the manual reference above.
  std::map<std::string, TabletInfo*> tablets;
};
256
257
// Create a new TabletInfo. The object is in uncommitted
258
// state.
259
static TabletInfo *CreateTablet(TableInfo *table,
260
                                const string& tablet_id,
261
                                const string& start_key,
262
3
                                const string& end_key) {
263
3
  TabletInfo *tablet = new TabletInfo(table, tablet_id);
264
3
  tablet->mutable_metadata()->StartMutation();
265
3
  auto* metadata = &tablet->mutable_metadata()->mutable_dirty()->pb;
266
3
  metadata->set_state(SysTabletsEntryPB::PREPARING);
267
3
  metadata->mutable_partition()->set_partition_key_start(start_key);
268
3
  metadata->mutable_partition()->set_partition_key_end(end_key);
269
3
  metadata->set_table_id(table->id());
270
3
  return tablet;
271
3
}
272
273
// Test the sys-catalog tablets basic operations (add, update, delete,
274
// visit)
275
1
TEST_F(SysCatalogTest, TestSysCatalogTabletsOperations) {
276
1
  scoped_refptr<TableInfo> table(master_->catalog_manager()->NewTableInfo("abc"));
277
  // This leaves all three in StartMutation.
278
1
  scoped_refptr<TabletInfo> tablet1(CreateTablet(table.get(), "123", "a", "b"));
279
1
  scoped_refptr<TabletInfo> tablet2(CreateTablet(table.get(), "456", "b", "c"));
280
1
  scoped_refptr<TabletInfo> tablet3(CreateTablet(table.get(), "789", "c", "d"));
281
282
1
  SysCatalogTable* sys_catalog = master_->catalog_manager()->sys_catalog();
283
284
1
  unique_ptr<TestTabletLoader> loader(new TestTabletLoader());
285
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
286
1
  ASSERT_EQ(kNumSystemTables, loader->tablets.size());
287
288
  // Add tablet1 and tablet2
289
1
  {
290
1
    std::vector<TabletInfo*> tablets;
291
1
    tablets.push_back(tablet1.get());
292
1
    tablets.push_back(tablet2.get());
293
294
1
    loader->Reset();
295
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, tablets));
296
1
    tablet1->mutable_metadata()->CommitMutation();
297
1
    tablet2->mutable_metadata()->CommitMutation();
298
299
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
300
1
    ASSERT_EQ(2 + kNumSystemTables, loader->tablets.size());
301
1
    ASSERT_METADATA_EQ(tablet1.get(), loader->tablets[tablet1->id()]);
302
1
    ASSERT_METADATA_EQ(tablet2.get(), loader->tablets[tablet2->id()]);
303
1
  }
304
305
  // Update tablet1
306
1
  {
307
1
    std::vector<TabletInfo*> tablets;
308
1
    tablets.push_back(tablet1.get());
309
310
1
    auto l1 = tablet1->LockForWrite();
311
1
    l1.mutable_data()->pb.set_state(SysTabletsEntryPB::RUNNING);
312
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, tablets));
313
1
    l1.Commit();
314
315
1
    loader->Reset();
316
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
317
1
    ASSERT_EQ(2 + kNumSystemTables, loader->tablets.size());
318
1
    ASSERT_METADATA_EQ(tablet1.get(), loader->tablets[tablet1->id()]);
319
1
    ASSERT_METADATA_EQ(tablet2.get(), loader->tablets[tablet2->id()]);
320
1
  }
321
322
  // Add tablet3 and Update tablet1 and tablet2
323
1
  {
324
1
    std::vector<TabletInfo *> to_add;
325
1
    std::vector<TabletInfo *> to_update;
326
327
1
    to_add.push_back(tablet3.get());
328
1
    to_update.push_back(tablet1.get());
329
1
    to_update.push_back(tablet2.get());
330
331
1
    auto l1 = tablet1->LockForWrite();
332
1
    l1.mutable_data()->pb.set_state(SysTabletsEntryPB::REPLACED);
333
1
    auto l2 = tablet2->LockForWrite();
334
1
    l2.mutable_data()->pb.set_state(SysTabletsEntryPB::RUNNING);
335
336
1
    loader->Reset();
337
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, to_add, to_update));
338
339
1
    l1.Commit();
340
1
    l2.Commit();
341
    // This was still open from the initial create!
342
1
    tablet3->mutable_metadata()->CommitMutation();
343
344
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
345
1
    ASSERT_EQ(3 + kNumSystemTables, loader->tablets.size());
346
1
    ASSERT_METADATA_EQ(tablet1.get(), loader->tablets[tablet1->id()]);
347
1
    ASSERT_METADATA_EQ(tablet2.get(), loader->tablets[tablet2->id()]);
348
1
    ASSERT_METADATA_EQ(tablet3.get(), loader->tablets[tablet3->id()]);
349
1
  }
350
351
  // Delete tablet1 and tablet3 tablets
352
1
  {
353
1
    std::vector<TabletInfo*> tablets;
354
1
    tablets.push_back(tablet1.get());
355
1
    tablets.push_back(tablet3.get());
356
357
1
    loader->Reset();
358
1
    ASSERT_OK(sys_catalog->Delete(kLeaderTerm, tablets));
359
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
360
1
    ASSERT_EQ(1 + kNumSystemTables, loader->tablets.size());
361
1
    ASSERT_METADATA_EQ(tablet2.get(), loader->tablets[tablet2->id()]);
362
1
  }
363
1
}
364
365
// Verify that data mutations are not available from metadata() until commit.
366
1
TEST_F(SysCatalogTest, TestTabletInfoCommit) {
367
1
  scoped_refptr<TabletInfo> tablet(new TabletInfo(nullptr, "123"));
368
369
  // Mutate the tablet, the changes should not be visible
370
1
  auto l = tablet->LockForWrite();
371
1
  PartitionPB* partition = l.mutable_data()->pb.mutable_partition();
372
1
  partition->set_partition_key_start("a");
373
1
  partition->set_partition_key_end("b");
374
1
  l.mutable_data()->set_state(SysTabletsEntryPB::RUNNING, "running");
375
1
  {
376
    // Changes shouldn't be visible, and lock should still be
377
    // acquired even though the mutation is under way.
378
1
    auto read_lock = tablet->LockForRead();
379
1
    ASSERT_NE("a", read_lock->pb.partition().partition_key_start());
380
1
    ASSERT_NE("b", read_lock->pb.partition().partition_key_end());
381
1
    ASSERT_NE("running", read_lock->pb.state_msg());
382
1
    ASSERT_NE(SysTabletsEntryPB::RUNNING,
383
1
              read_lock->pb.state());
384
1
  }
385
386
  // Commit the changes
387
1
  l.Commit();
388
389
  // Verify that the data is visible
390
1
  {
391
1
    auto read_lock = tablet->LockForRead();
392
1
    ASSERT_EQ("a", read_lock->pb.partition().partition_key_start());
393
1
    ASSERT_EQ("b", read_lock->pb.partition().partition_key_end());
394
1
    ASSERT_EQ("running", read_lock->pb.state_msg());
395
1
    ASSERT_EQ(SysTabletsEntryPB::RUNNING,
396
1
              read_lock->pb.state());
397
1
  }
398
1
}
399
400
// Sys-catalog visitor used by the tests in this file: it keeps the single cluster config entry
// it is shown in `config_info`.
//
// The stored ClusterConfigInfo carries one reference added manually in Visit(); Reset() (also
// invoked by the destructor) releases it. Reset() must be called before the loader is used for
// another visit, otherwise the CHECK in Visit() fires.
class TestClusterConfigLoader : public Visitor<PersistentClusterConfigInfo> {
 public:
  TestClusterConfigLoader() {}
  ~TestClusterConfigLoader() { Reset(); }

  // The loader tracks a manually counted reference through a raw pointer. A copy would
  // Release() the very same object a second time, so copying is disabled.
  TestClusterConfigLoader(const TestClusterConfigLoader&) = delete;
  TestClusterConfigLoader& operator=(const TestClusterConfigLoader&) = delete;

  // Creates a ClusterConfigInfo, copies `metadata` into it under a write lock, commits, and
  // stores it in `config_info`. `fake_id` is not used. Crashes (CHECK) if an entry is already
  // held. Always returns Status::OK().
  Status Visit(const std::string& fake_id, const SysClusterConfigEntryPB& metadata) override {
    CHECK(!config_info) << "We either got multiple config_info entries, or we didn't Reset()";
    config_info = new ClusterConfigInfo();
    auto write_lock = config_info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive while it is held here; paired with Release() in Reset().
    config_info->AddRef();
    return Status::OK();
  }

  // Releases the held entry, if any, and clears `config_info`.
  void Reset() {
    if (config_info) {
      config_info->Release();
      config_info = nullptr;
    }
  }

  // The entry seen by the last visit, or nullptr if none was seen since the last Reset().
  ClusterConfigInfo* config_info = nullptr;
};
425
426
// Test the sys-catalog tables basic operations (add, update, delete, visit)
427
1
TEST_F(SysCatalogTest, TestSysCatalogPlacementOperations) {
428
1
  SysCatalogTable* sys_catalog = master_->catalog_manager()->sys_catalog();
429
430
1
  unique_ptr<TestClusterConfigLoader> loader(new TestClusterConfigLoader());
431
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
432
1
  ASSERT_TRUE(loader->config_info);
433
1
  {
434
1
    auto l = loader->config_info->LockForRead();
435
1
    ASSERT_EQ(l->pb.version(), 0);
436
1
    ASSERT_EQ(l->pb.replication_info().live_replicas().placement_blocks_size(), 0);
437
1
  }
438
439
  // Test modifications directly through the Sys catalog API.
440
441
  // Create a config_info block.
442
1
  scoped_refptr<ClusterConfigInfo> config_info(new ClusterConfigInfo());
443
1
  {
444
1
    auto l = config_info->LockForWrite();
445
1
    auto pb = l.mutable_data()
446
1
                  ->pb.mutable_replication_info()
447
1
                  ->mutable_live_replicas()
448
1
                  ->add_placement_blocks();
449
1
    auto cloud_info = pb->mutable_cloud_info();
450
1
    cloud_info->set_placement_cloud("cloud");
451
1
    cloud_info->set_placement_region("region");
452
1
    cloud_info->set_placement_zone("zone");
453
1
    pb->set_min_num_replicas(100);
454
455
    // Set it in the sys_catalog. It already has the default entry, so we use update.
456
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, config_info));
457
1
    l.Commit();
458
1
  }
459
460
  // Check data from sys_catalog.
461
1
  loader->Reset();
462
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
463
1
  ASSERT_TRUE(loader->config_info);
464
1
  ASSERT_METADATA_EQ(config_info.get(), loader->config_info);
465
466
1
  {
467
1
    auto l = config_info->LockForWrite();
468
1
    auto pb = l.mutable_data()
469
1
                  ->pb.mutable_replication_info()
470
1
                  ->mutable_live_replicas()
471
1
                  ->mutable_placement_blocks(0);
472
1
    auto cloud_info = pb->mutable_cloud_info();
473
    // Update some config_info info.
474
1
    cloud_info->set_placement_cloud("cloud2");
475
1
    pb->set_min_num_replicas(200);
476
    // Update it in the sys_catalog.
477
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, config_info));
478
1
    l.Commit();
479
1
  }
480
481
  // Check data from sys_catalog.
482
1
  loader->Reset();
483
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
484
1
  ASSERT_TRUE(loader->config_info);
485
1
  ASSERT_METADATA_EQ(config_info.get(), loader->config_info);
486
487
  // Test data through the CatalogManager API.
488
489
  // Get the config from the CatalogManager and test it is the default, as we didn't use the
490
  // CatalogManager to update it.
491
1
  SysClusterConfigEntryPB config;
492
1
  ASSERT_OK(master_->catalog_manager()->GetClusterConfig(&config));
493
1
  ASSERT_EQ(config.version(), 0);
494
1
  ASSERT_EQ(config.replication_info().live_replicas().placement_blocks_size(), 0);
495
496
  // Update a field in the previously used in memory state and set through proper API.
497
1
  {
498
1
    auto l = config_info->LockForWrite();
499
1
    auto pb = l.mutable_data()
500
1
                  ->pb.mutable_replication_info()
501
1
                  ->mutable_live_replicas()
502
1
                  ->mutable_placement_blocks(0);
503
1
    pb->set_min_num_replicas(300);
504
505
1
    ChangeMasterClusterConfigRequestPB req;
506
1
    *req.mutable_cluster_config() = l.mutable_data()->pb;
507
1
    ChangeMasterClusterConfigResponsePB resp;
508
509
    // Verify that we receive an error when trying to change the cluster uuid.
510
1
    req.mutable_cluster_config()->set_cluster_uuid("some-cluster-uuid");
511
1
    auto status = master_->catalog_manager()->SetClusterConfig(&req, &resp);
512
1
    ASSERT_TRUE(status.IsInvalidArgument());
513
514
    // Setting the cluster uuid should make the request succeed.
515
1
    req.mutable_cluster_config()->set_cluster_uuid(config.cluster_uuid());
516
517
1
    ASSERT_OK(master_->catalog_manager()->SetClusterConfig(&req, &resp));
518
1
    l.Commit();
519
1
  }
520
521
  // Confirm the in memory state does not match the config we get from the CatalogManager API, due
522
  // to version mismatch.
523
1
  ASSERT_OK(master_->catalog_manager()->GetClusterConfig(&config));
524
1
  {
525
1
    auto l = config_info->LockForRead();
526
1
    ASSERT_FALSE(PbEquals(l->pb, config));
527
1
    ASSERT_EQ(l->pb.version(), 0);
528
1
    ASSERT_EQ(config.version(), 1);
529
1
    ASSERT_TRUE(PbEquals(
530
1
        l->pb.replication_info().live_replicas().placement_blocks(0),
531
1
        config.replication_info().live_replicas().placement_blocks(0)));
532
1
  }
533
534
  // Reload the data again and check that it matches expectations.
535
1
  loader->Reset();
536
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
537
1
  ASSERT_TRUE(loader->config_info);
538
1
  {
539
1
    auto l = loader->config_info->LockForRead();
540
1
    ASSERT_TRUE(PbEquals(l->pb, config));
541
1
    ASSERT_TRUE(l->pb.has_version());
542
1
    ASSERT_EQ(l->pb.version(), 1);
543
1
    ASSERT_EQ(l->pb.replication_info().live_replicas().placement_blocks_size(), 1);
544
1
  }
545
1
}
546
547
// Sys-catalog visitor used by the tests in this file: every namespace entry passed to Visit()
// is materialized as a NamespaceInfo and appended to `namespaces`, in visit order.
//
// Each stored NamespaceInfo carries one reference added manually in Visit(); Reset() (also
// invoked by the destructor) releases those references again.
class TestNamespaceLoader : public Visitor<PersistentNamespaceInfo> {
 public:
  TestNamespaceLoader() {}
  ~TestNamespaceLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestNamespaceLoader(const TestNamespaceLoader&) = delete;
  TestNamespaceLoader& operator=(const TestNamespaceLoader&) = delete;

  // Releases the reference held on every collected namespace and empties `namespaces`.
  void Reset() {
    for (NamespaceInfo* held : namespaces) {
      held->Release();
    }
    namespaces.clear();
  }

  // Creates a NamespaceInfo for `ns_id`, copies `metadata` into it under a write lock, commits,
  // and appends the object to `namespaces`. Always returns Status::OK().
  Status Visit(const std::string& ns_id, const SysNamespaceEntryPB& metadata) override {
    NamespaceInfo* const info = new NamespaceInfo(ns_id);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive while it sits in `namespaces`; paired with Release() in Reset().
    info->AddRef();
    namespaces.push_back(info);
    return Status::OK();
  }

  // Collected namespaces in the order they were visited.
  std::vector<NamespaceInfo*> namespaces;
};
572
573
// Test the sys-catalog namespaces basic operations (add, update, delete, visit)
574
1
TEST_F(SysCatalogTest, TestSysCatalogNamespacesOperations) {
575
1
  SysCatalogTable* const sys_catalog = master_->catalog_manager()->sys_catalog();
576
577
  // 1. CHECK SYSTEM NAMESPACE COUNT
578
1
  unique_ptr<TestNamespaceLoader> loader(new TestNamespaceLoader());
579
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
580
581
1
  ASSERT_EQ(kNumSystemNamespaces, loader->namespaces.size());
582
583
  // 2. CHECK ADD_NAMESPACE
584
  // Create new namespace.
585
1
  scoped_refptr<NamespaceInfo> ns(new NamespaceInfo("deadbeafdeadbeafdeadbeafdeadbeaf"));
586
1
  {
587
1
    auto l = ns->LockForWrite();
588
1
    l.mutable_data()->pb.set_name("test_ns");
589
    // Add the namespace
590
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, ns));
591
592
1
    l.Commit();
593
1
  }
594
595
  // Verify it showed up.
596
1
  loader->Reset();
597
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
598
1
  ASSERT_EQ(1 + kNumSystemNamespaces, loader->namespaces.size());
599
1
  ASSERT_METADATA_EQ(ns.get(), loader->namespaces[loader->namespaces.size() - 1]);
600
601
  // 3. CHECK UPDATE_NAMESPACE
602
  // Update the namespace
603
1
  {
604
1
    auto l = ns->LockForWrite();
605
1
    l.mutable_data()->pb.set_name("test_ns_new_name");
606
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, ns));
607
1
    l.Commit();
608
1
  }
609
610
  // Verify it showed up.
611
1
  loader->Reset();
612
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
613
1
  ASSERT_EQ(1 + kNumSystemNamespaces, loader->namespaces.size());
614
1
  ASSERT_METADATA_EQ(ns.get(), loader->namespaces[loader->namespaces.size() - 1]);
615
616
  // 4. CHECK DELETE_NAMESPACE
617
  // Delete the namespace
618
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, ns));
619
620
  // Verify the result.
621
1
  loader->Reset();
622
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
623
1
  ASSERT_EQ(kNumSystemNamespaces, loader->namespaces.size());
624
1
}
625
626
// Verify that data mutations are not available from metadata() until commit.
627
1
TEST_F(SysCatalogTest, TestNamespaceInfoCommit) {
628
1
  scoped_refptr<NamespaceInfo> ns(new NamespaceInfo("deadbeafdeadbeafdeadbeafdeadbeaf"));
629
630
  // Mutate the namespace, under the write lock.
631
1
  auto writer_lock = ns->LockForWrite();
632
1
  writer_lock.mutable_data()->pb.set_name("foo");
633
634
  // Changes should not be visible to a reader.
635
  // The reader can still lock for read, since readers don't block
636
  // writers in the RWC lock.
637
1
  {
638
1
    auto reader_lock = ns->LockForRead();
639
1
    ASSERT_NE("foo", reader_lock->name());
640
1
  }
641
642
  // Commit the changes
643
1
  writer_lock.Commit();
644
645
  // Verify that the data is visible
646
1
  {
647
1
    auto reader_lock = ns->LockForRead();
648
1
    ASSERT_EQ("foo", reader_lock->name());
649
1
  }
650
1
}
651
652
// Sys-catalog visitor used by the tests in this file: every user-defined type entry passed to
// Visit() is materialized as a UDTypeInfo and appended to `udtypes`, in visit order.
//
// Each stored UDTypeInfo carries one reference added manually in Visit(); Reset() (also invoked
// by the destructor) releases those references again.
class TestUDTypeLoader : public Visitor<PersistentUDTypeInfo> {
 public:
  TestUDTypeLoader() {}
  ~TestUDTypeLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestUDTypeLoader(const TestUDTypeLoader&) = delete;
  TestUDTypeLoader& operator=(const TestUDTypeLoader&) = delete;

  // Releases the reference held on every collected type and empties `udtypes`.
  void Reset() {
    for (UDTypeInfo* held : udtypes) {
      held->Release();
    }
    udtypes.clear();
  }

  // Creates a UDTypeInfo for `udtype_id`, copies `metadata` into it under a write lock, commits,
  // and appends the object to `udtypes`. Always returns Status::OK().
  Status Visit(const std::string& udtype_id, const SysUDTypeEntryPB& metadata) override {
    UDTypeInfo* const info = new UDTypeInfo(udtype_id);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive while it sits in `udtypes`; paired with Release() in Reset().
    info->AddRef();
    udtypes.push_back(info);
    return Status::OK();
  }

  // Collected user-defined types in the order they were visited.
  std::vector<UDTypeInfo*> udtypes;
};
677
678
// Sys-catalog visitor used by the tests in this file: every redis config entry passed to
// Visit() is materialized as a RedisConfigInfo and appended to `config_entries`, in visit order.
//
// Each stored RedisConfigInfo carries one reference added manually in Visit(); Reset() (also
// invoked by the destructor) releases those references again.
class TestRedisConfigLoader : public Visitor<PersistentRedisConfigInfo> {
 public:
  TestRedisConfigLoader() {}
  ~TestRedisConfigLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestRedisConfigLoader(const TestRedisConfigLoader&) = delete;
  TestRedisConfigLoader& operator=(const TestRedisConfigLoader&) = delete;

  // Releases the reference held on every collected entry and empties `config_entries`.
  void Reset() {
    for (RedisConfigInfo* held : config_entries) {
      held->Release();
    }
    config_entries.clear();
  }

  // Creates a RedisConfigInfo for `key`, copies `metadata` into it under a write lock, commits,
  // and appends the object to `config_entries`. Always returns Status::OK().
  Status Visit(const std::string& key, const SysRedisConfigEntryPB& metadata) override {
    RedisConfigInfo* const info = new RedisConfigInfo(key);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive while it sits in `config_entries`; paired with Release() in Reset().
    info->AddRef();
    config_entries.push_back(info);
    return Status::OK();
  }

  // Collected redis config entries in the order they were visited.
  std::vector<RedisConfigInfo*> config_entries;
};
703
704
// Test the sys-catalog redis config basic operations (add, visit, drop)
705
1
TEST_F(SysCatalogTest, TestSysCatalogRedisConfigOperations) {
706
1
  SysCatalogTable* const sys_catalog = master_->catalog_manager()->sys_catalog();
707
708
1
  unique_ptr<TestRedisConfigLoader> loader(new TestRedisConfigLoader());
709
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
710
711
  // Set redis config information
712
1
  SysRedisConfigEntryPB config_entry;
713
1
  config_entry.set_key("key1");
714
1
  config_entry.add_args("value1.1");
715
1
  config_entry.add_args("value1.2");
716
717
  // 1. CHECK Add entry
718
1
  scoped_refptr<RedisConfigInfo> rci = new RedisConfigInfo("key1");
719
1
  {
720
1
    auto l = rci->LockForWrite();
721
1
    l.mutable_data()->pb = std::move(config_entry);
722
    // Add the redis config
723
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, rci));
724
1
    l.Commit();
725
1
  }
726
727
  // Verify it showed up.
728
1
  loader->Reset();
729
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
730
  // The default config is empty
731
1
  ASSERT_EQ(1, loader->config_entries.size());
732
1
  ASSERT_METADATA_EQ(rci.get(), loader->config_entries[0]);
733
734
  // Update the same config.
735
1
  SysRedisConfigEntryPB* metadata;
736
1
  rci->mutable_metadata()->StartMutation();
737
1
  metadata = &rci->mutable_metadata()->mutable_dirty()->pb;
738
739
1
  metadata->clear_args();
740
1
  metadata->add_args("value1b");
741
742
1
  ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, rci));
743
1
  rci->mutable_metadata()->CommitMutation();
744
745
  // Verify config entries.
746
1
  loader->Reset();
747
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
748
  //  The default config is empty.
749
1
  ASSERT_EQ(1, loader->config_entries.size());
750
1
  ASSERT_METADATA_EQ(rci.get(), loader->config_entries[0]);
751
752
  // Add another key
753
1
  {
754
    // Set redis config information
755
1
    SysRedisConfigEntryPB config_entry;
756
1
    config_entry.set_key("key2");
757
1
    config_entry.add_args("value2.1");
758
1
    config_entry.add_args("value2.2");
759
760
    // 1. CHECK Add entry
761
1
    scoped_refptr<RedisConfigInfo> rci2 = new RedisConfigInfo("key2");
762
1
    {
763
1
      auto l = rci2->LockForWrite();
764
1
      l.mutable_data()->pb = std::move(config_entry);
765
      // Add the redis config
766
1
      ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, rci2));
767
1
      l.Commit();
768
1
    }
769
770
    // Verify it showed up.
771
1
    loader->Reset();
772
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
773
    // The default config is empty
774
1
    ASSERT_EQ(2, loader->config_entries.size());
775
1
    ASSERT_METADATA_EQ(rci2.get(), loader->config_entries[1]);
776
777
    // 2. CHECK DELETE RedisConfig
778
1
    ASSERT_OK(sys_catalog->Delete(kLeaderTerm, rci2));
779
780
    // Verify the result.
781
1
    loader->Reset();
782
1
    ASSERT_OK(sys_catalog->Visit(loader.get()));
783
1
    ASSERT_EQ(1, loader->config_entries.size());
784
1
  }
785
  // 2. CHECK DELETE RedisConfig
786
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, rci));
787
788
  // Verify the result.
789
1
  loader->Reset();
790
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
791
1
  ASSERT_EQ(0, loader->config_entries.size());
792
1
}
793
794
// Sys-catalog visitor used by the tests in this file: every sys-config entry passed to Visit()
// is materialized as a SysConfigInfo and appended to `sys_configs`, in visit order.
//
// Each stored SysConfigInfo carries one reference added manually in Visit(); Reset() (also
// invoked by the destructor) releases those references again.
class TestSysConfigLoader : public Visitor<PersistentSysConfigInfo> {
 public:
  TestSysConfigLoader() {}
  ~TestSysConfigLoader() { Reset(); }

  // The loader tracks manually counted references through raw pointers. A copy would Release()
  // the very same objects a second time, so copying is disabled.
  TestSysConfigLoader(const TestSysConfigLoader&) = delete;
  TestSysConfigLoader& operator=(const TestSysConfigLoader&) = delete;

  // Releases the reference held on every collected entry and empties `sys_configs`.
  void Reset() {
    for (SysConfigInfo* held : sys_configs) {
      held->Release();
    }
    sys_configs.clear();
  }

  // Creates a SysConfigInfo whose config type is `id`, copies `metadata` into it under a write
  // lock, commits, appends the object to `sys_configs`, and logs it. Always returns
  // Status::OK().
  Status Visit(const string& id, const SysConfigEntryPB& metadata) override {
    SysConfigInfo* const info = new SysConfigInfo(id /* config_type */);
    auto write_lock = info->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();
    // Keep the object alive while it sits in `sys_configs`; paired with Release() in Reset().
    info->AddRef();
    sys_configs.push_back(info);
    LOG(INFO) << " Current SysConfigInfo: " << info->ToString();
    return Status::OK();
  }

  // Collected sys-config entries in the order they were visited.
  std::vector<SysConfigInfo*> sys_configs;
};
821
822
// Test the sys-catalog sys-config basic operations (add, visit, drop).
823
1
TEST_F(SysCatalogTest, TestSysCatalogSysConfigOperations) {
824
1
  SysCatalogTable* const sys_catalog = master_->catalog_manager()->sys_catalog();
825
826
  // 1. Verify that when master initializes:
827
  //   a. "security-config" entry is set up with roles_version = 0.
828
  //   b. "ysql-catalog-configuration" entry is set up with version = 0 and the transactional YSQL
829
  //      sys catalog flag is set to true.
830
  //   c. "transaction-tables-config" entry is set up with version = 0.
831
1
  scoped_refptr<SysConfigInfo> security_config = new SysConfigInfo(kSecurityConfigType);
832
1
  {
833
1
    auto l = security_config->LockForWrite();
834
1
    l.mutable_data()->pb.mutable_security_config()->set_roles_version(0);
835
1
    l.Commit();
836
1
  }
837
1
  scoped_refptr<SysConfigInfo> ysql_catalog_config = new SysConfigInfo(kYsqlCatalogConfigType);
838
1
  {
839
1
    auto l = ysql_catalog_config->LockForWrite();
840
1
    auto& ysql_catalog_config_pb = *l.mutable_data()->pb.mutable_ysql_catalog_config();
841
1
    ysql_catalog_config_pb.set_version(0);
842
1
    ysql_catalog_config_pb.set_transactional_sys_catalog_enabled(true);
843
1
    l.Commit();
844
1
  }
845
1
  scoped_refptr<SysConfigInfo> transaction_tables_config =
846
1
      new SysConfigInfo(kTransactionTablesConfigType);
847
1
  {
848
1
    auto l = transaction_tables_config->LockForWrite();
849
1
    auto& transaction_tables_config_pb = *l.mutable_data()->pb.mutable_transaction_tables_config();
850
1
    transaction_tables_config_pb.set_version(0);
851
1
    l.Commit();
852
1
  }
853
1
  unique_ptr<TestSysConfigLoader> loader(new TestSysConfigLoader());
854
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
855
1
  ASSERT_EQ(3, loader->sys_configs.size());
856
1
  ASSERT_METADATA_EQ(security_config.get(), loader->sys_configs[0]);
857
1
  ASSERT_METADATA_EQ(transaction_tables_config.get(), loader->sys_configs[1]);
858
1
  ASSERT_METADATA_EQ(ysql_catalog_config.get(), loader->sys_configs[2]);
859
860
  // 2. Add a new SysConfigEntryPB and verify it shows up.
861
1
  scoped_refptr<SysConfigInfo> test_config = new SysConfigInfo("test-security-configuration");
862
1
  {
863
1
    auto l = test_config->LockForWrite();
864
1
    l.mutable_data()->pb.mutable_security_config()->set_roles_version(1234);
865
866
    // Add the test_config.
867
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, test_config));
868
1
    l.Commit();
869
1
  }
870
1
  loader->Reset();
871
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
872
1
  ASSERT_EQ(4, loader->sys_configs.size());
873
1
  ASSERT_METADATA_EQ(security_config.get(), loader->sys_configs[0]);
874
1
  ASSERT_METADATA_EQ(test_config.get(), loader->sys_configs[1]);
875
1
  ASSERT_METADATA_EQ(transaction_tables_config.get(), loader->sys_configs[2]);
876
1
  ASSERT_METADATA_EQ(ysql_catalog_config.get(), loader->sys_configs[3]);
877
878
  // 2. Remove the SysConfigEntry and verify that it got removed.
879
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, test_config));
880
1
  loader->Reset();
881
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
882
1
  ASSERT_EQ(3, loader->sys_configs.size());
883
1
  ASSERT_METADATA_EQ(security_config.get(), loader->sys_configs[0]);
884
1
  ASSERT_METADATA_EQ(transaction_tables_config.get(), loader->sys_configs[1]);
885
1
  ASSERT_METADATA_EQ(ysql_catalog_config.get(), loader->sys_configs[2]);
886
1
}
887
888
// Sys-catalog visitor used by the role test: every visited role entry is copied into a freshly
// allocated RoleInfo and appended to `roles`.
//
// Each pointer stored in `roles` carries one reference taken with AddRef() in Visit(); the
// matching Release() happens in Reset(), which the destructor also calls.
class TestRoleLoader : public Visitor<PersistentRoleInfo> {
 public:
  TestRoleLoader() {}
  ~TestRoleLoader() { Reset(); }

  // Releases the reference held on every collected role and empties the list.
  void Reset() {
    for (RoleInfo* collected : roles) {
      collected->Release();
    }
    roles.clear();
  }

  // Records one role entry.
  //   role_name: passed to the RoleInfo constructor.
  //   metadata:  protobuf copied into the new object's data before it is committed.
  // Always returns Status::OK().
  Status Visit(const RoleName& role_name, const SysRoleEntryPB& metadata) override {
    RoleInfo* const role = new RoleInfo(role_name);
    // Take the reference owned by `roles` up front; Reset() releases it.
    role->AddRef();

    auto write_lock = role->LockForWrite();
    write_lock.mutable_data()->pb.CopyFrom(metadata);
    write_lock.Commit();

    roles.push_back(role);
    LOG(INFO) << " Current Role: " << role->ToString();
    return Status::OK();
  }

  // Roles in the order they were visited.
  vector<RoleInfo*> roles;
};
915
916
// Test the sys-catalog role/permissions basic operations (add, visit, drop)
917
1
TEST_F(SysCatalogTest, TestSysCatalogRoleOperations) {
918
1
  SysCatalogTable* const sys_catalog = master_->catalog_manager()->sys_catalog();
919
920
1
  unique_ptr<TestRoleLoader> loader(new TestRoleLoader());
921
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
922
923
  // Set role information
924
1
  SysRoleEntryPB role_entry;
925
1
  role_entry.set_role("test_role");
926
1
  role_entry.set_can_login(false);
927
1
  role_entry.set_is_superuser(true);
928
1
  role_entry.set_salted_hash("test_password");
929
930
  // 1. CHECK ADD_ROLE
931
1
  scoped_refptr<RoleInfo> rl = new RoleInfo("test_role");
932
1
  {
933
1
    auto l = rl->LockForWrite();
934
1
    l.mutable_data()->pb = std::move(role_entry);
935
    // Add the role
936
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, rl));
937
1
    l.Commit();
938
1
  }
939
940
  // Verify it showed up.
941
1
  loader->Reset();
942
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
943
  // The first role is the default cassandra role
944
1
  ASSERT_EQ(2, loader->roles.size());
945
1
  ASSERT_METADATA_EQ(rl.get(), loader->roles[1]);
946
947
  // Adding permissions
948
1
  SysRoleEntryPB* metadata;
949
1
  rl->mutable_metadata()->StartMutation();
950
1
  metadata = &rl->mutable_metadata()->mutable_dirty()->pb;
951
  // Set permission information;
952
1
  ResourcePermissionsPB* currentResource;
953
1
  currentResource = metadata->add_resources();
954
1
  currentResource->set_canonical_resource("data/keyspace1/table1");
955
1
  currentResource->set_resource_name("table1");
956
1
  currentResource->set_namespace_name("keyspace1");
957
1
  currentResource->set_resource_type(ResourceType::TABLE);
958
959
1
  currentResource->add_permissions(PermissionType::SELECT_PERMISSION);
960
1
  currentResource->add_permissions(PermissionType::MODIFY_PERMISSION);
961
1
  currentResource->add_permissions(PermissionType::AUTHORIZE_PERMISSION);
962
963
1
  currentResource = metadata->add_resources();
964
1
  currentResource->set_canonical_resource("roles/test_role");
965
1
  currentResource->set_resource_type(ResourceType::ROLE);
966
967
1
  currentResource->add_permissions(PermissionType::DROP_PERMISSION);
968
969
1
  ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, rl));
970
1
  rl->mutable_metadata()->CommitMutation();
971
972
  // Verify permissions
973
1
  loader->Reset();
974
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
975
  // The first role is the default cassandra role
976
1
  ASSERT_EQ(2, loader->roles.size());
977
1
  ASSERT_METADATA_EQ(rl.get(), loader->roles[1]);
978
979
  // 2. CHECK DELETE Role
980
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, rl));
981
982
  // Verify the result.
983
1
  loader->Reset();
984
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
985
1
  ASSERT_EQ(1, loader->roles.size());
986
1
}
987
988
989
// Test the sys-catalog udtype basic operations (add, delete, visit)
990
1
TEST_F(SysCatalogTest, TestSysCatalogUDTypeOperations) {
991
1
  SysCatalogTable* const sys_catalog = master_->catalog_manager()->sys_catalog();
992
993
1
  unique_ptr<TestUDTypeLoader> loader(new TestUDTypeLoader());
994
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
995
996
  // 1. CHECK ADD_UDTYPE
997
1
  scoped_refptr<UDTypeInfo> tp(new UDTypeInfo("deadbeafdeadbeafdeadbeafdeadbeaf"));
998
1
  {
999
1
    auto l = tp->LockForWrite();
1000
1
    l.mutable_data()->pb.set_name("test_tp");
1001
1
    l.mutable_data()->pb.set_namespace_id(kSystemNamespaceId);
1002
    // Add the udtype
1003
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, tp));
1004
1
    l.Commit();
1005
1
  }
1006
1007
  // Verify it showed up.
1008
1
  loader->Reset();
1009
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
1010
1
  ASSERT_EQ(1, loader->udtypes.size());
1011
1
  ASSERT_METADATA_EQ(tp.get(), loader->udtypes[0]);
1012
1013
  // 2. CHECK DELETE_UDTYPE
1014
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, tp));
1015
1016
  // Verify the result.
1017
1
  loader->Reset();
1018
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
1019
1
  ASSERT_EQ(0, loader->udtypes.size());
1020
1
}
1021
1022
// Test the tasks tracker in catalog manager.
1023
1
TEST_F(SysCatalogTest, TestCatalogManagerTasksTracker) {
1024
  // Configure number of tasks flag and keep time flag.
1025
1
  SetAtomicFlag(100, &FLAGS_tasks_tracker_num_tasks);
1026
1
  SetAtomicFlag(100, &FLAGS_tasks_tracker_keep_time_multiplier);
1027
1028
1
  SysCatalogTable* sys_catalog = master_->catalog_manager()->sys_catalog();
1029
1030
1
  unique_ptr<TestTableLoader> loader(new TestTableLoader());
1031
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
1032
1
  ASSERT_EQ(kNumSystemTables, loader->tables.size());
1033
1034
  // Create new table.
1035
1
  const std::string table_id = "abc";
1036
1
  scoped_refptr<TableInfo> table = master_->catalog_manager()->NewTableInfo(table_id);
1037
1
  {
1038
1
    auto l = table->LockForWrite();
1039
1
    l.mutable_data()->pb.set_name("testtb");
1040
1
    l.mutable_data()->pb.set_version(0);
1041
1
    l.mutable_data()->pb.mutable_replication_info()->mutable_live_replicas()->set_num_replicas(1);
1042
1
    l.mutable_data()->pb.set_state(SysTablesEntryPB::PREPARING);
1043
1
    SchemaToPB(Schema(), l.mutable_data()->pb.mutable_schema());
1044
    // Add the table.
1045
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, table));
1046
1047
1
    l.Commit();
1048
1
  }
1049
1050
  // Verify it showed up.
1051
1
  loader->Reset();
1052
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
1053
1054
1
  ASSERT_EQ(1 + kNumSystemTables, loader->tables.size());
1055
1
  ASSERT_METADATA_EQ(table.get(), loader->tables[table_id]);
1056
1057
  // Add tasks to the table (more than can fit in the cbuf).
1058
111
  for (int task_id = 0; task_id < FLAGS_tasks_tracker_num_tasks + 10; ++task_id) {
1059
110
    scoped_refptr<TabletInfo> tablet(new TabletInfo(table, kSysCatalogTableId));
1060
110
    auto task = std::make_shared<AsyncTruncate>(
1061
110
        master_, master_->catalog_manager()->AsyncTaskPool(), tablet);
1062
110
    table->AddTask(task);
1063
110
  }
1064
1065
  // Verify initial cbuf size is correct.
1066
1
  ASSERT_EQ(master_->catalog_manager()->GetRecentTasks().size(), FLAGS_tasks_tracker_num_tasks);
1067
1068
  // Wait for background task to run (length of two wait intervals).
1069
1
  usleep(2 * (1000 * FLAGS_catalog_manager_bg_task_wait_ms));
1070
1071
  // Verify that tasks were not cleaned up.
1072
1
  ASSERT_EQ(master_->catalog_manager()->GetRecentTasks().size(), FLAGS_tasks_tracker_num_tasks);
1073
1074
  // Set keep time flag to small multiple of the wait interval.
1075
1
  SetAtomicFlag(1, &FLAGS_tasks_tracker_keep_time_multiplier);
1076
1077
  // Wait for background task to run (length of two wait intervals).
1078
1
  usleep(2 * (1000 * FLAGS_catalog_manager_bg_task_wait_ms));
1079
1080
  // Verify that tasks were cleaned up.
1081
1
  ASSERT_EQ(master_->catalog_manager()->GetRecentTasks().size(), 0);
1082
1083
  // Cleanup tasks.
1084
1
  table->AbortTasksAndClose();
1085
1
}
1086
1087
} // namespace master
1088
} // namespace yb