YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/master/sys_catalog-test_ent.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/master/catalog_entity_info.h"
14
#include "yb/master/sys_catalog.h"
15
#include "yb/master/sys_catalog-test_base.h"
16
17
namespace yb {
18
namespace master {
19
namespace enterprise {
20
21
class TestCDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
22
 public:
23
1
  TestCDCStreamLoader() {}
24
1
  ~TestCDCStreamLoader() { Reset(); }
25
26
3
  void Reset() {
27
1
    for (CDCStreamInfo* stream : streams) {
28
1
      stream->Release();
29
1
    }
30
3
    streams.clear();
31
3
  }
32
33
1
  Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata) override {
34
    // Setup the CDC stream info.
35
1
    CDCStreamInfo* const stream = new CDCStreamInfo(stream_id);
36
1
    auto l = stream->LockForWrite();
37
1
    l.mutable_data()->pb.CopyFrom(metadata);
38
1
    l.Commit();
39
1
    stream->AddRef();
40
1
    streams.push_back(stream);
41
1
    return Status::OK();
42
1
  }
43
44
  vector<CDCStreamInfo*> streams;
45
};
46
47
class TestUniverseReplicationLoader : public Visitor<PersistentUniverseReplicationInfo> {
48
 public:
49
1
  TestUniverseReplicationLoader() {}
50
1
  ~TestUniverseReplicationLoader() { Reset(); }
51
52
3
  void Reset() {
53
1
    for (UniverseReplicationInfo* universe : universes) {
54
1
      universe->Release();
55
1
    }
56
3
    universes.clear();
57
3
  }
58
59
  Status Visit(
60
1
      const std::string& producer_id, const SysUniverseReplicationEntryPB& metadata) override {
61
    // Setup the universe replication info.
62
1
    UniverseReplicationInfo* const universe = new UniverseReplicationInfo(producer_id);
63
1
    auto l = universe->LockForWrite();
64
1
    l.mutable_data()->pb.CopyFrom(metadata);
65
1
    l.Commit();
66
1
    universe->AddRef();
67
1
    universes.push_back(universe);
68
1
    return Status::OK();
69
1
  }
70
71
  vector<UniverseReplicationInfo*> universes;
72
};
73
74
// Test the sys-catalog CDC stream basic operations (add, delete, visit).
75
1
TEST_F(SysCatalogTest, TestSysCatalogCDCStreamOperations) {
76
1
  SysCatalogTable* const sys_catalog = &master_->sys_catalog();
77
78
1
  auto loader = std::make_unique<TestCDCStreamLoader>();
79
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
80
81
  // 1. CHECK ADD_CDCSTREAM.
82
1
  auto stream = make_scoped_refptr<CDCStreamInfo>("deadbeafdeadbeafdeadbeafdeadbeaf");
83
1
  {
84
1
    auto l = stream->LockForWrite();
85
1
    l.mutable_data()->pb.add_table_id("test_table");
86
    // Add the CDC stream.
87
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, stream));
88
1
    l.Commit();
89
1
  }
90
91
  // Verify it showed up.
92
1
  loader->Reset();
93
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
94
1
  ASSERT_EQ(1, loader->streams.size());
95
1
  ASSERT_METADATA_EQ(stream.get(), loader->streams[0]);
96
97
  // 2. CHECK DELETE_CDCSTREAM.
98
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, stream));
99
100
  // Verify the result.
101
1
  loader->Reset();
102
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
103
1
  ASSERT_EQ(0, loader->streams.size());
104
1
}
105
106
// Test the sys-catalog universe replication basic operations (add, delete, visit).
107
1
TEST_F(SysCatalogTest, TestSysCatalogUniverseReplicationOperations) {
108
1
  SysCatalogTable* const sys_catalog = &master_->sys_catalog();
109
110
1
  auto loader = std::make_unique<TestUniverseReplicationLoader>();
111
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
112
113
  // 1. CHECK ADD_UNIVERSE_REPLICATION.
114
1
  auto universe = make_scoped_refptr<UniverseReplicationInfo>("deadbeafdeadbeafdeadbeafdeadbeaf");
115
1
  {
116
1
    auto l = universe->LockForWrite();
117
1
    l.mutable_data()->pb.add_tables("producer_table_id");
118
    // Add the universe replication info.
119
1
    ASSERT_OK(sys_catalog->Upsert(kLeaderTerm, universe));
120
1
    l.Commit();
121
1
  }
122
123
  // Verify it showed up.
124
1
  loader->Reset();
125
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
126
1
  ASSERT_EQ(1, loader->universes.size());
127
1
  ASSERT_METADATA_EQ(universe.get(), loader->universes[0]);
128
129
  // 2. CHECK DELETE_UNIVERSE_REPLICATION.
130
1
  ASSERT_OK(sys_catalog->Delete(kLeaderTerm, universe));
131
132
  // Verify the result.
133
1
  loader->Reset();
134
1
  ASSERT_OK(sys_catalog->Visit(loader.get()));
135
1
  ASSERT_EQ(0, loader->universes.size());
136
1
}
137
138
} // namespace enterprise
139
} // namespace master
140
} // namespace yb