/Users/deen/code/yugabyte-db/src/yb/client/ql-tablet-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include <shared_mutex> |
17 | | #include <thread> |
18 | | |
19 | | #include <boost/optional/optional.hpp> |
20 | | |
21 | | #include "yb/client/client-test-util.h" |
22 | | #include "yb/client/error.h" |
23 | | #include "yb/client/ql-dml-test-base.h" |
24 | | #include "yb/client/schema.h" |
25 | | #include "yb/client/session.h" |
26 | | #include "yb/client/table_handle.h" |
27 | | #include "yb/client/yb_op.h" |
28 | | |
29 | | #include "yb/common/ql_type.h" |
30 | | #include "yb/common/ql_value.h" |
31 | | #include "yb/common/schema.h" |
32 | | |
33 | | #include "yb/consensus/consensus.h" |
34 | | #include "yb/consensus/consensus.pb.h" |
35 | | #include "yb/consensus/log.h" |
36 | | #include "yb/consensus/raft_consensus.h" |
37 | | |
38 | | #include "yb/docdb/consensus_frontier.h" |
39 | | #include "yb/docdb/doc_key.h" |
40 | | |
41 | | #include "yb/gutil/casts.h" |
42 | | |
43 | | #include "yb/integration-tests/test_workload.h" |
44 | | |
45 | | #include "yb/master/catalog_entity_info.h" |
46 | | #include "yb/master/catalog_manager_if.h" |
47 | | #include "yb/master/master_defaults.h" |
48 | | #include "yb/master/master_ddl.proxy.h" |
49 | | |
50 | | #include "yb/rocksdb/db.h" |
51 | | #include "yb/rocksdb/types.h" |
52 | | |
53 | | #include "yb/rpc/rpc_controller.h" |
54 | | |
55 | | #include "yb/server/skewed_clock.h" |
56 | | |
57 | | #include "yb/tablet/tablet.h" |
58 | | #include "yb/tablet/tablet_bootstrap_if.h" |
59 | | #include "yb/tablet/tablet_metadata.h" |
60 | | #include "yb/tablet/tablet_peer.h" |
61 | | #include "yb/tablet/tablet_retention_policy.h" |
62 | | |
63 | | #include "yb/tserver/mini_tablet_server.h" |
64 | | #include "yb/tserver/tablet_server.h" |
65 | | #include "yb/tserver/ts_tablet_manager.h" |
66 | | #include "yb/tserver/tserver_service.proxy.h" |
67 | | |
68 | | #include "yb/util/random_util.h" |
69 | | #include "yb/util/shared_lock.h" |
70 | | #include "yb/util/status_format.h" |
71 | | #include "yb/util/stopwatch.h" |
72 | | #include "yb/util/tsan_util.h" |
73 | | |
74 | | #include "yb/yql/cql/ql/util/statement_result.h" |
75 | | |
76 | | using namespace std::literals; // NOLINT |
77 | | |
78 | | DECLARE_uint64(initial_seqno); |
79 | | DECLARE_int32(leader_lease_duration_ms); |
80 | | DECLARE_int64(db_write_buffer_size); |
81 | | DECLARE_string(time_source); |
82 | | DECLARE_int32(TEST_delay_execute_async_ms); |
83 | | DECLARE_int32(retryable_request_timeout_secs); |
84 | | DECLARE_bool(enable_lease_revocation); |
85 | | DECLARE_bool(rocksdb_disable_compactions); |
86 | | DECLARE_int32(rocksdb_level0_slowdown_writes_trigger); |
87 | | DECLARE_int32(rocksdb_level0_stop_writes_trigger); |
88 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
89 | | DECLARE_int32(memstore_size_mb); |
90 | | DECLARE_int64(global_memstore_size_mb_max); |
91 | | DECLARE_bool(TEST_allow_stop_writes); |
92 | | DECLARE_int32(yb_num_shards_per_tserver); |
93 | | DECLARE_int32(ysql_num_shards_per_tserver); |
94 | | DECLARE_int32(transaction_table_num_tablets); |
95 | | DECLARE_int32(transaction_table_num_tablets_per_tserver); |
96 | | DECLARE_int32(TEST_tablet_inject_latency_on_apply_write_txn_ms); |
97 | | DECLARE_bool(TEST_log_cache_skip_eviction); |
98 | | DECLARE_uint64(sst_files_hard_limit); |
99 | | DECLARE_uint64(sst_files_soft_limit); |
100 | | DECLARE_int32(timestamp_history_retention_interval_sec); |
101 | | DECLARE_int32(raft_heartbeat_interval_ms); |
102 | | DECLARE_int32(history_cutoff_propagation_interval_ms); |
103 | | DECLARE_int32(TEST_preparer_batch_inject_latency_ms); |
104 | | DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
105 | | DECLARE_int32(TEST_backfill_sabotage_frequency); |
106 | | DECLARE_string(regular_tablets_data_block_key_value_encoding); |
107 | | DECLARE_string(compression_type); |
108 | | |
109 | | namespace yb { |
110 | | namespace client { |
111 | | |
112 | | using ql::RowsResult; |
113 | | |
114 | | namespace { |
115 | | |
116 | | const std::string kKeyColumn = "key"s; |
117 | | const std::string kRangeKey1Column = "range_key1"s; |
118 | | const std::string kRangeKey2Column = "range_key2"s; |
119 | | const std::string kValueColumn = "int_val"s; |
120 | | const YBTableName kTable1Name(YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table1"); |
121 | | const YBTableName kTable2Name(YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table2"); |
122 | | |
// Returns the value the fill helpers store for |key|: twice the key.
int32_t ValueForKey(int32_t key) {
  return 2 * key;
}
126 | | |
// Number of rows most tests write into a table.
const int kTotalKeys{250};
// Large value handed to FLAGS_initial_seqno (via CreateTables) by the import tests.
const int kBigSeqNo{100500};
129 | | |
130 | | } // namespace |
131 | | |
132 | | class QLTabletTest : public QLDmlTestBase<MiniCluster> { |
133 | | protected: |
134 | 29 | void SetUp() override { |
135 | 29 | server::SkewedClock::Register(); |
136 | 29 | FLAGS_time_source = server::SkewedClock::kName; |
137 | 29 | QLDmlTestBase::SetUp(); |
138 | 29 | } |
139 | | |
140 | 0 | void CreateTables(uint64_t initial_seqno1, uint64_t initial_seqno2) { |
141 | 0 | google::FlagSaver saver; |
142 | 0 | FLAGS_initial_seqno = initial_seqno1; |
143 | 0 | CreateTable(kTable1Name, &table1_); |
144 | 0 | FLAGS_initial_seqno = initial_seqno2; |
145 | 0 | CreateTable(kTable2Name, &table2_); |
146 | 0 | } |
147 | | |
148 | | std::shared_ptr<YBqlWriteOp> CreateSetValueOp( |
149 | 0 | int32_t key, int32_t value, const TableHandle& table) { |
150 | 0 | const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
151 | 0 | auto* const req = op->mutable_request(); |
152 | 0 | QLAddInt32HashValue(req, key); |
153 | 0 | table.AddInt32ColumnValue(req, kValueColumn, value); |
154 | 0 | return op; |
155 | 0 | } |
156 | | |
157 | 0 | void SetValue(const YBSessionPtr& session, int32_t key, int32_t value, const TableHandle& table) { |
158 | 0 | auto op = CreateSetValueOp(key, value, table); |
159 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
160 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()) |
161 | 0 | << op->response().error_message(); |
162 | 0 | } |
163 | | |
164 | | boost::optional<int32_t> GetValue( |
165 | 0 | const YBSessionPtr& session, int32_t key, const TableHandle& table) { |
166 | 0 | const auto op = CreateReadOp(key, table); |
167 | 0 | EXPECT_OK(session->ApplyAndFlush(op)); |
168 | 0 | auto rowblock = RowsResult(op.get()).GetRowBlock(); |
169 | 0 | if (rowblock->row_count() == 0) { |
170 | 0 | return boost::none; |
171 | 0 | } |
172 | 0 | EXPECT_EQ(1, rowblock->row_count()); |
173 | 0 | const auto& value = rowblock->row(0).column(0); |
174 | 0 | EXPECT_TRUE(value.value().has_int32_value()) << "Value: " << value.value().ShortDebugString(); |
175 | 0 | return value.int32_value(); |
176 | 0 | } |
177 | | |
178 | 0 | std::shared_ptr<YBqlReadOp> CreateReadOp(int32_t key, const TableHandle& table) { |
179 | 0 | return client::CreateReadOp(key, table, kValueColumn); |
180 | 0 | } |
181 | | |
182 | | void CreateTable( |
183 | | const YBTableName& table_name, TableHandle* table, int num_tablets = 0, |
184 | 0 | bool transactional = false) { |
185 | 0 | YBSchemaBuilder builder; |
186 | 0 | builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull(); |
187 | 0 | builder.AddColumn(kValueColumn)->Type(INT32); |
188 | |
|
189 | 0 | if (num_tablets == 0) { |
190 | 0 | num_tablets = CalcNumTablets(3); |
191 | 0 | } |
192 | 0 | if (transactional) { |
193 | 0 | TableProperties table_properties; |
194 | 0 | table_properties.SetTransactional(true); |
195 | 0 | builder.SetTableProperties(table_properties); |
196 | 0 | } |
197 | 0 | ASSERT_OK(table->Create(table_name, num_tablets, client_.get(), &builder)); |
198 | 0 | } |
199 | | |
200 | 0 | YBSessionPtr CreateSession() { |
201 | 0 | auto session = client_->NewSession(); |
202 | 0 | session->SetTimeout(15s); |
203 | 0 | return session; |
204 | 0 | } |
205 | | |
206 | 0 | void FillTable(int begin, int end, const TableHandle& table) { |
207 | 0 | { |
208 | 0 | auto session = CreateSession(); |
209 | 0 | for (int i = begin; i != end; ++i) { |
210 | 0 | SetValue(session, i, ValueForKey(i), table); |
211 | 0 | } |
212 | 0 | } |
213 | 0 | VerifyTable(begin, end, table); |
214 | 0 | ASSERT_OK(WaitSync(begin, end, table)); |
215 | 0 | } |
216 | | |
217 | | CHECKED_STATUS BatchedFillTable( |
218 | 0 | const int begin, const int end, const int batch_size, const TableHandle& table) { |
219 | 0 | { |
220 | 0 | auto session = CreateSession(); |
221 | 0 | for (int i = begin; i != end; ++i) { |
222 | 0 | auto op = CreateSetValueOp(i, ValueForKey(i), table); |
223 | 0 | if ((i - begin + 1) % batch_size == 0 || i == end) { |
224 | 0 | RETURN_NOT_OK(session->ApplyAndFlush(op)); |
225 | 0 | } else { |
226 | 0 | session->Apply(op); |
227 | 0 | } |
228 | 0 | } |
229 | 0 | } |
230 | 0 | return WaitSync(begin, end, table); |
231 | 0 | } |
232 | | |
233 | 0 | void VerifyTable(int begin, int end, const TableHandle& table) { |
234 | 0 | auto session = CreateSession(); |
235 | 0 | for (int i = begin; i != end; ++i) { |
236 | 0 | auto value = GetValue(session, i, table); |
237 | 0 | ASSERT_TRUE(value.is_initialized()) << "i: " << i << ", table: " << table->name().ToString(); |
238 | 0 | ASSERT_EQ(ValueForKey(i), *value) << "i: " << i << ", table: " << table->name().ToString(); |
239 | 0 | } |
240 | 0 | } |
241 | | |
242 | | typedef std::pair<std::vector<std::string>, std::unordered_set<std::string>> TabletIdsAndReplicas; |
243 | | |
244 | 0 | Result<TabletIdsAndReplicas> GetTabletIdsAndReplicas(const TableHandle& table) { |
245 | 0 | std::vector<std::string> tablet_ids; |
246 | 0 | std::unordered_set<std::string> replicas; |
247 | |
|
248 | 0 | auto tablet_infos = GetTabletInfos(table.name()); |
249 | 0 | if (!tablet_infos) { |
250 | 0 | return STATUS_FORMAT(NotFound, |
251 | 0 | "No tablet information found for $0", |
252 | 0 | table->name()); |
253 | 0 | } |
254 | 0 | for (auto tablet_info : *tablet_infos) { |
255 | 0 | tablet_ids.emplace_back(tablet_info->tablet_id()); |
256 | 0 | auto replica_map = tablet_info->GetReplicaLocations(); |
257 | 0 | for (auto it = replica_map->begin(); it != replica_map->end(); it++) { |
258 | 0 | replicas.insert(it->first); |
259 | 0 | } |
260 | 0 | } |
261 | 0 | return TabletIdsAndReplicas(tablet_ids, replicas); |
262 | 0 | } |
263 | | |
264 | 0 | CHECKED_STATUS WaitSync(int begin, int end, const TableHandle& table) { |
265 | 0 | auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
266 | 0 | TabletIdsAndReplicas info = VERIFY_RESULT(GetTabletIdsAndReplicas(table)); |
267 | 0 | std::vector<std::string> tablet_ids = info.first; |
268 | 0 | std::unordered_set<std::string> replicas = info.second; |
269 | 0 | for (const auto& replica : replicas) { |
270 | 0 | RETURN_NOT_OK(DoWaitSync(deadline, tablet_ids, replica, begin, end, table)); |
271 | 0 | } |
272 | 0 | return Status::OK(); |
273 | 0 | } |
274 | | |
275 | | CHECKED_STATUS DoWaitSync( |
276 | | const MonoTime& deadline, |
277 | | const std::vector<std::string>& tablet_ids, |
278 | | const std::string& replica, |
279 | | int begin, |
280 | | int end, |
281 | 0 | const TableHandle& table) { |
282 | 0 | auto tserver = cluster_->find_tablet_server(replica); |
283 | 0 | if (!tserver) { |
284 | 0 | return STATUS_FORMAT(NotFound, "Tablet server for $0 not found", replica); |
285 | 0 | } |
286 | 0 | auto endpoint = tserver->server()->rpc_server()->GetBoundAddresses().front(); |
287 | 0 | auto proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
288 | 0 | &tserver->server()->proxy_cache(), HostPort::FromBoundEndpoint(endpoint)); |
289 | |
|
290 | 0 | auto condition = [&]() -> Result<bool> { |
291 | 0 | for (int i = begin; i != end; ++i) { |
292 | 0 | bool found = false; |
293 | 0 | for (const std::string& tablet_id : tablet_ids) { |
294 | 0 | tserver::ReadRequestPB req; |
295 | 0 | { |
296 | 0 | std::string partition_key; |
297 | 0 | auto op = CreateReadOp(i, table); |
298 | 0 | RETURN_NOT_OK(op->GetPartitionKey(&partition_key)); |
299 | 0 | auto* ql_batch = req.add_ql_batch(); |
300 | 0 | *ql_batch = op->request(); |
301 | 0 | const auto& hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
302 | 0 | ql_batch->set_hash_code(hash_code); |
303 | 0 | ql_batch->set_max_hash_code(hash_code); |
304 | 0 | } |
305 | |
|
306 | 0 | rpc::RpcController controller; |
307 | 0 | controller.set_timeout(MonoDelta::FromSeconds(1)); |
308 | 0 | req.set_tablet_id(tablet_id); |
309 | 0 | req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
310 | |
|
311 | 0 | tserver::ReadResponsePB resp; |
312 | 0 | RETURN_NOT_OK(proxy->Read(req, &resp, &controller)); |
313 | |
|
314 | 0 | const auto& ql_batch = resp.ql_batch(0); |
315 | 0 | if (ql_batch.status() != QLResponsePB_QLStatus_YQL_STATUS_OK) { |
316 | 0 | return STATUS_FORMAT(RemoteError, |
317 | 0 | "Bad resp status: $0", |
318 | 0 | QLResponsePB_QLStatus_Name(ql_batch.status())); |
319 | 0 | } |
320 | 0 | Schema projection; |
321 | 0 | vector<ColumnId> column_refs; |
322 | 0 | column_refs.emplace_back(table.ColumnId(kValueColumn)); |
323 | 0 | Schema total_schema = client::internal::GetSchema(table->schema()); |
324 | 0 | RETURN_NOT_OK(total_schema.CreateProjectionByIdsIgnoreMissing(column_refs, &projection)); |
325 | 0 | std::shared_ptr<std::vector<ColumnSchema>> columns = |
326 | 0 | std::make_shared<std::vector<ColumnSchema>>(YBSchema(projection).columns()); |
327 | |
|
328 | 0 | Slice data = VERIFY_RESULT(controller.GetSidecar(ql_batch.rows_data_sidecar())); |
329 | 0 | yb::ql::RowsResult result(table->name(), columns, data.ToBuffer()); |
330 | 0 | auto row_block = result.GetRowBlock(); |
331 | 0 | if (row_block->row_count() == 1) { |
332 | 0 | if (found) { |
333 | 0 | return STATUS_FORMAT(Corruption, "Key found twice: $0", i); |
334 | 0 | } |
335 | 0 | auto value = row_block->row(0).column(0).int32_value(); |
336 | 0 | if (value != ValueForKey(i)) { |
337 | 0 | return STATUS_FORMAT(Corruption, |
338 | 0 | "Wrong value for key: $0, expected: $1", |
339 | 0 | value, |
340 | 0 | ValueForKey(i)); |
341 | 0 | } |
342 | 0 | found = true; |
343 | 0 | } |
344 | 0 | } |
345 | 0 | if (!found) { |
346 | 0 | LOG(INFO) << "Key not found: " << i; |
347 | 0 | return false; |
348 | 0 | } |
349 | 0 | } |
350 | 0 | return true; |
351 | 0 | }; |
352 | |
|
353 | 0 | return Wait(condition, deadline, "Waiting for replication"); |
354 | 0 | } |
355 | | |
356 | | CHECKED_STATUS VerifyConsistency( |
357 | 0 | int begin, int end, const TableHandle& table, int expected_rows_mismatched = 0) { |
358 | 0 | auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(30); |
359 | 0 | TabletIdsAndReplicas info = VERIFY_RESULT(GetTabletIdsAndReplicas(table)); |
360 | 0 | std::vector<std::string> tablet_ids = info.first; |
361 | 0 | std::unordered_set<std::string> replicas = info.second; |
362 | |
|
363 | 0 | for (const auto& replica : replicas) { |
364 | 0 | RETURN_NOT_OK( |
365 | 0 | DoVerify(deadline, tablet_ids, replica, begin, end, table, expected_rows_mismatched)); |
366 | 0 | } |
367 | 0 | return Status::OK(); |
368 | 0 | } |
369 | | |
370 | | CHECKED_STATUS DoVerify( |
371 | | const MonoTime& deadline, |
372 | | const std::vector<std::string>& tablet_ids, |
373 | | const std::string& replica, |
374 | | const int begin, |
375 | | const int end, |
376 | | const TableHandle& table, |
377 | 0 | const size_t expected_rows_mismatched) { |
378 | 0 | auto tserver = cluster_->find_tablet_server(replica); |
379 | 0 | if (!tserver) { |
380 | 0 | return STATUS_FORMAT(NotFound, "Tablet server for $0 not found", replica); |
381 | 0 | } |
382 | 0 | auto endpoint = tserver->server()->rpc_server()->GetBoundAddresses().front(); |
383 | 0 | auto proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
384 | 0 | &tserver->server()->proxy_cache(), HostPort::FromBoundEndpoint(endpoint)); |
385 | |
|
386 | 0 | CHECK_EQ(tablet_ids.size(), 1); |
387 | 0 | for (const string& tablet_id : tablet_ids) { |
388 | 0 | tserver::VerifyTableRowRangeRequestPB req; |
389 | 0 | tserver::VerifyTableRowRangeResponsePB resp; |
390 | |
|
391 | 0 | req.set_tablet_id(tablet_id); |
392 | 0 | req.set_num_rows(end - begin + 1); |
393 | 0 | req.clear_start_key(); // empty string indicates start scan from start |
394 | | // read_time: if left empty, the safe time retrieved will be used instead |
395 | |
|
396 | 0 | rpc::RpcController controller; |
397 | 0 | controller.set_deadline(deadline); |
398 | 0 | RETURN_NOT_OK(proxy->VerifyTableRowRange(req, &resp, &controller)); |
399 | |
|
400 | 0 | if (resp.consistency_stats().size() == 0) { |
401 | 0 | return STATUS_FORMAT( |
402 | 0 | NotFound, |
403 | 0 | "Individual index consistency state missing for table $0.", |
404 | 0 | table.table()->id()); |
405 | 0 | } |
406 | 0 | for (auto it = resp.consistency_stats().begin(); it != resp.consistency_stats().end(); it++) { |
407 | 0 | if (it->second != expected_rows_mismatched) { |
408 | 0 | return STATUS_FORMAT( |
409 | 0 | Corruption, |
410 | 0 | "Incorrect number of rows reported to be dropped for index $0 built on table $1:" |
411 | 0 | "found $2 rows reported when $3 rows mismatched was expected.", |
412 | 0 | it->first, table.table()->id(), it->second, expected_rows_mismatched); |
413 | 0 | } |
414 | 0 | } |
415 | 0 | } |
416 | |
|
417 | 0 | return Status::OK(); |
418 | 0 | } |
419 | | |
420 | 0 | CHECKED_STATUS Import() { |
421 | 0 | std::this_thread::sleep_for(1s); // Wait until all tablets a synced and flushed. |
422 | 0 | EXPECT_OK(cluster_->FlushTablets()); |
423 | |
|
424 | 0 | auto source_infos = VERIFY_RESULT(GetTabletInfos(kTable1Name)); |
425 | 0 | auto dest_infos = VERIFY_RESULT(GetTabletInfos(kTable2Name)); |
426 | 0 | EXPECT_EQ(source_infos.size(), dest_infos.size()); |
427 | 0 | for (size_t i = 0; i != source_infos.size(); ++i) { |
428 | 0 | std::string start1, end1, start2, end2; |
429 | 0 | { |
430 | 0 | auto& metadata = source_infos[i]->metadata(); |
431 | 0 | SharedLock<std::remove_reference<decltype(metadata)>::type> lock(metadata); |
432 | 0 | const auto& partition = metadata.state().pb.partition(); |
433 | 0 | start1 = partition.partition_key_start(); |
434 | 0 | end1 = partition.partition_key_end(); |
435 | 0 | } |
436 | 0 | { |
437 | 0 | auto& metadata = dest_infos[i]->metadata(); |
438 | 0 | SharedLock<std::remove_reference<decltype(metadata)>::type> lock(metadata); |
439 | 0 | const auto& partition = metadata.state().pb.partition(); |
440 | 0 | start2 = partition.partition_key_start(); |
441 | 0 | end2 = partition.partition_key_end(); |
442 | 0 | } |
443 | 0 | EXPECT_EQ(start1, start2); |
444 | 0 | EXPECT_EQ(end1, end2); |
445 | 0 | } |
446 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
447 | 0 | auto* tablet_manager = cluster_->mini_tablet_server(i)->server()->tablet_manager(); |
448 | 0 | for (size_t j = 0; j != source_infos.size(); ++j) { |
449 | 0 | tablet::TabletPeerPtr source_peer, dest_peer; |
450 | 0 | tablet_manager->LookupTablet(source_infos[j]->id(), &source_peer); |
451 | 0 | EXPECT_NE(nullptr, source_peer); |
452 | 0 | auto source_dir = source_peer->tablet()->metadata()->rocksdb_dir(); |
453 | 0 | tablet_manager->LookupTablet(dest_infos[j]->id(), &dest_peer); |
454 | 0 | EXPECT_NE(nullptr, dest_peer); |
455 | 0 | auto status = dest_peer->tablet()->ImportData(source_dir); |
456 | 0 | if (!status.ok() && !status.IsNotFound()) { |
457 | 0 | return status; |
458 | 0 | } |
459 | 0 | } |
460 | 0 | } |
461 | 0 | return Status::OK(); |
462 | 0 | } |
463 | | |
464 | 0 | Result<scoped_refptr<master::TableInfo>> GetTableInfo(const YBTableName& table_name) { |
465 | 0 | auto& catalog_manager = |
466 | 0 | VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager(); |
467 | 0 | auto all_tables = catalog_manager.GetTables(master::GetTablesMode::kAll); |
468 | 0 | for (const auto& table : all_tables) { |
469 | 0 | if (table->name() == table_name.table_name()) { |
470 | 0 | return table; |
471 | 0 | } |
472 | 0 | } |
473 | 0 | return STATUS_FORMAT(NotFound, "Table $0 not found", table_name); |
474 | 0 | } |
475 | | |
476 | 0 | Result<master::TabletInfos> GetTabletInfos(const YBTableName& table_name) { |
477 | 0 | return VERIFY_RESULT(GetTableInfo(table_name))->GetTablets(); |
478 | 0 | } |
479 | | |
480 | | Status WaitForTableCreation(const YBTableName& table_name, |
481 | 0 | master::IsCreateTableDoneResponsePB *resp) { |
482 | 0 | return LoggedWaitFor([=]() -> Result<bool> { |
483 | 0 | master::IsCreateTableDoneRequestPB req; |
484 | 0 | req.mutable_table()->set_table_name(table_name.table_name()); |
485 | 0 | req.mutable_table()->mutable_namespace_()->set_name(table_name.namespace_name()); |
486 | 0 | resp->Clear(); |
487 | |
|
488 | 0 | master::MasterDdlProxy master_proxy( |
489 | 0 | &client_->proxy_cache(), VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr())); |
490 | 0 | rpc::RpcController rpc; |
491 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(30)); |
492 | |
|
493 | 0 | Status s = master_proxy.IsCreateTableDone(req, resp, &rpc); |
494 | 0 | return s.ok() && !resp->has_error(); |
495 | 0 | }, MonoDelta::FromSeconds(30), "Table Creation"); |
496 | 0 | } |
497 | | |
498 | | void TestDeletePartialKey(int num_range_keys_in_delete); |
499 | | |
500 | | void CreateAndVerifyIndexConsistency(int expected_number_rows_mismatched); |
501 | | |
502 | | TableHandle table1_; |
503 | | TableHandle table2_; |
504 | | }; |
505 | | |
506 | 0 | TEST_F(QLTabletTest, ImportToEmpty) { |
507 | 0 | CreateTables(0, kBigSeqNo); |
508 | |
|
509 | 0 | FillTable(0, kTotalKeys, table1_); |
510 | 0 | ASSERT_OK(Import()); |
511 | 0 | VerifyTable(0, kTotalKeys, table1_); |
512 | 0 | VerifyTable(0, kTotalKeys, table2_); |
513 | 0 | } |
514 | | |
515 | 0 | TEST_F(QLTabletTest, ImportToNonEmpty) { |
516 | 0 | CreateTables(0, kBigSeqNo); |
517 | |
|
518 | 0 | FillTable(0, kTotalKeys, table1_); |
519 | 0 | FillTable(kTotalKeys, 2 * kTotalKeys, table2_); |
520 | 0 | ASSERT_OK(Import()); |
521 | 0 | VerifyTable(0, 2 * kTotalKeys, table2_); |
522 | 0 | } |
523 | | |
524 | 0 | TEST_F(QLTabletTest, ImportToEmptyAndRestart) { |
525 | 0 | CreateTables(0, kBigSeqNo); |
526 | |
|
527 | 0 | FillTable(0, kTotalKeys, table1_); |
528 | 0 | ASSERT_OK(Import()); |
529 | 0 | VerifyTable(0, kTotalKeys, table2_); |
530 | |
|
531 | 0 | ASSERT_OK(cluster_->RestartSync()); |
532 | 0 | VerifyTable(0, kTotalKeys, table1_); |
533 | 0 | VerifyTable(0, kTotalKeys, table2_); |
534 | 0 | } |
535 | | |
536 | 0 | TEST_F(QLTabletTest, ImportToNonEmptyAndRestart) { |
537 | 0 | CreateTables(0, kBigSeqNo); |
538 | |
|
539 | 0 | FillTable(0, kTotalKeys, table1_); |
540 | 0 | FillTable(kTotalKeys, 2 * kTotalKeys, table2_); |
541 | |
|
542 | 0 | ASSERT_OK(Import()); |
543 | 0 | VerifyTable(0, 2 * kTotalKeys, table2_); |
544 | |
|
545 | 0 | ASSERT_OK(cluster_->RestartSync()); |
546 | 0 | VerifyTable(0, kTotalKeys, table1_); |
547 | 0 | VerifyTable(0, 2 * kTotalKeys, table2_); |
548 | 0 | } |
549 | | |
550 | 0 | TEST_F(QLTabletTest, LateImport) { |
551 | 0 | CreateTables(kBigSeqNo, 0); |
552 | |
|
553 | 0 | FillTable(0, kTotalKeys, table1_); |
554 | 0 | ASSERT_NOK(Import()); |
555 | 0 | } |
556 | | |
557 | 0 | TEST_F(QLTabletTest, OverlappedImport) { |
558 | 0 | CreateTables(kBigSeqNo - 2, kBigSeqNo); |
559 | |
|
560 | 0 | FillTable(0, kTotalKeys, table1_); |
561 | 0 | FillTable(kTotalKeys, 2 * kTotalKeys, table2_); |
562 | 0 | ASSERT_NOK(Import()); |
563 | 0 | } |
564 | | |
565 | 0 | void QLTabletTest::CreateAndVerifyIndexConsistency(const int expected_number_rows_mismatched) { |
566 | 0 | CreateTable(kTable1Name, &table1_, 1, true); |
567 | 0 | FillTable(0, kTotalKeys, table1_); |
568 | |
|
569 | 0 | TableHandle index_table; |
570 | 0 | kv_table_test::CreateIndex( |
571 | 0 | yb::client::Transactional::kTrue, 1, false, table1_, client_.get(), &index_table); |
572 | |
|
573 | 0 | ASSERT_OK(client_->WaitUntilIndexPermissionsAtLeast( |
574 | 0 | kTable1Name, index_table.name(), INDEX_PERM_READ_WRITE_AND_DELETE, 100ms /* max_wait */)); |
575 | 0 | ASSERT_OK(VerifyConsistency( |
576 | 0 | 0, kTotalKeys - 1, table1_, |
577 | 0 | expected_number_rows_mismatched)); // no missing indexed rows check |
578 | 0 | ASSERT_OK(VerifyConsistency( |
579 | 0 | 0, kTotalKeys - 1, index_table, expected_number_rows_mismatched)); // no orphans check |
580 | 0 | } |
581 | | |
582 | 0 | TEST_F(QLTabletTest, VerifyIndexRange) { CreateAndVerifyIndexConsistency(0); } |
583 | | |
584 | 0 | TEST_F(QLTabletTest, VerifyIndexRangeWithInconsistentTable) { |
585 | 0 | FLAGS_TEST_backfill_sabotage_frequency = 10; |
586 | 0 | const int kRowsDropped = kTotalKeys / FLAGS_TEST_backfill_sabotage_frequency; |
587 | 0 | CreateAndVerifyIndexConsistency(kRowsDropped); |
588 | 0 | } |
589 | | |
590 | | // Test expected number of tablets for transactions table - added for #2293. |
591 | 0 | TEST_F(QLTabletTest, TransactionsTableTablets) { |
592 | 0 | FLAGS_yb_num_shards_per_tserver = 1; |
593 | 0 | FLAGS_ysql_num_shards_per_tserver = 2; |
594 | 0 | FLAGS_transaction_table_num_tablets = 0; |
595 | 0 | FLAGS_transaction_table_num_tablets_per_tserver = 4; |
596 | |
|
597 | 0 | YBSchemaBuilder builder; |
598 | 0 | builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull(); |
599 | 0 | builder.AddColumn(kValueColumn)->Type(INT32); |
600 | | |
601 | | // Create transactional table. |
602 | 0 | TableProperties table_properties; |
603 | 0 | table_properties.SetTransactional(true); |
604 | 0 | builder.SetTableProperties(table_properties); |
605 | |
|
606 | 0 | TableHandle table; |
607 | 0 | ASSERT_OK(table.Create(kTable1Name, 8, client_.get(), &builder)); |
608 | | |
609 | | // Wait for transactions table to be created. |
610 | 0 | YBTableName table_name( |
611 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, kGlobalTransactionsTableName); |
612 | 0 | master::IsCreateTableDoneResponsePB resp; |
613 | 0 | ASSERT_OK(WaitForTableCreation(table_name, &resp)); |
614 | 0 | ASSERT_TRUE(resp.done()); |
615 | |
|
616 | 0 | auto tablets = ASSERT_RESULT(GetTabletInfos(table_name)); |
617 | 0 | ASSERT_EQ( |
618 | 0 | tablets.size(), |
619 | 0 | cluster_->num_tablet_servers() * FLAGS_transaction_table_num_tablets_per_tserver); |
620 | 0 | } |
621 | | |
622 | 0 | void DoStepDowns(MiniCluster* cluster) { |
623 | 0 | for (int j = 0; j != 5; ++j) { |
624 | 0 | StepDownAllTablets(cluster); |
625 | 0 | std::this_thread::sleep_for(5s); |
626 | 0 | } |
627 | 0 | } |
628 | | |
629 | 0 | void VerifyLogIndicies(MiniCluster* cluster) { |
630 | 0 | for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) { |
631 | 0 | auto peers = cluster->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
632 | |
|
633 | 0 | for (const auto& peer : peers) { |
634 | 0 | int64_t index = ASSERT_RESULT(peer->GetEarliestNeededLogIndex()); |
635 | 0 | ASSERT_EQ(peer->consensus()->GetLastCommittedOpId().index, index); |
636 | 0 | } |
637 | 0 | } |
638 | 0 | } |
639 | | |
namespace {

// Value the GC-log tests assign to FLAGS_retryable_request_timeout_secs; they then sleep
// one second longer than this before flushing.
constexpr int kRetryableRequestTimeoutSecs = 4;

}  // namespace
645 | | |
646 | 0 | TEST_F(QLTabletTest, GCLogWithoutWrites) { |
647 | 0 | SetAtomicFlag(kRetryableRequestTimeoutSecs, &FLAGS_retryable_request_timeout_secs); |
648 | |
|
649 | 0 | TableHandle table; |
650 | 0 | CreateTable(kTable1Name, &table); |
651 | |
|
652 | 0 | FillTable(0, kTotalKeys, table); |
653 | |
|
654 | 0 | std::this_thread::sleep_for(1s * (kRetryableRequestTimeoutSecs + 1)); |
655 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
656 | 0 | DoStepDowns(cluster_.get()); |
657 | 0 | VerifyLogIndicies(cluster_.get()); |
658 | 0 | } |
659 | | |
660 | 0 | TEST_F(QLTabletTest, GCLogWithRestartWithoutWrites) { |
661 | 0 | SetAtomicFlag(kRetryableRequestTimeoutSecs, &FLAGS_retryable_request_timeout_secs); |
662 | |
|
663 | 0 | TableHandle table; |
664 | 0 | CreateTable(kTable1Name, &table); |
665 | |
|
666 | 0 | FillTable(0, kTotalKeys, table); |
667 | |
|
668 | 0 | std::this_thread::sleep_for(1s * (kRetryableRequestTimeoutSecs + 1)); |
669 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
670 | |
|
671 | 0 | ASSERT_OK(cluster_->RestartSync()); |
672 | |
|
673 | 0 | DoStepDowns(cluster_.get()); |
674 | 0 | VerifyLogIndicies(cluster_.get()); |
675 | 0 | } |
676 | | |
677 | 0 | TEST_F(QLTabletTest, LeaderLease) { |
678 | 0 | SetAtomicFlag(false, &FLAGS_enable_lease_revocation); |
679 | |
|
680 | 0 | TableHandle table; |
681 | 0 | CreateTable(kTable1Name, &table); |
682 | |
|
683 | 0 | LOG(INFO) << "Filling table"; |
684 | 0 | FillTable(0, kTotalKeys, table); |
685 | |
|
686 | 0 | auto old_lease_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms); |
687 | 0 | SetAtomicFlag(60 * 1000, &FLAGS_leader_lease_duration_ms); |
688 | | // Wait for lease to sync. |
689 | 0 | std::this_thread::sleep_for(2ms * old_lease_ms); |
690 | |
|
691 | 0 | LOG(INFO) << "Step down"; |
692 | 0 | StepDownAllTablets(cluster_.get()); |
693 | |
|
694 | 0 | LOG(INFO) << "Write value"; |
695 | 0 | auto session = CreateSession(); |
696 | 0 | const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
697 | 0 | auto* const req = op->mutable_request(); |
698 | 0 | QLAddInt32HashValue(req, 1); |
699 | 0 | table.AddInt32ColumnValue(req, kValueColumn, 1); |
700 | 0 | auto status = session->ApplyAndFlush(op); |
701 | 0 | ASSERT_TRUE(status.IsIOError()) << "Status: " << status; |
702 | 0 | } |
703 | | |
704 | | // This test tries to catch situation when some entries were applied and flushed in RocksDB, |
705 | | // but is not present in persistent logs. |
706 | | // |
707 | | // If that happens than we would get situation that after restart some node has records |
708 | | // in RocksDB, but does not have log records for it. And would not be able to restore last |
709 | | // hybrid time, also this node would not be able to remotely bootstrap other nodes. |
710 | | // |
711 | | // So we just delay one of follower logs and write a random key. |
712 | | // Checking that flushed op id in RocksDB does not exceed last op id in logs. |
713 | 0 | TEST_F(QLTabletTest, WaitFlush) { |
714 | 0 | google::FlagSaver saver; |
715 | |
|
716 | 0 | constexpr int kNumTablets = 1; // Use single tablet to increase chance of bad scenario. |
717 | 0 | FLAGS_db_write_buffer_size = 10; // Use small memtable to induce background flush on each write. |
718 | |
|
719 | 0 | TestWorkload workload(cluster_.get()); |
720 | 0 | workload.set_table_name(kTable1Name); |
721 | 0 | workload.set_write_timeout_millis(30000); |
722 | 0 | workload.set_num_tablets(kNumTablets); |
723 | 0 | workload.set_num_write_threads(1); |
724 | 0 | workload.set_write_batch_size(1); |
725 | 0 | workload.set_payload_bytes(128); |
726 | 0 | workload.Setup(); |
727 | 0 | workload.Start(); |
728 | |
|
729 | 0 | std::vector<tablet::TabletPeerPtr> peers; |
730 | |
|
731 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
732 | 0 | auto tserver_peers = |
733 | 0 | cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
734 | 0 | ASSERT_EQ(tserver_peers.size(), 1); |
735 | 0 | peers.push_back(tserver_peers.front()); |
736 | 0 | } |
737 | |
|
738 | 0 | bool leader_found = false; |
739 | 0 | while (!leader_found) { |
740 | 0 | for (size_t i = 0; i != peers.size(); ++i) { |
741 | 0 | if (peers[i]->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
742 | 0 | peers[(i + 1) % peers.size()]->log()->TEST_SetSleepDuration(500ms); |
743 | 0 | leader_found = true; |
744 | 0 | break; |
745 | 0 | } |
746 | 0 | } |
747 | 0 | } |
748 | |
|
749 | 0 | auto deadline = std::chrono::steady_clock::now() + 20s; |
750 | 0 | while (std::chrono::steady_clock::now() <= deadline) { |
751 | 0 | for (const auto& peer : peers) { |
752 | 0 | auto flushed_op_id = ASSERT_RESULT(peer->tablet()->MaxPersistentOpId()).regular; |
753 | 0 | auto latest_entry_op_id = peer->log()->GetLatestEntryOpId(); |
754 | 0 | ASSERT_LE(flushed_op_id.index, latest_entry_op_id.index); |
755 | 0 | } |
756 | 0 | } |
757 | |
|
758 | 0 | for (const auto& peer : peers) { |
759 | 0 | auto flushed_op_id = ASSERT_RESULT(peer->tablet()->MaxPersistentOpId()).regular; |
760 | 0 | ASSERT_GE(flushed_op_id.index, 100); |
761 | 0 | } |
762 | |
|
763 | 0 | workload.StopAndJoin(); |
764 | 0 | } |
765 | | |
766 | | |
767 | 0 | TEST_F(QLTabletTest, BoundaryValues) { |
768 | 0 | constexpr size_t kTotalThreads = 8; |
769 | 0 | constexpr int kTotalRows = 10000; |
770 | |
|
771 | 0 | TableHandle table; |
772 | 0 | CreateTable(kTable1Name, &table, 1); |
773 | |
|
774 | 0 | std::vector<std::thread> threads; |
775 | 0 | std::atomic<int> idx(0); |
776 | 0 | for (size_t t = 0; t != kTotalThreads; ++t) { |
777 | 0 | threads.emplace_back([this, &idx, &table] { |
778 | 0 | auto session = CreateSession(); |
779 | 0 | for (;;) { |
780 | 0 | auto i = idx++; |
781 | 0 | if (i >= kTotalRows) { |
782 | 0 | break; |
783 | 0 | } |
784 | | |
785 | 0 | SetValue(session, i, -i, table); |
786 | 0 | } |
787 | 0 | }); |
788 | 0 | } |
789 | 0 | const auto kSleepTime = NonTsanVsTsan(5s, 1s); |
790 | 0 | std::this_thread::sleep_for(kSleepTime); |
791 | 0 | LOG(INFO) << "Flushing tablets"; |
792 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
793 | 0 | std::this_thread::sleep_for(kSleepTime); |
794 | 0 | LOG(INFO) << "GC logs"; |
795 | 0 | ASSERT_OK(cluster_->CleanTabletLogs()); |
796 | 0 | LOG(INFO) << "Wait for threads"; |
797 | 0 | for (auto& thread : threads) { |
798 | 0 | thread.join(); |
799 | 0 | } |
800 | 0 | std::this_thread::sleep_for(kSleepTime * 5); |
801 | 0 | ASSERT_OK(cluster_->RestartSync()); |
802 | |
|
803 | 0 | size_t total_rows = 0; |
804 | 0 | for (const auto& row : TableRange(table)) { |
805 | 0 | EXPECT_EQ(row.column(0).int32_value(), -row.column(1).int32_value()); |
806 | 0 | ++total_rows; |
807 | 0 | } |
808 | 0 | ASSERT_EQ(kTotalRows, total_rows); |
809 | |
|
810 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
811 | 0 | std::this_thread::sleep_for(kSleepTime); |
812 | |
|
813 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
814 | 0 | auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
815 | 0 | ASSERT_EQ(1, peers.size()); |
816 | 0 | auto& peer = *peers[0]; |
817 | 0 | auto op_id = peer.log()->GetLatestEntryOpId(); |
818 | 0 | auto* db = peer.tablet()->TEST_db(); |
819 | 0 | int64_t max_index = 0; |
820 | 0 | int64_t min_index = std::numeric_limits<int64_t>::max(); |
821 | 0 | for (const auto& file : db->GetLiveFilesMetaData()) { |
822 | 0 | LOG(INFO) << "File: " << yb::ToString(file); |
823 | 0 | max_index = std::max( |
824 | 0 | max_index, |
825 | 0 | down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier).op_id().index); |
826 | 0 | min_index = std::min( |
827 | 0 | min_index, |
828 | 0 | down_cast<docdb::ConsensusFrontier&>(*file.smallest.user_frontier).op_id().index); |
829 | 0 | } |
830 | | |
831 | | // Allow several entries for non-write ops. |
832 | 0 | ASSERT_GE(max_index, op_id.index - 5); |
833 | 0 | ASSERT_LE(min_index, 5); |
834 | 0 | } |
835 | 0 | } |
836 | | |
837 | | // There was a bug in MvccManager when clocks were skewed. |
838 | | // The client tries to read from a follower and max safe time is requested without any limits, |
839 | | // so new operations could be added with an HT lower than the one returned. |
840 | 0 | TEST_F(QLTabletTest, SkewedClocks) { |
841 | 0 | google::FlagSaver saver; |
842 | |
|
843 | 0 | auto delta_changers = SkewClocks(cluster_.get(), 50ms); |
844 | |
|
845 | 0 | TestWorkload workload(cluster_.get()); |
846 | 0 | workload.set_table_name(kTable1Name); |
847 | 0 | workload.set_write_timeout_millis(30000); |
848 | 0 | workload.set_num_tablets(12); |
849 | 0 | workload.set_num_write_threads(2); |
850 | 0 | workload.set_write_batch_size(1); |
851 | 0 | workload.set_payload_bytes(128); |
852 | 0 | workload.Setup(); |
853 | 0 | workload.Start(); |
854 | |
|
855 | 0 | while (workload.rows_inserted() < 100) { |
856 | 0 | std::this_thread::sleep_for(10ms); |
857 | 0 | } |
858 | |
|
859 | 0 | TableHandle table; |
860 | 0 | ASSERT_OK(table.Open(kTable1Name, client_.get())); |
861 | 0 | auto session = CreateSession(); |
862 | |
|
863 | 0 | for (int i = 0; i != 1000; ++i) { |
864 | 0 | auto op = table.NewReadOp(); |
865 | 0 | auto req = op->mutable_request(); |
866 | 0 | QLAddInt32HashValue(req, i); |
867 | 0 | auto value_column_id = table.ColumnId(kValueColumn); |
868 | 0 | req->add_selected_exprs()->set_column_id(value_column_id); |
869 | 0 | req->mutable_column_refs()->add_ids(value_column_id); |
870 | |
|
871 | 0 | QLRSColDescPB *rscol_desc = req->mutable_rsrow_desc()->add_rscol_descs(); |
872 | 0 | rscol_desc->set_name(kValueColumn); |
873 | 0 | table.ColumnType(kValueColumn)->ToQLTypePB(rscol_desc->mutable_ql_type()); |
874 | 0 | op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
875 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
876 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()); |
877 | 0 | } |
878 | |
|
879 | 0 | workload.StopAndJoin(); |
880 | |
|
881 | 0 | cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back. |
882 | 0 | cluster_.reset(); |
883 | 0 | } |
884 | | |
885 | 0 | TEST_F(QLTabletTest, LeaderChange) { |
886 | 0 | const int32_t kKey = 1; |
887 | 0 | const int32_t kValue1 = 2; |
888 | 0 | const int32_t kValue2 = 3; |
889 | 0 | const int32_t kValue3 = 4; |
890 | 0 | const int kNumTablets = 1; |
891 | |
|
892 | 0 | TableHandle table; |
893 | 0 | CreateTable(kTable1Name, &table, kNumTablets); |
894 | 0 | auto session = client_->NewSession(); |
895 | 0 | session->SetTimeout(60s); |
896 | | |
897 | | // Write kValue1 |
898 | 0 | SetValue(session, kKey, kValue1, table); |
899 | |
|
900 | 0 | std::string leader_id; |
901 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
902 | 0 | auto server = cluster_->mini_tablet_server(i)->server(); |
903 | 0 | auto peers = server->tablet_manager()->GetTabletPeers(); |
904 | 0 | for (const auto& peer : peers) { |
905 | 0 | if (peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) { |
906 | 0 | leader_id = server->permanent_uuid(); |
907 | 0 | break; |
908 | 0 | } |
909 | 0 | } |
910 | 0 | } |
911 | |
|
912 | 0 | LOG(INFO) << "Current leader: " << leader_id; |
913 | |
|
914 | 0 | ASSERT_NE(leader_id, ""); |
915 | |
|
916 | 0 | LOG(INFO) << "CAS " << kValue1 << " => " << kValue2; |
917 | 0 | const auto write_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
918 | 0 | auto* const req = write_op->mutable_request(); |
919 | 0 | QLAddInt32HashValue(req, kKey); |
920 | 0 | table.AddInt32ColumnValue(req, kValueColumn, kValue2); |
921 | |
|
922 | 0 | table.SetColumn(req->add_column_values(), kValueColumn); |
923 | 0 | table.SetInt32Condition( |
924 | 0 | req->mutable_if_expr()->mutable_condition(), kValueColumn, QL_OP_EQUAL, kValue1); |
925 | 0 | req->mutable_column_refs()->add_ids(table.ColumnId(kValueColumn)); |
926 | 0 | session->Apply(write_op); |
927 | |
|
928 | 0 | SetAtomicFlag(30000, &FLAGS_TEST_delay_execute_async_ms); |
929 | 0 | auto flush_future = session->FlushFuture(); |
930 | 0 | std::this_thread::sleep_for(2s); |
931 | |
|
932 | 0 | SetAtomicFlag(0, &FLAGS_TEST_delay_execute_async_ms); |
933 | |
|
934 | 0 | LOG(INFO) << "Step down old leader"; |
935 | 0 | StepDownAllTablets(cluster_.get()); |
936 | | |
937 | | // Write another key to refresh the leader cache. |
938 | | // Otherwise we would hang on locking the key. |
939 | 0 | LOG(INFO) << "Write other key"; |
940 | 0 | SetValue(session, kKey + 1, kValue1, table); |
941 | |
|
942 | 0 | LOG(INFO) << "Write " << kValue3; |
943 | 0 | SetValue(session, kKey, kValue3, table); |
944 | |
|
945 | 0 | ASSERT_EQ(GetValue(session, kKey, table), kValue3); |
946 | |
|
947 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
948 | 0 | auto server = cluster_->mini_tablet_server(i)->server(); |
949 | 0 | auto peers = server->tablet_manager()->GetTabletPeers(); |
950 | 0 | bool found = false; |
951 | 0 | for (const auto& peer : peers) { |
952 | 0 | if (peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) { |
953 | 0 | LOG(INFO) << "Request step down: " << server->permanent_uuid() << " => " << leader_id; |
954 | 0 | consensus::LeaderStepDownRequestPB req; |
955 | 0 | req.set_tablet_id(peer->tablet_id()); |
956 | 0 | req.set_new_leader_uuid(leader_id); |
957 | 0 | consensus::LeaderStepDownResponsePB resp; |
958 | 0 | ASSERT_OK(peer->consensus()->StepDown(&req, &resp)); |
959 | 0 | found = true; |
960 | 0 | break; |
961 | 0 | } |
962 | 0 | } |
963 | 0 | if (found) { |
964 | 0 | break; |
965 | 0 | } |
966 | 0 | } |
967 | |
|
968 | 0 | ASSERT_OK(flush_future.get().status); |
969 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, write_op->response().status()); |
970 | |
|
971 | 0 | ASSERT_EQ(GetValue(session, kKey, table), kValue3); |
972 | 0 | } |
973 | | |
974 | 0 | void QLTabletTest::TestDeletePartialKey(int num_range_keys_in_delete) { |
975 | 0 | YBSchemaBuilder builder; |
976 | 0 | builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull(); |
977 | 0 | builder.AddColumn(kRangeKey1Column)->Type(INT32)->PrimaryKey()->NotNull(); |
978 | 0 | builder.AddColumn(kRangeKey2Column)->Type(INT32)->PrimaryKey()->NotNull(); |
979 | 0 | builder.AddColumn(kValueColumn)->Type(INT32); |
980 | |
|
981 | 0 | TableHandle table; |
982 | 0 | ASSERT_OK(table.Create(kTable1Name, 1 /* num_tablets */, client_.get(), &builder)); |
983 | |
|
984 | 0 | const auto kValue1 = 2; |
985 | 0 | const auto kValue2 = 3; |
986 | 0 | const auto kTotalKeys = 200; |
987 | |
|
988 | 0 | auto session1 = CreateSession(); |
989 | 0 | auto session2 = CreateSession(); |
990 | 0 | for (int key = 1; key != kTotalKeys; ++key) { |
991 | 0 | { |
992 | 0 | const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
993 | 0 | auto* const req = op->mutable_request(); |
994 | 0 | QLAddInt32HashValue(req, key); |
995 | 0 | QLAddInt32RangeValue(req, key); |
996 | 0 | QLAddInt32RangeValue(req, key); |
997 | 0 | table.AddInt32ColumnValue(req, kValueColumn, kValue1); |
998 | 0 | ASSERT_OK(session1->ApplyAndFlush(op)); |
999 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()); |
1000 | 0 | } |
1001 | |
|
1002 | 0 | const auto op_del = table.NewWriteOp(QLWriteRequestPB::QL_STMT_DELETE); |
1003 | 0 | { |
1004 | 0 | auto* const req = op_del->mutable_request(); |
1005 | 0 | QLAddInt32HashValue(req, key); |
1006 | 0 | for (int i = 0; i != num_range_keys_in_delete; ++i) { |
1007 | 0 | QLAddInt32RangeValue(req, key); |
1008 | 0 | } |
1009 | 0 | session1->Apply(op_del); |
1010 | 0 | } |
1011 | |
|
1012 | 0 | const auto op_update = table.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE); |
1013 | 0 | { |
1014 | 0 | auto* const req = op_update->mutable_request(); |
1015 | 0 | QLAddInt32HashValue(req, key); |
1016 | 0 | QLAddInt32RangeValue(req, key); |
1017 | 0 | QLAddInt32RangeValue(req, key); |
1018 | 0 | table.AddInt32ColumnValue(req, kValueColumn, kValue2); |
1019 | 0 | req->mutable_if_expr()->mutable_condition()->set_op(QL_OP_EXISTS); |
1020 | 0 | session2->Apply(op_update); |
1021 | 0 | } |
1022 | 0 | auto future_del = session1->FlushFuture(); |
1023 | 0 | auto future_update = session2->FlushFuture(); |
1024 | 0 | ASSERT_OK(future_del.get().status); |
1025 | 0 | ASSERT_OK(future_update.get().status); |
1026 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op_del->response().status()); |
1027 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op_update->response().status()); |
1028 | |
|
1029 | 0 | auto stored_value = GetValue(session1, key, table); |
1030 | 0 | ASSERT_TRUE(!stored_value) << "Key: " << key << ", value: " << *stored_value; |
1031 | 0 | } |
1032 | 0 | } |
1033 | | |
1034 | 0 | TEST_F(QLTabletTest, DeleteByHashKey) { |
1035 | 0 | TestDeletePartialKey(0); |
1036 | 0 | } |
1037 | | |
1038 | 0 | TEST_F(QLTabletTest, DeleteByHashAndPartialRangeKey) { |
1039 | 0 | TestDeletePartialKey(1); |
1040 | 0 | } |
1041 | | |
1042 | 0 | TEST_F(QLTabletTest, ManySstFilesBootstrap) { |
1043 | 0 | FLAGS_flush_rocksdb_on_shutdown = false; |
1044 | |
|
1045 | 0 | int key = 0; |
1046 | 0 | { |
1047 | 0 | google::FlagSaver flag_saver; |
1048 | |
|
1049 | 0 | size_t original_rocksdb_level0_stop_writes_trigger = 48; |
1050 | 0 | FLAGS_sst_files_hard_limit = std::numeric_limits<uint64_t>::max() / 4; |
1051 | 0 | FLAGS_sst_files_soft_limit = FLAGS_sst_files_hard_limit; |
1052 | 0 | FLAGS_rocksdb_level0_stop_writes_trigger = 10000; |
1053 | 0 | FLAGS_rocksdb_level0_slowdown_writes_trigger = 10000; |
1054 | 0 | FLAGS_rocksdb_disable_compactions = true; |
1055 | 0 | CreateTable(kTable1Name, &table1_, 1); |
1056 | |
|
1057 | 0 | auto session = CreateSession(); |
1058 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
1059 | 0 | ASSERT_EQ(peers.size(), 1); |
1060 | 0 | LOG(INFO) << "Leader: " << peers[0]->permanent_uuid(); |
1061 | 0 | int stop_key = 0; |
1062 | 0 | for (;;) { |
1063 | 0 | auto meta = peers[0]->tablet()->TEST_db()->GetLiveFilesMetaData(); |
1064 | 0 | LOG(INFO) << "Total files: " << meta.size(); |
1065 | |
|
1066 | 0 | ++key; |
1067 | 0 | SetValue(session, key, ValueForKey(key), table1_); |
1068 | 0 | if (meta.size() <= original_rocksdb_level0_stop_writes_trigger) { |
1069 | 0 | ASSERT_OK(peers[0]->tablet()->Flush(tablet::FlushMode::kSync)); |
1070 | 0 | stop_key = key + 10; |
1071 | 0 | } else if (key >= stop_key) { |
1072 | 0 | break; |
1073 | 0 | } |
1074 | 0 | } |
1075 | 0 | } |
1076 | |
|
1077 | 0 | cluster_->Shutdown(); |
1078 | |
|
1079 | 0 | LOG(INFO) << "Starting cluster"; |
1080 | |
|
1081 | 0 | ASSERT_OK(cluster_->StartSync()); |
1082 | |
|
1083 | 0 | LOG(INFO) << "Verify table"; |
1084 | |
|
1085 | 0 | VerifyTable(1, key, table1_); |
1086 | 0 | } |
1087 | | |
1088 | | class QLTabletTestSmallMemstore : public QLTabletTest { |
1089 | | public: |
1090 | 1 | void SetUp() override { |
1091 | 1 | FLAGS_memstore_size_mb = 1; |
1092 | 1 | FLAGS_global_memstore_size_mb_max = 1; |
1093 | 1 | QLTabletTest::SetUp(); |
1094 | 1 | } |
1095 | | }; |
1096 | | |
1097 | 0 | TEST_F_EX(QLTabletTest, DoubleFlush, QLTabletTestSmallMemstore) { |
1098 | 0 | SetAtomicFlag(false, &FLAGS_TEST_allow_stop_writes); |
1099 | |
|
1100 | 0 | TestWorkload workload(cluster_.get()); |
1101 | 0 | workload.set_table_name(kTable1Name); |
1102 | 0 | workload.set_write_timeout_millis(30000); |
1103 | 0 | workload.set_num_tablets(1); |
1104 | 0 | workload.set_num_write_threads(10); |
1105 | 0 | workload.set_write_batch_size(1); |
1106 | 0 | workload.set_payload_bytes(1_KB); |
1107 | 0 | workload.Setup(); |
1108 | 0 | workload.Start(); |
1109 | |
|
1110 | 0 | while (workload.rows_inserted() < RegularBuildVsSanitizers(75000, 20000)) { |
1111 | 0 | std::this_thread::sleep_for(10ms); |
1112 | 0 | } |
1113 | |
|
1114 | 0 | workload.StopAndJoin(); |
1115 | | |
1116 | | // Flush on RocksDB shutdown could produce a second immutable memtable, which will stop writes. |
1117 | 0 | SetAtomicFlag(true, &FLAGS_TEST_allow_stop_writes); |
1118 | 0 | cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back. |
1119 | 0 | cluster_.reset(); |
1120 | 0 | } |
1121 | | |
1122 | 0 | TEST_F(QLTabletTest, OperationMemTracking) { |
1123 | 0 | FLAGS_TEST_log_cache_skip_eviction = true; |
1124 | |
|
1125 | 0 | constexpr ssize_t kValueSize = 64_KB; |
1126 | 0 | const auto kWaitInterval = 50ms; |
1127 | |
|
1128 | 0 | YBSchemaBuilder builder; |
1129 | 0 | builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull(); |
1130 | 0 | builder.AddColumn(kValueColumn)->Type(STRING); |
1131 | |
|
1132 | 0 | TableHandle table; |
1133 | 0 | ASSERT_OK(table.Create(kTable1Name, CalcNumTablets(3), client_.get(), &builder)); |
1134 | |
|
1135 | 0 | FLAGS_TEST_tablet_inject_latency_on_apply_write_txn_ms = 1000; |
1136 | |
|
1137 | 0 | const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
1138 | 0 | auto* const req = op->mutable_request(); |
1139 | 0 | QLAddInt32HashValue(req, 42); |
1140 | 0 | auto session = CreateSession(); |
1141 | 0 | table.AddStringColumnValue(req, kValueColumn, std::string(kValueSize, 'X')); |
1142 | 0 | session->Apply(op); |
1143 | 0 | auto future = session->FlushFuture(); |
1144 | 0 | auto server_tracker = MemTracker::GetRootTracker()->FindChild("server 1"); |
1145 | 0 | auto tablets_tracker = server_tracker->FindChild("Tablets"); |
1146 | 0 | auto log_tracker = server_tracker->FindChild("log_cache"); |
1147 | |
|
1148 | 0 | std::chrono::steady_clock::time_point deadline; |
1149 | 0 | bool tracked_by_tablets = false; |
1150 | 0 | bool tracked_by_log_cache = false; |
1151 | 0 | for (;;) { |
1152 | | // The order of the consumption reads is important, otherwise we could get into a situation |
1153 | | // where mem tracking changed between the reads. |
1154 | 0 | auto log_cache_consumption = log_tracker->consumption(); |
1155 | 0 | tracked_by_log_cache = tracked_by_log_cache || log_cache_consumption >= kValueSize; |
1156 | 0 | int64_t operation_tracker_consumption = 0; |
1157 | 0 | for (auto& child : tablets_tracker->ListChildren()) { |
1158 | 0 | operation_tracker_consumption += child->FindChild("operation_tracker")->consumption(); |
1159 | 0 | } |
1160 | |
|
1161 | 0 | tracked_by_tablets = tracked_by_tablets || operation_tracker_consumption >= kValueSize; |
1162 | 0 | LOG(INFO) << "Operation tracker consumption: " << operation_tracker_consumption |
1163 | 0 | << ", log cache consumption: " << log_cache_consumption; |
1164 | | // We have overhead in both log cache and tablets. |
1165 | | // So if value is double tracked then sum consumption will be higher than double value size. |
1166 | 0 | ASSERT_LE(operation_tracker_consumption + log_cache_consumption, kValueSize * 2) |
1167 | 0 | << DumpMemoryUsage(); |
1168 | 0 | if (std::chrono::steady_clock::time_point() == deadline) { // operation did not finish yet |
1169 | 0 | if (future.wait_for(kWaitInterval) == std::future_status::ready) { |
1170 | 0 | LOG(INFO) << "Value written"; |
1171 | 0 | deadline = std::chrono::steady_clock::now() + 3s; |
1172 | 0 | ASSERT_OK(future.get().status); |
1173 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()); |
1174 | 0 | } |
1175 | 0 | } else if (deadline < std::chrono::steady_clock::now() || tracked_by_log_cache) { |
1176 | 0 | break; |
1177 | 0 | } else { |
1178 | 0 | std::this_thread::sleep_for(kWaitInterval); |
1179 | 0 | } |
1180 | 0 | } |
1181 | |
|
1182 | 0 | ASSERT_TRUE(tracked_by_tablets); |
1183 | 0 | ASSERT_TRUE(tracked_by_log_cache); |
1184 | 0 | } |
1185 | | |
1186 | | // Checks the existence of the BlockBasedTable memtracker and verifies that its size is greater |
1187 | | // than zero after creating a table and flushing it. Then deletes the table, and verifies that |
1188 | | // the memtracker is removed. |
1189 | 0 | TEST_F(QLTabletTest, BlockCacheMemTracking) { |
1190 | 0 | const auto kSleepTime = NonTsanVsTsan(5s, 1s); |
1191 | 0 | constexpr size_t kTotalRows = 10000; |
1192 | 0 | const string kBlockTrackerName = "BlockBasedTable"; |
1193 | |
|
1194 | 0 | TableHandle table; |
1195 | 0 | CreateTable(kTable1Name, &table, 1); |
1196 | 0 | FillTable(0, kTotalRows, table); |
1197 | |
|
1198 | 0 | auto server_tracker = MemTracker::GetRootTracker()->FindChild("server 1"); |
1199 | 0 | auto block_cache_tracker = server_tracker->FindChild(kBlockTrackerName); |
1200 | 0 | ASSERT_TRUE(block_cache_tracker); |
1201 | |
|
1202 | 0 | std::this_thread::sleep_for(kSleepTime); |
1203 | 0 | LOG(INFO) << "Flushing tablets"; |
1204 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
1205 | 0 | std::this_thread::sleep_for(kSleepTime); |
1206 | |
|
1207 | 0 | auto block_cache_children = block_cache_tracker->ListChildren(); |
1208 | | // check that there is exactly one child memtracker |
1209 | 0 | ASSERT_EQ(block_cache_children.size(), 1); |
1210 | | // check that the child memtracker has a consumption greater than zero |
1211 | 0 | ASSERT_GT(block_cache_children[0]->consumption(), 0); |
1212 | |
|
1213 | 0 | LOG(INFO) << "Deleting table"; |
1214 | 0 | ASSERT_OK(client_->DeleteTable(kTable1Name, true)); |
1215 | 0 | std::this_thread::sleep_for(kSleepTime); |
1216 | | |
1217 | | // after table deletion, assert that there is no longer a block cache memtracker |
1218 | 0 | block_cache_tracker = server_tracker->FindChild(kBlockTrackerName); |
1219 | 0 | ASSERT_FALSE(block_cache_tracker); |
1220 | 0 | } |
1221 | | |
1222 | | // Checks the history cutoff for the cluster against the previous state. |
1223 | | // Committed history cutoff should not go backward. |
1224 | | // Updates *prev_committed with the current state. |
1225 | | void VerifyHistoryCutoff(MiniCluster* cluster, HybridTime* prev_committed, |
1226 | 0 | const std::string& trace) { |
1227 | 0 | SCOPED_TRACE(trace); |
1228 | 0 | const auto base_delta_us = |
1229 | 0 | -FLAGS_timestamp_history_retention_interval_sec * MonoTime::kMicrosecondsPerSecond; |
1230 | 0 | constexpr auto kExtraDeltaMs = 200; |
1231 | | // Allow 2 Raft rounds + processing delta to replicate the operation, update committed and |
1232 | | // propagate it. |
1233 | 0 | const auto committed_delta_us = |
1234 | 0 | base_delta_us - |
1235 | 0 | (FLAGS_raft_heartbeat_interval_ms * 2 + kExtraDeltaMs) * MonoTime::kMicrosecondsPerMillisecond |
1236 | 0 | * kTimeMultiplier; |
1237 | |
|
1238 | 0 | HybridTime committed = HybridTime::kMin; |
1239 | 0 | auto deadline = CoarseMonoClock::now() + 5s * kTimeMultiplier; |
1240 | 0 | for (;;) { |
1241 | 0 | ASSERT_LE(CoarseMonoClock::now(), deadline); |
1242 | 0 | auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll); |
1243 | 0 | std::sort(peers.begin(), peers.end(), [](const auto& lhs, const auto& rhs) { |
1244 | 0 | return lhs->permanent_uuid() < rhs->permanent_uuid(); |
1245 | 0 | }); |
1246 | 0 | if (peers.size() != cluster->num_tablet_servers()) { |
1247 | 0 | std::this_thread::sleep_for(100ms); |
1248 | 0 | continue; |
1249 | 0 | } |
1250 | 0 | bool complete = false; |
1251 | 0 | for (size_t i = 0; i < peers.size(); ++i) { |
1252 | 0 | auto peer = peers[i]; |
1253 | 0 | SCOPED_TRACE(Format("Peer: $0", peer->permanent_uuid())); |
1254 | 0 | if (peer->state() != tablet::RaftGroupStatePB::RUNNING) { |
1255 | 0 | complete = false; |
1256 | 0 | break; |
1257 | 0 | } |
1258 | 0 | auto peer_history_cutoff = |
1259 | 0 | peer->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff; |
1260 | 0 | committed = std::max(peer_history_cutoff, committed); |
1261 | 0 | auto min_allowed = std::min(peer->clock_ptr()->Now().AddMicroseconds(committed_delta_us), |
1262 | 0 | peer->tablet()->mvcc_manager()->LastReplicatedHybridTime()); |
1263 | 0 | if (peer_history_cutoff < min_allowed) { |
1264 | 0 | LOG(INFO) << "Committed did not catch up for " << peer->permanent_uuid() << ": " |
1265 | 0 | << peer_history_cutoff << " vs " << min_allowed; |
1266 | 0 | complete = false; |
1267 | 0 | break; |
1268 | 0 | } |
1269 | 0 | if (peer->consensus()->GetLeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
1270 | 0 | complete = true; |
1271 | 0 | } |
1272 | 0 | } |
1273 | 0 | if (complete) { |
1274 | 0 | break; |
1275 | 0 | } |
1276 | 0 | std::this_thread::sleep_for(100ms); |
1277 | 0 | } |
1278 | 0 | ASSERT_GE(committed, *prev_committed); |
1279 | 0 | *prev_committed = committed; |
1280 | 0 | } |
1281 | | |
1282 | | // Basic check for history cutoff evolution |
1283 | 0 | TEST_F(QLTabletTest, HistoryCutoff) { |
1284 | 0 | FLAGS_timestamp_history_retention_interval_sec = kTimeMultiplier; |
1285 | 0 | FLAGS_history_cutoff_propagation_interval_ms = 100; |
1286 | |
|
1287 | 0 | CreateTable(kTable1Name, &table1_, /* num_tablets= */ 1); |
1288 | 0 | HybridTime committed_history_cutoff = HybridTime::kMin; |
1289 | 0 | FillTable(0, 10, table1_); |
1290 | 0 | ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "After write")); |
1291 | | |
1292 | | // Check that we restore committed state after restart. |
1293 | 0 | std::array<HybridTime, 3> peer_committed; |
1294 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
1295 | 0 | auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
1296 | 0 | ASSERT_EQ(peers.size(), 1); |
1297 | 0 | peer_committed[i] = |
1298 | 0 | peers[0]->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff; |
1299 | 0 | LOG(INFO) << "Peer: " << peers[0]->permanent_uuid() << ", index: " << i |
1300 | 0 | << ", committed: " << peer_committed[i]; |
1301 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
1302 | 0 | } |
1303 | |
|
1304 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
1305 | 0 | ASSERT_OK(cluster_->mini_tablet_server(i)->Start()); |
1306 | 0 | for (;;) { |
1307 | 0 | auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers(); |
1308 | 0 | ASSERT_LE(peers.size(), 1); |
1309 | 0 | if (peers.empty() || peers[0]->state() != tablet::RaftGroupStatePB::RUNNING) { |
1310 | 0 | std::this_thread::sleep_for(100ms); |
1311 | 0 | continue; |
1312 | 0 | } |
1313 | 0 | SCOPED_TRACE(Format("Peer: $0, index: $1", peers[0]->permanent_uuid(), i)); |
1314 | 0 | ASSERT_GE(peers[0]->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff, |
1315 | 0 | peer_committed[i]); |
1316 | 0 | break; |
1317 | 0 | } |
1318 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
1319 | 0 | } |
1320 | |
|
1321 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
1322 | 0 | ASSERT_OK(cluster_->mini_tablet_server(i)->Start()); |
1323 | 0 | } |
1324 | 0 | ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "After restart")); |
1325 | | |
1326 | | // Wait to check history cutoff advance w/o operations. |
1327 | 0 | std::this_thread::sleep_for( |
1328 | 0 | FLAGS_timestamp_history_retention_interval_sec * 1s + |
1329 | 0 | FLAGS_history_cutoff_propagation_interval_ms * 3ms); |
1330 | 0 | ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "Final")); |
1331 | 0 | } |
1332 | | |
1333 | | class QLTabletRf1Test : public QLTabletTest { |
1334 | | public: |
1335 | 2 | void SetUp() override { |
1336 | 2 | mini_cluster_opt_.num_masters = 1; |
1337 | 2 | mini_cluster_opt_.num_tablet_servers = 1; |
1338 | 2 | QLTabletTest::SetUp(); |
1339 | 2 | } |
1340 | | }; |
1341 | | |
1342 | | // For this test we don't actually need an RF3 setup, which also makes the test flaky because of |
1343 | | // https://github.com/yugabyte/yugabyte-db/issues/4663. |
1344 | 0 | TEST_F_EX(QLTabletTest, GetMiddleKey, QLTabletRf1Test) { |
1345 | 0 | FLAGS_db_write_buffer_size = 20_KB; |
1346 | |
|
1347 | 0 | TestWorkload workload(cluster_.get()); |
1348 | 0 | workload.set_table_name(kTable1Name); |
1349 | 0 | workload.set_write_timeout_millis(30000); |
1350 | 0 | workload.set_num_tablets(1); |
1351 | 0 | workload.set_num_write_threads(2); |
1352 | 0 | workload.set_write_batch_size(1); |
1353 | 0 | workload.set_payload_bytes(16); |
1354 | 0 | workload.Setup(); |
1355 | |
|
1356 | 0 | LOG(INFO) << "Starting workload ..."; |
1357 | 0 | Stopwatch s(Stopwatch::ALL_THREADS); |
1358 | 0 | s.start(); |
1359 | 0 | workload.Start(); |
1360 | |
|
1361 | 0 | const auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
1362 | 0 | ASSERT_EQ(peers.size(), 1); |
1363 | 0 | const auto& tablet = *ASSERT_NOTNULL(peers[0]->tablet()); |
1364 | | |
1365 | | // We want some compactions to happen, so largest SST file will become large enough for its |
1366 | | // approximate middle key to roughly split the whole tablet into two parts that are close in size. |
1367 | 0 | while (tablet.TEST_db()->GetCurrentVersionDataSstFilesSize() < |
1368 | 0 | implicit_cast<size_t>(20 * FLAGS_db_write_buffer_size)) { |
1369 | 0 | std::this_thread::sleep_for(100ms); |
1370 | 0 | } |
1371 | |
|
1372 | 0 | workload.StopAndJoin(); |
1373 | 0 | s.stop(); |
1374 | 0 | LOG(INFO) << "Workload stopped, it took: " << AsString(s.elapsed()); |
1375 | |
|
1376 | 0 | LOG(INFO) << "Rows inserted: " << workload.rows_inserted(); |
1377 | 0 | LOG(INFO) << "Number of SST files: " << tablet.TEST_db()->GetCurrentVersionNumSSTFiles(); |
1378 | |
|
1379 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
1380 | |
|
1381 | 0 | const auto encoded_split_key = ASSERT_RESULT(tablet.GetEncodedMiddleSplitKey()); |
1382 | 0 | LOG(INFO) << "Encoded split key: " << Slice(encoded_split_key).ToDebugString(); |
1383 | |
|
1384 | 0 | if (tablet.metadata()->partition_schema()->IsHashPartitioning()) { |
1385 | 0 | docdb::DocKey split_key; |
1386 | 0 | Slice key_slice = encoded_split_key; |
1387 | 0 | ASSERT_OK(split_key.DecodeFrom(&key_slice, docdb::DocKeyPart::kUpToHashCode)); |
1388 | 0 | ASSERT_TRUE(key_slice.empty()) << "Extra bytes after decoding: " << key_slice.ToDebugString(); |
1389 | 0 | ASSERT_EQ(split_key.hashed_group().size() + split_key.range_group().size(), 0) |
1390 | 0 | << "Hash-based partition: middle key should only have encoded hash code"; |
1391 | 0 | LOG(INFO) << "Split key: " << AsString(split_key); |
1392 | 0 | } else { |
1393 | 0 | docdb::SubDocKey split_key; |
1394 | 0 | ASSERT_OK(split_key.FullyDecodeFrom(encoded_split_key, docdb::HybridTimeRequired::kFalse)); |
1395 | 0 | ASSERT_EQ(split_key.num_subkeys(), 0) |
1396 | 0 | << "Range-based partition: middle doc key should not have sub doc key components"; |
1397 | 0 | LOG(INFO) << "Split key: " << AsString(split_key); |
1398 | 0 | } |
1399 | | |
1400 | | // Checking number of keys less/bigger than the approximate middle key. |
1401 | 0 | size_t total_keys = 0; |
1402 | 0 | size_t num_keys_less = 0; |
1403 | |
|
1404 | 0 | rocksdb::ReadOptions read_opts; |
1405 | 0 | read_opts.query_id = rocksdb::kDefaultQueryId; |
1406 | 0 | std::unique_ptr<rocksdb::Iterator> iter(tablet.TEST_db()->NewIterator(read_opts)); |
1407 | |
|
1408 | 0 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
1409 | 0 | Slice key = iter->key(); |
1410 | 0 | if (key.Less(encoded_split_key)) { |
1411 | 0 | ++num_keys_less; |
1412 | 0 | } |
1413 | 0 | ++total_keys; |
1414 | 0 | } |
1415 | |
|
1416 | 0 | LOG(INFO) << "Total keys: " << total_keys; |
1417 | 0 | LOG(INFO) << "Number of keys less than approximate middle key: " << num_keys_less; |
1418 | 0 | const auto num_keys_less_percent = 100 * num_keys_less / total_keys; |
1419 | |
|
1420 | 0 | LOG(INFO) << Format( |
1421 | 0 | "Number of keys less than approximate middle key: $0 ($1%)", num_keys_less, |
1422 | 0 | num_keys_less_percent); |
1423 | |
|
1424 | 0 | ASSERT_GE(num_keys_less_percent, 40); |
1425 | 0 | ASSERT_LE(num_keys_less_percent, 60); |
1426 | 0 | } |
1427 | | |
1428 | | namespace { |
1429 | | |
1430 | 0 | std::vector<OpId> GetLastAppliedOpIds(const std::vector<tablet::TabletPeerPtr>& peers) { |
1431 | 0 | std::vector<OpId> last_applied_op_ids; |
1432 | 0 | for (auto& peer : peers) { |
1433 | 0 | const auto last_applied_op_id = peer->consensus()->GetLastAppliedOpId(); |
1434 | 0 | VLOG(1) << "Peer: " << AsString(peer->permanent_uuid()) |
1435 | 0 | << ", last applied op ID: " << AsString(last_applied_op_id); |
1436 | 0 | last_applied_op_ids.push_back(last_applied_op_id); |
1437 | 0 | } |
1438 | 0 | return last_applied_op_ids; |
1439 | 0 | } |
1440 | | |
1441 | 0 | Result<OpId> GetAllAppliedOpId(const std::vector<tablet::TabletPeerPtr>& peers) { |
1442 | 0 | for (auto& peer : peers) { |
1443 | 0 | if (peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
1444 | 0 | return peer->raft_consensus()->GetAllAppliedOpId(); |
1445 | 0 | } |
1446 | 0 | } |
1447 | 0 | return STATUS(NotFound, "No leader found"); |
1448 | 0 | } |
1449 | | |
1450 | | CHECKED_STATUS WaitForAppliedOpIdsStabilized( |
1451 | 0 | const std::vector<tablet::TabletPeerPtr>& peers, const MonoDelta& timeout) { |
1452 | 0 | std::vector<OpId> prev_last_applied_op_ids; |
1453 | 0 | return WaitFor( |
1454 | 0 | [&]() { |
1455 | 0 | std::vector<OpId> last_applied_op_ids = GetLastAppliedOpIds(peers); |
1456 | 0 | LOG(INFO) << "last_applied_op_ids: " << AsString(last_applied_op_ids); |
1457 | 0 | if (last_applied_op_ids == prev_last_applied_op_ids) { |
1458 | 0 | return true; |
1459 | 0 | } |
1460 | 0 | prev_last_applied_op_ids = last_applied_op_ids; |
1461 | 0 | return false; |
1462 | 0 | }, |
1463 | 0 | timeout, "Waiting for applied op IDs to stabilize", 2000ms * kTimeMultiplier, 1); |
1464 | 0 | } |
1465 | | |
1466 | | } // namespace |
1467 | | |
1468 | 0 | TEST_F(QLTabletTest, LastAppliedOpIdTracking) { |
1469 | 0 | constexpr auto kAppliesTimeout = 10s * kTimeMultiplier; |
1470 | |
|
1471 | 0 | TableHandle table; |
1472 | 0 | CreateTable(kTable1Name, &table, /* num_tablets =*/1); |
1473 | 0 | auto session = client_->NewSession(); |
1474 | 0 | session->SetTimeout(60s); |
1475 | |
|
1476 | 0 | LOG(INFO) << "Writing data..."; |
1477 | 0 | int key = 0; |
1478 | 0 | for (; key < 10; ++key) { |
1479 | 0 | SetValue(session, key, key, table); |
1480 | 0 | } |
1481 | 0 | LOG(INFO) << "Writing completed"; |
1482 | |
|
1483 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
1484 | |
|
1485 | 0 | ASSERT_OK(WaitForAppliedOpIdsStabilized(peers, kAppliesTimeout)); |
1486 | 0 | auto last_applied_op_ids = GetLastAppliedOpIds(peers); |
1487 | 0 | LOG(INFO) << "last_applied_op_ids: " << AsString(last_applied_op_ids); |
1488 | 0 | auto all_applied_op_id = ASSERT_RESULT(GetAllAppliedOpId(peers)); |
1489 | 0 | LOG(INFO) << "all_applied_op_id: " << AsString(all_applied_op_id); |
1490 | 0 | for (const auto& last_applied_op_id : last_applied_op_ids) { |
1491 | 0 | ASSERT_EQ(last_applied_op_id, all_applied_op_id); |
1492 | 0 | } |
1493 | |
|
1494 | 0 | LOG(INFO) << "Shutting down TS-0"; |
1495 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
1496 | |
|
1497 | 0 | peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
1498 | |
|
1499 | 0 | LOG(INFO) << "Writing more data..."; |
1500 | 0 | for (; key < 20; ++key) { |
1501 | 0 | SetValue(session, key, key, table); |
1502 | 0 | } |
1503 | 0 | LOG(INFO) << "Writing completed"; |
1504 | |
|
1505 | 0 | ASSERT_OK(WaitForAppliedOpIdsStabilized(peers, kAppliesTimeout)); |
1506 | 0 | auto new_all_applied_op_id = ASSERT_RESULT(GetAllAppliedOpId(peers)); |
1507 | | // We expect turned off TS to lag behind and not let all applied OP ids to advance. |
1508 | | // In case TS-0 was leader, all_applied_op_id will be 0 on a new leader until it hears from TS-0. |
1509 | 0 | ASSERT_TRUE(new_all_applied_op_id == all_applied_op_id || new_all_applied_op_id.empty()); |
1510 | | |
1511 | | // Save max applied op ID. |
1512 | 0 | last_applied_op_ids = GetLastAppliedOpIds(peers); |
1513 | 0 | auto max_applied_op_id = OpId::Min(); |
1514 | 0 | for (const auto& last_applied_op_id : last_applied_op_ids) { |
1515 | 0 | max_applied_op_id = std::max(max_applied_op_id, last_applied_op_id); |
1516 | 0 | } |
1517 | 0 | ASSERT_GT(max_applied_op_id, all_applied_op_id); |
1518 | |
|
1519 | 0 | LOG(INFO) << "Restarting TS-0"; |
1520 | 0 | ASSERT_OK(cluster_->mini_tablet_server(0)->Start()); |
1521 | | |
1522 | | // TS-0 should catch up on applied ops. |
1523 | 0 | ASSERT_OK(WaitFor( |
1524 | 0 | [&]() -> Result<bool> { |
1525 | 0 | return VERIFY_RESULT(GetAllAppliedOpId(peers)) == max_applied_op_id; |
1526 | 0 | }, |
1527 | 0 | kAppliesTimeout, "Waiting for all ops to apply")); |
1528 | 0 | last_applied_op_ids = GetLastAppliedOpIds(peers); |
1529 | 0 | for (const auto& last_applied_op_id : last_applied_op_ids) { |
1530 | 0 | ASSERT_EQ(last_applied_op_id, max_applied_op_id); |
1531 | 0 | } |
1532 | 0 | } |
1533 | | |
1534 | 0 | TEST_F(QLTabletTest, SlowPrepare) { |
1535 | 0 | FLAGS_TEST_preparer_batch_inject_latency_ms = 100; |
1536 | |
|
1537 | 0 | const int kNumTablets = 1; |
1538 | |
|
1539 | 0 | auto session = client_->NewSession(); |
1540 | 0 | session->SetTimeout(60s); |
1541 | |
|
1542 | 0 | TestWorkload workload(cluster_.get()); |
1543 | 0 | workload.set_table_name(kTable1Name); |
1544 | 0 | workload.set_write_timeout_millis(30000 * kTimeMultiplier); |
1545 | 0 | workload.set_num_tablets(kNumTablets); |
1546 | 0 | workload.set_num_write_threads(2); |
1547 | 0 | workload.set_write_batch_size(1); |
1548 | 0 | workload.Setup(); |
1549 | 0 | workload.Start(); |
1550 | |
|
1551 | 0 | std::this_thread::sleep_for(2s); |
1552 | 0 | StepDownAllTablets(cluster_.get()); |
1553 | |
|
1554 | 0 | workload.StopAndJoin(); |
1555 | 0 | } |
1556 | | |
1557 | 0 | TEST_F(QLTabletTest, ElectUnsynchronizedFollower) { |
1558 | 0 | TableHandle table; |
1559 | 0 | CreateTable(kTable1Name, &table, 1); |
1560 | |
|
1561 | 0 | auto unsynchronized_follower = cluster_->mini_tablet_server(0)->server()->permanent_uuid(); |
1562 | 0 | LOG(INFO) << "Unsynchronized follower: " << unsynchronized_follower; |
1563 | 0 | cluster_->mini_tablet_server(0)->Shutdown(); |
1564 | |
|
1565 | 0 | auto session = CreateSession(); |
1566 | 0 | SetValue(session, 1, -1, table); |
1567 | |
|
1568 | 0 | auto leader_idx = ASSERT_RESULT(ServerWithLeaders(cluster_.get())); |
1569 | 0 | LOG(INFO) << "Leader: " << cluster_->mini_tablet_server(leader_idx)->server()->permanent_uuid(); |
1570 | 0 | auto follower_idx = 1 ^ 2 ^ leader_idx; |
1571 | 0 | LOG(INFO) << "Turning off follower: " |
1572 | 0 | << cluster_->mini_tablet_server(follower_idx)->server()->permanent_uuid(); |
1573 | 0 | cluster_->mini_tablet_server(follower_idx)->Shutdown(); |
1574 | 0 | auto peers = |
1575 | 0 | cluster_->mini_tablet_server(leader_idx)->server()->tablet_manager()->GetTabletPeers(); |
1576 | 0 | ASSERT_EQ(peers.size(), 1); |
1577 | 0 | { |
1578 | 0 | google::FlagSaver flag_saver; |
1579 | 0 | consensus::LeaderStepDownRequestPB req; |
1580 | 0 | req.set_tablet_id(peers.front()->tablet_id()); |
1581 | 0 | req.set_force_step_down(true); |
1582 | 0 | req.set_new_leader_uuid(unsynchronized_follower); |
1583 | 0 | consensus::LeaderStepDownResponsePB resp; |
1584 | |
|
1585 | 0 | FLAGS_leader_failure_max_missed_heartbeat_periods = 10000; |
1586 | 0 | ASSERT_OK(peers.front()->raft_consensus()->StepDown(&req, &resp)); |
1587 | 0 | ASSERT_FALSE(resp.has_error()) << resp.error().ShortDebugString(); |
1588 | 0 | } |
1589 | | |
1590 | 0 | ASSERT_OK(cluster_->mini_tablet_server(0)->Start()); |
1591 | |
|
1592 | 0 | ASSERT_NO_FATALS(SetValue(session, 2, -2, table)); |
1593 | |
|
1594 | 0 | ASSERT_OK(cluster_->mini_tablet_server(follower_idx)->Start()); |
1595 | 0 | } |
1596 | | |
1597 | 0 | TEST_F(QLTabletTest, FollowerRestartDuringWrite) { |
1598 | 0 | TableHandle table; |
1599 | 0 | CreateTable(kTable1Name, &table, 1); |
1600 | |
|
1601 | 0 | for (auto iter = 0; iter != 6; ++iter) { |
1602 | 0 | auto session = CreateSession(); |
1603 | 0 | SetValue(session, 1, -1, table); |
1604 | |
|
1605 | 0 | auto leader_idx = ASSERT_RESULT(ServerWithLeaders(cluster_.get())); |
1606 | 0 | LOG(INFO) << "Leader: " << cluster_->mini_tablet_server(leader_idx)->server()->permanent_uuid(); |
1607 | 0 | auto follower_idx = (leader_idx + 1) % cluster_->num_tablet_servers(); |
1608 | 0 | auto follower = cluster_->mini_tablet_server(follower_idx)->server(); |
1609 | 0 | LOG(INFO) << "Follower: " << follower->permanent_uuid(); |
1610 | 0 | auto follower_peers = follower->tablet_manager()->GetTabletPeers(); |
1611 | 0 | for (const auto& peer : follower_peers) { |
1612 | 0 | peer->raft_consensus()->TEST_DelayUpdate(FLAGS_raft_heartbeat_interval_ms / 2 * 1ms); |
1613 | 0 | } |
1614 | |
|
1615 | 0 | SetValue(session, 2, -2, table); |
1616 | 0 | std::this_thread::sleep_for(FLAGS_raft_heartbeat_interval_ms / 2 * 1ms); |
1617 | 0 | SetValue(session, 3, -3, table); |
1618 | | |
1619 | | // Shutdown follower, so it would not accept updates and exponential backoff will turn to send |
1620 | | // empty operations. |
1621 | 0 | cluster_->mini_tablet_server(follower_idx)->Shutdown(); |
1622 | | |
1623 | | // Wait exponential backoff goes to empty operations. |
1624 | 0 | std::this_thread::sleep_for(FLAGS_raft_heartbeat_interval_ms * 3ms); |
1625 | |
|
1626 | 0 | SetValue(session, 4, -4, table); |
1627 | |
|
1628 | 0 | ASSERT_OK(cluster_->mini_tablet_server(follower_idx)->Start()); |
1629 | | |
1630 | | // Wait until newly started follower receive a new operation. |
1631 | | // Without fix for GH #7145 it would crash in this case. |
1632 | 0 | std::this_thread::sleep_for(FLAGS_raft_heartbeat_interval_ms * 3ms); |
1633 | |
|
1634 | 0 | ASSERT_OK(cluster_->RestartSync()); |
1635 | 0 | } |
1636 | 0 | } |
1637 | | |
1638 | 0 | TEST_F_EX(QLTabletTest, DataBlockKeyValueEncoding, QLTabletRf1Test) { |
1639 | 0 | constexpr auto kNumRows = 4000; |
1640 | 0 | constexpr auto kNumRowsPerBatch = 100; |
1641 | | // We are testing delta encoding, but not compression. |
1642 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_compression_type) = "NoCompression"; |
1643 | |
|
1644 | 0 | struct SstSizes { |
1645 | 0 | size_t regular_table = 0; |
1646 | 0 | size_t index_table = 0; |
1647 | 0 | }; |
1648 | 0 | std::unordered_map<rocksdb::KeyValueEncodingFormat, SstSizes, EnumHash> sst_sizes; |
1649 | |
|
1650 | 0 | constexpr auto kEncodingSharedPrefix = |
1651 | 0 | rocksdb::KeyValueEncodingFormat::kKeyDeltaEncodingSharedPrefix; |
1652 | 0 | constexpr auto kEncodingThreeSharedParts = |
1653 | 0 | rocksdb::KeyValueEncodingFormat::kKeyDeltaEncodingThreeSharedParts; |
1654 | |
|
1655 | 0 | for (auto encoding : {kEncodingSharedPrefix, kEncodingThreeSharedParts}) { |
1656 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_regular_tablets_data_block_key_value_encoding) = |
1657 | 0 | KeyValueEncodingFormatToString(encoding); |
1658 | 0 | const YBTableName table_name( |
1659 | 0 | YQL_DATABASE_CQL, "my_keyspace", Format("ql_client_test_table_$0", encoding)); |
1660 | 0 | TableHandle table; |
1661 | 0 | CreateTable(table_name, &table, /* num_tablets = */ 1); |
1662 | 0 | ASSERT_OK(BatchedFillTable(/* begin = */ 0, /* end = */ kNumRows, kNumRowsPerBatch, table)); |
1663 | |
|
1664 | 0 | TableHandle index_table; |
1665 | 0 | kv_table_test::CreateIndex( |
1666 | 0 | yb::client::Transactional::kTrue, /* indexed_column_index = */ 1, |
1667 | 0 | /* use_mangled_names = */ false, table, client_.get(), &index_table); |
1668 | 0 | ASSERT_OK(client_->WaitUntilIndexPermissionsAtLeast( |
1669 | 0 | table_name, index_table.name(), INDEX_PERM_READ_WRITE_AND_DELETE, /* max_wait = */ 10s)); |
1670 | |
|
1671 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
1672 | |
|
1673 | 0 | auto get_tablet_size = [](tablet::Tablet* tablet) -> Result<size_t> { |
1674 | 0 | RETURN_NOT_OK(tablet->Flush(tablet::FlushMode::kSync)); |
1675 | 0 | RETURN_NOT_OK(tablet->ForceFullRocksDBCompact()); |
1676 | 0 | return tablet->GetCurrentVersionSstFilesSize(); |
1677 | 0 | }; |
1678 | |
|
1679 | 0 | for (const auto& tablet_peer : ListTableTabletPeers(cluster_.get(), table->id())) { |
1680 | 0 | sst_sizes[encoding].regular_table += ASSERT_RESULT(get_tablet_size(tablet_peer->tablet())); |
1681 | 0 | } |
1682 | 0 | for (const auto& tablet_peer : ListTableTabletPeers(cluster_.get(), index_table->id())) { |
1683 | 0 | sst_sizes[encoding].index_table += ASSERT_RESULT(get_tablet_size(tablet_peer->tablet())); |
1684 | 0 | } |
1685 | 0 | } |
1686 | 0 | ASSERT_GT( |
1687 | 0 | 1.0 * sst_sizes[kEncodingSharedPrefix].regular_table / |
1688 | 0 | sst_sizes[kEncodingThreeSharedParts].regular_table, |
1689 | 0 | 1.3); |
1690 | 0 | ASSERT_EQ(sst_sizes[kEncodingSharedPrefix].index_table, |
1691 | 0 | sst_sizes[kEncodingThreeSharedParts].index_table); |
1692 | 0 | } |
1693 | | |
1694 | | } // namespace client |
1695 | | } // namespace yb |