YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/snapshot-schedule-test.cc
[Per-line data from the report: the SnapshotScheduleTest::SetUp lines have a hit count of 7; every TEST_F body has a count of 0, i.e. none of the tests were executed in this coverage run. Line numbers and counts are omitted from the listing below.]
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
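// Tests for snapshot schedules: creation and listing, snapshot garbage
// collection, survival across restarts, schema restore, and removal of
// tablets created after the restore time.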

#include "yb/client/schema.h"
#include "yb/client/snapshot_test_util.h"
#include "yb/client/table.h"
#include "yb/client/table_alterer.h"
#include "yb/client/txn-test-base.h"

#include "yb/master/master.h"
#include "yb/master/master_backup.proxy.h"
#include "yb/master/mini_master.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_metadata.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/tablet_retention_policy.h"

#include "yb/yql/cql/ql/util/errcodes.h"

using namespace std::literals;

DECLARE_bool(enable_history_cutoff_propagation);
DECLARE_int32(history_cutoff_propagation_interval_ms);
DECLARE_int32(timestamp_history_retention_interval_sec);
DECLARE_uint64(snapshot_coordinator_poll_interval_ms);
DECLARE_uint64(snapshot_coordinator_cleanup_delay_ms);

namespace yb {
namespace client {

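// Test fixture: a single-tablet transactional MiniCluster test with history
// cutoff propagation enabled and fast snapshot-coordinator polling, so
// scheduled snapshots, cleanup, and cutoff propagation happen quickly enough
// for the tests below.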
class SnapshotScheduleTest : public TransactionTestBase<MiniCluster> {
 public:
  void SetUp() override {
    FLAGS_enable_history_cutoff_propagation = true;
    FLAGS_snapshot_coordinator_poll_interval_ms = 250;
    FLAGS_history_cutoff_propagation_interval_ms = 100;
    num_tablets_ = 1;
    TransactionTestBase<MiniCluster>::SetUp();
    snapshot_util_ = std::make_unique<SnapshotTestUtil>();
    snapshot_util_->SetProxy(&client_->proxy_cache());
    snapshot_util_->SetCluster(cluster_.get());
  }

  std::unique_ptr<SnapshotTestUtil> snapshot_util_;
};

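// Creating multiple schedules: each ListSchedules(id) call returns exactly
// the requested schedule, and the unfiltered listing returns all schedules
// created so far.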
TEST_F(SnapshotScheduleTest, Create) {
  std::vector<SnapshotScheduleId> ids;
  for (int i = 0; i != 3; ++i) {
    auto id = ASSERT_RESULT(snapshot_util_->CreateSchedule(table_));
    LOG(INFO) << "Schedule " << i << " id: " << id;
    ids.push_back(id);

    {
      auto schedules = ASSERT_RESULT(snapshot_util_->ListSchedules(id));
      ASSERT_EQ(schedules.size(), 1);
      ASSERT_EQ(TryFullyDecodeSnapshotScheduleId(schedules[0].id()), id);
    }

    auto schedules = ASSERT_RESULT(snapshot_util_->ListSchedules());
    LOG(INFO) << "Schedules: " << AsString(schedules);
    ASSERT_EQ(schedules.size(), ids.size());
    std::unordered_set<SnapshotScheduleId, SnapshotScheduleIdHash> ids_set(ids.begin(), ids.end());
    for (const auto& schedule : schedules) {
      id = TryFullyDecodeSnapshotScheduleId(schedule.id());
      auto it = ids_set.find(id);
      ASSERT_NE(it, ids_set.end()) << "Unknown id: " << id;
      ids_set.erase(it);
    }
    ASSERT_TRUE(ids_set.empty()) << "Ids not found: " << AsString(ids_set);
  }
}

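// Scheduled snapshots hold back history retention: tablets keep their history
// cutoff at or below the first snapshot's hybrid time, and only advance past
// it once a second snapshot has been taken.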
TEST_F(SnapshotScheduleTest, Snapshot) {
  FLAGS_timestamp_history_retention_interval_sec = kTimeMultiplier;

  ASSERT_NO_FATALS(WriteData());
  auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(table_));
  ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));

  // Write data to update history retention.
  ASSERT_NO_FATALS(WriteData());

  auto schedules = ASSERT_RESULT(snapshot_util_->ListSchedules());
  ASSERT_EQ(schedules.size(), 1);
  ASSERT_EQ(schedules[0].snapshots().size(), 1);

  std::this_thread::sleep_for(kSnapshotInterval / 4);
  auto snapshots = ASSERT_RESULT(snapshot_util_->ListSnapshots());
  ASSERT_EQ(snapshots.size(), 1);

  HybridTime first_snapshot_hybrid_time(schedules[0].snapshots()[0].entry().snapshot_hybrid_time());
  auto peers = ListTabletPeers(cluster_.get(), [table_id = table_->id()](const auto& peer) {
    return peer->tablet_metadata()->table_id() == table_id;
  });
  for (const auto& peer : peers) {
    SCOPED_TRACE(Format(
        "T $0 P $1 Table $2", peer->tablet_id(), peer->permanent_uuid(),
        peer->tablet_metadata()->table_name()));
    auto tablet = peer->tablet();
    auto history_cutoff = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
    ASSERT_LE(history_cutoff, first_snapshot_hybrid_time);
  }

  ASSERT_OK(WaitFor([this]() -> Result<bool> {
    auto snapshots = VERIFY_RESULT(snapshot_util_->ListSnapshots());
    return snapshots.size() == 2;
  }, kSnapshotInterval, "Second snapshot"));

  ASSERT_NO_FATALS(WriteData());

  ASSERT_OK(WaitFor([first_snapshot_hybrid_time, peers]() -> Result<bool> {
    for (const auto& peer : peers) {
      auto tablet = peer->tablet();
      auto history_cutoff = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
      if (history_cutoff <= first_snapshot_hybrid_time) {
        return false;
      }
    }
    return true;
  }, FLAGS_timestamp_history_retention_interval_sec * 1s + 5s, "History cutoff update"));
}

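// Old snapshots are garbage-collected: with retention equal to the snapshot
// interval, at most two snapshots are listed at any time and each tablet's
// snapshots directory stays bounded while new snapshots keep being created.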
TEST_F(SnapshotScheduleTest, GC) {
  FLAGS_snapshot_coordinator_cleanup_delay_ms = 100;
  // When retention matches the snapshot interval, we expect at most 2 snapshots per schedule.
  ASSERT_RESULT(snapshot_util_->CreateSchedule(table_, kSnapshotInterval, kSnapshotInterval));

  std::unordered_set<SnapshotScheduleId, SnapshotScheduleIdHash> all_snapshot_ids;
  while (all_snapshot_ids.size() < 4) {
    auto snapshots = ASSERT_RESULT(
        snapshot_util_->ListSnapshots(TxnSnapshotId::Nil(), ListDeleted::kFalse));
    for (const auto& snapshot : snapshots) {
      all_snapshot_ids.insert(ASSERT_RESULT(FullyDecodeSnapshotScheduleId(snapshot.id())));
    }
    ASSERT_LE(snapshots.size(), 2);

    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
    auto master_leader = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
    peers.push_back(master_leader->tablet_peer());
    for (const auto& peer : peers) {
      if (peer->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
        continue;
      }
      auto dir = ASSERT_RESULT(peer->tablet_metadata()->TopSnapshotsDir());
      auto children = ASSERT_RESULT(Env::Default()->GetChildren(dir, ExcludeDots::kTrue));
      // At most 3 files (including an extra one for intents), e.g.
      // [985a49e5-d7c7-491f-a95f-da8aa55a8cf9,
      //  105d49d2-4e55-45bf-a6a4-73a8b0977242.tmp.intents,
      //  105d49d2-4e55-45bf-a6a4-73a8b0977242.tmp].
      ASSERT_LE(children.size(), 3) << AsString(children);
    }

    std::this_thread::sleep_for(100ms);
  }
}

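// Snapshot schedules cover tables created after the schedule: writes to a
// newly created index are readable, and the index tablets' history cutoff
// stays at or below the index creation time until the next scheduled
// snapshot completes.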
TEST_F(SnapshotScheduleTest, Index) {
  FLAGS_timestamp_history_retention_interval_sec = kTimeMultiplier;

  auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(table_));
  ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));

  CreateIndex(Transactional::kTrue, 1, false);
  auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
  constexpr int kTransaction = 0;
  constexpr auto op_type = WriteOpType::INSERT;

  auto session = CreateSession();
  for (size_t r = 0; r != kNumRows; ++r) {
    ASSERT_OK(kv_table_test::WriteRow(
        &index_, session, KeyForTransactionAndIndex(kTransaction, r),
        ValueForTransactionAndIndex(kTransaction, r, op_type), op_type));
  }

  LOG(INFO) << "Index columns: " << AsString(index_.AllColumnNames());
  for (size_t r = 0; r != kNumRows; ++r) {
    const auto key = KeyForTransactionAndIndex(kTransaction, r);
    const auto fetched = ASSERT_RESULT(kv_table_test::SelectRow(
        &index_, session, key, kValueColumn));
    ASSERT_EQ(key, fetched);
  }

  auto peers = ListTabletPeers(cluster_.get(), [index_id = index_->id()](const auto& peer) {
    return peer->tablet_metadata()->table_id() == index_id;
  });

  ASSERT_OK(WaitFor([this, peers, hybrid_time]() -> Result<bool> {
    auto snapshots = VERIFY_RESULT(snapshot_util_->ListSnapshots());
    if (snapshots.size() == 2) {
      return true;
    }

    // Until the second snapshot appears, history cutoff must not advance past
    // the time at which the index was created.
    for (const auto& peer : peers) {
      SCOPED_TRACE(Format(
          "T $0 P $1 Table $2", peer->tablet_id(), peer->permanent_uuid(),
          peer->tablet_metadata()->table_name()));
      auto tablet = peer->tablet();
      auto history_cutoff = tablet->RetentionPolicy()->GetRetentionDirective().history_cutoff;
      SCHECK_LE(history_cutoff, hybrid_time, IllegalState, "Too big history cutoff");
    }

    return false;
  }, kSnapshotInterval, "Second snapshot"));
}

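// Schedules and their completed snapshots survive a full cluster restart.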
TEST_F(SnapshotScheduleTest, Restart) {
  ASSERT_NO_FATALS(WriteData());
  auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(table_));
  ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));
  ASSERT_OK(cluster_->RestartSync());

  auto schedules = ASSERT_RESULT(snapshot_util_->ListSchedules());
  ASSERT_EQ(schedules.size(), 1);
  ASSERT_EQ(schedules[0].snapshots().size(), 1);
  ASSERT_EQ(schedules[0].snapshots()[0].entry().state(), master::SysSnapshotEntryPB::COMPLETE);
}

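// Restoring to a time before an ALTER TABLE rolls back both data and schema;
// a client still holding the post-alter schema gets WRONG_METADATA_VERSION
// until it reopens the table.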
TEST_F(SnapshotScheduleTest, RestoreSchema) {
  ASSERT_NO_FATALS(WriteData());
  auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(table_));
  auto hybrid_time = cluster_->mini_master(0)->master()->clock()->Now();
  auto old_schema = table_.schema();

  // Alter the table and overwrite the data after the restore point.
  auto alterer = client_->NewTableAlterer(table_.name());
  auto* column = alterer->AddColumn("new_column");
  column->Type(DataType::INT32);
  ASSERT_OK(alterer->Alter());
  ASSERT_OK(table_.Reopen());
  ASSERT_NO_FATALS(WriteData(WriteOpType::UPDATE));
  ASSERT_NO_FATALS(VerifyData(WriteOpType::UPDATE));
  ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id));

  auto schedules = ASSERT_RESULT(snapshot_util_->ListSchedules());
  ASSERT_EQ(schedules.size(), 1);
  const auto& snapshots = schedules[0].snapshots();
  ASSERT_EQ(snapshots.size(), 1);
  ASSERT_EQ(snapshots[0].entry().state(), master::SysSnapshotEntryPB::COMPLETE);

  ASSERT_OK(snapshot_util_->RestoreSnapshot(
      TryFullyDecodeTxnSnapshotId(snapshots[0].id()), hybrid_time));

  // The restore rolled the schema back, so the cached post-alter schema is
  // stale and reads fail until the table is reopened.
  auto select_result = SelectRow(CreateSession(), 1, kValueColumn);
  ASSERT_NOK(select_result);
  ASSERT_TRUE(select_result.status().IsQLError());
  ASSERT_EQ(ql::QLError(select_result.status()), ql::ErrorCode::WRONG_METADATA_VERSION);
  ASSERT_OK(table_.Reopen());
  ASSERT_EQ(old_schema, table_.schema());
  ASSERT_NO_FATALS(VerifyData());
}

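// Restoring to a time before an index was created should eventually remove
// the index's tablets from the cluster.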
TEST_F(SnapshotScheduleTest, RemoveNewTablets) {
  const auto kInterval = 5s * kTimeMultiplier;
  const auto kRetention = kInterval * 2;
  auto schedule_id = ASSERT_RESULT(snapshot_util_->CreateSchedule(
      table_, WaitSnapshot::kTrue, kInterval, kRetention));
  auto before_index_ht = cluster_->mini_master(0)->master()->clock()->Now();
  CreateIndex(Transactional::kTrue, 1, false);
  auto after_time_ht = cluster_->mini_master(0)->master()->clock()->Now();
  ASSERT_OK(snapshot_util_->WaitScheduleSnapshot(schedule_id, after_time_ht));
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->PickSuitableSnapshot(
      schedule_id, before_index_ht));
  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, before_index_ht));

  // After restoring to a point before the index existed, its tablets (the only
  // ones with a non-empty indexed_table_id) should eventually disappear.
  ASSERT_OK(WaitFor([this]() -> Result<bool> {
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    for (const auto& peer : peers) {
      auto metadata = peer->tablet_metadata();
      if (!metadata->indexed_table_id().empty()) {
        LOG(INFO) << "T " << peer->tablet_id() << " P " << peer->permanent_uuid() << ": table "
                  << metadata->table_name() << ", indexed: " << metadata->indexed_table_id();
        return false;
      }
    }
    return true;
  }, kRetention + kInterval * 2, "Cleanup obsolete tablets"));
}

} // namespace client
} // namespace yb