/Users/deen/code/yugabyte-db/src/yb/client/ql-stress-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/bfql/gen_opcodes.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/error.h" |
18 | | #include "yb/client/ql-dml-test-base.h" |
19 | | #include "yb/client/rejection_score_source.h" |
20 | | #include "yb/client/schema.h" |
21 | | #include "yb/client/session.h" |
22 | | #include "yb/client/table.h" |
23 | | #include "yb/client/table_handle.h" |
24 | | #include "yb/client/transaction.h" |
25 | | #include "yb/client/transaction_manager.h" |
26 | | #include "yb/client/yb_op.h" |
27 | | |
28 | | #include "yb/common/ql_value.h" |
29 | | #include "yb/common/schema.h" |
30 | | |
31 | | #include "yb/consensus/log.h" |
32 | | #include "yb/consensus/log_reader.h" |
33 | | #include "yb/consensus/raft_consensus.h" |
34 | | #include "yb/consensus/retryable_requests.h" |
35 | | |
36 | | #include "yb/docdb/consensus_frontier.h" |
37 | | #include "yb/docdb/doc_key.h" |
38 | | #include "yb/docdb/docdb_rocksdb_util.h" |
39 | | |
40 | | #include "yb/rocksdb/metadata.h" |
41 | | #include "yb/rocksdb/utilities/checkpoint.h" |
42 | | |
43 | | #include "yb/rpc/messenger.h" |
44 | | |
45 | | #include "yb/server/hybrid_clock.h" |
46 | | |
47 | | #include "yb/tablet/tablet.h" |
48 | | #include "yb/tablet/tablet_metadata.h" |
49 | | #include "yb/tablet/tablet_options.h" |
50 | | #include "yb/tablet/tablet_peer.h" |
51 | | |
52 | | #include "yb/tserver/mini_tablet_server.h" |
53 | | #include "yb/tserver/tablet_server.h" |
54 | | #include "yb/tserver/ts_tablet_manager.h" |
55 | | |
56 | | #include "yb/util/debug-util.h" |
57 | | #include "yb/util/format.h" |
58 | | #include "yb/util/metrics.h" |
59 | | #include "yb/util/random_util.h" |
60 | | #include "yb/util/size_literals.h" |
61 | | #include "yb/util/status_format.h" |
62 | | #include "yb/util/status_log.h" |
63 | | #include "yb/util/test_thread_holder.h" |
64 | | #include "yb/util/tsan_util.h" |
65 | | |
66 | | #include "yb/yql/cql/ql/util/statement_result.h" |
67 | | |
68 | | DECLARE_bool(TEST_combine_batcher_errors); |
69 | | DECLARE_bool(allow_preempting_compactions); |
70 | | DECLARE_bool(detect_duplicates_for_retryable_requests); |
71 | | DECLARE_bool(enable_ondisk_compression); |
72 | | DECLARE_double(TEST_respond_write_failed_probability); |
73 | | DECLARE_double(transaction_max_missed_heartbeat_periods); |
74 | | DECLARE_int32(TEST_max_write_waiters); |
75 | | DECLARE_int32(client_read_write_timeout_ms); |
76 | | DECLARE_int32(log_cache_size_limit_mb); |
77 | | DECLARE_int32(log_min_seconds_to_retain); |
78 | | DECLARE_int32(raft_heartbeat_interval_ms); |
79 | | DECLARE_int32(retryable_request_range_time_limit_secs); |
80 | | DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); |
81 | | DECLARE_int32(rocksdb_level0_slowdown_writes_trigger); |
82 | | DECLARE_int32(rocksdb_max_background_compactions); |
83 | | DECLARE_int32(rocksdb_universal_compaction_min_merge_width); |
84 | | DECLARE_int32(rocksdb_universal_compaction_size_ratio); |
85 | | DECLARE_int64(db_write_buffer_size); |
86 | | DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec); |
87 | | DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec); |
88 | | DECLARE_int64(transaction_rpc_timeout_ms); |
89 | | DECLARE_uint64(log_segment_size_bytes); |
90 | | DECLARE_uint64(sst_files_hard_limit); |
91 | | DECLARE_uint64(sst_files_soft_limit); |
92 | | |
93 | | METRIC_DECLARE_counter(majority_sst_files_rejections); |
94 | | |
95 | | using namespace std::literals; |
96 | | |
97 | | using rocksdb::checkpoint::CreateCheckpoint; |
98 | | using rocksdb::UserFrontierPtr; |
99 | | using yb::tablet::TabletOptions; |
100 | | using yb::docdb::InitRocksDBOptions; |
101 | | |
102 | | DECLARE_bool(enable_ysql); |
103 | | |
104 | | namespace yb { |
105 | | namespace client { |
106 | | |
107 | | namespace { |
108 | | |
109 | | const std::string kValueColumn = "v"; |
110 | | |
111 | | } |
112 | | |
113 | | class QLStressTest : public QLDmlTestBase<MiniCluster> { |
114 | | public: |
115 | 17 | QLStressTest() { |
116 | 17 | } |
117 | | |
118 | 17 | void SetUp() override { |
119 | | // To prevent automatic creation of the transaction status table. |
120 | 17 | SetAtomicFlag(false, &FLAGS_enable_ysql); |
121 | | |
122 | 17 | ASSERT_NO_FATALS(QLDmlTestBase::SetUp()); |
123 | | |
124 | 0 | YBSchemaBuilder b; |
125 | 0 | InitSchemaBuilder(&b); |
126 | 0 | CompleteSchemaBuilder(&b); |
127 | |
|
128 | 0 | ASSERT_OK(table_.Create(kTableName, NumTablets(), client_.get(), &b)); |
129 | 0 | ASSERT_OK(WaitForTabletLeaders()); |
130 | 0 | } |
131 | | |
132 | 0 | virtual void CompleteSchemaBuilder(YBSchemaBuilder* b) {} |
133 | | |
134 | 0 | virtual int NumTablets() { |
135 | 0 | return CalcNumTablets(3); |
136 | 0 | } |
137 | | |
138 | 0 | virtual void InitSchemaBuilder(YBSchemaBuilder* builder) { |
139 | 0 | builder->AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull(); |
140 | 0 | builder->AddColumn(kValueColumn)->Type(STRING); |
141 | 0 | } |
142 | | |
143 | 0 | CHECKED_STATUS WaitForTabletLeaders() { |
144 | 0 | const MonoTime deadline = MonoTime::Now() + 10s * kTimeMultiplier; |
145 | 0 | for (const auto& tablet_id : ListTabletIdsForTable(cluster_.get(), table_->id())) { |
146 | 0 | RETURN_NOT_OK(WaitUntilTabletHasLeader(cluster_.get(), tablet_id, deadline)); |
147 | 0 | } |
148 | 0 | return Status::OK(); |
149 | 0 | } |
150 | | |
151 | | YBqlWriteOpPtr InsertRow(const YBSessionPtr& session, |
152 | | const TableHandle& table, |
153 | | int32_t key, |
154 | 0 | const std::string& value) { |
155 | 0 | auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
156 | 0 | auto* const req = op->mutable_request(); |
157 | 0 | QLAddInt32HashValue(req, key); |
158 | 0 | table.AddStringColumnValue(req, kValueColumn, value); |
159 | 0 | session->Apply(op); |
160 | 0 | return op; |
161 | 0 | } |
162 | | |
163 | | CHECKED_STATUS WriteRow(const YBSessionPtr& session, |
164 | | const TableHandle& table, |
165 | | int32_t key, |
166 | 0 | const std::string& value) { |
167 | 0 | auto op = InsertRow(session, table, key, value); |
168 | 0 | RETURN_NOT_OK(session->Flush()); |
169 | 0 | if (op->response().status() != QLResponsePB::YQL_STATUS_OK) { |
170 | 0 | return STATUS_FORMAT( |
171 | 0 | RemoteError, "Write failed: $0", QLResponsePB::QLStatus_Name(op->response().status())); |
172 | 0 | } |
173 | | |
174 | 0 | return Status::OK(); |
175 | 0 | } |
176 | | |
177 | 0 | YBqlReadOpPtr SelectRow(const YBSessionPtr& session, const TableHandle& table, int32_t key) { |
178 | 0 | auto op = table.NewReadOp(); |
179 | 0 | auto* const req = op->mutable_request(); |
180 | 0 | QLAddInt32HashValue(req, key); |
181 | 0 | table.AddColumns({kValueColumn}, req); |
182 | 0 | session->Apply(op); |
183 | 0 | return op; |
184 | 0 | } |
185 | | |
186 | 0 | Result<QLValue> ReadRow(const YBSessionPtr& session, const TableHandle& table, int32_t key) { |
187 | 0 | auto op = SelectRow(session, table, key); |
188 | 0 | RETURN_NOT_OK(session->Flush()); |
189 | 0 | if (op->response().status() != QLResponsePB::YQL_STATUS_OK) { |
190 | 0 | return STATUS_FORMAT( |
191 | 0 | RemoteError, "Read failed: $0", QLResponsePB::QLStatus_Name(op->response().status())); |
192 | 0 | } |
193 | 0 | auto rowblock = ql::RowsResult(op.get()).GetRowBlock(); |
194 | 0 | if (rowblock->row_count() != 1) { |
195 | 0 | return STATUS_FORMAT(NotFound, "Bad count for $0, count: $1", key, rowblock->row_count()); |
196 | 0 | } |
197 | 0 | const auto& row = rowblock->row(0); |
198 | 0 | return row.column(0); |
199 | 0 | } |
200 | | |
201 | | YBqlWriteOpPtr InsertRow(const YBSessionPtr& session, |
202 | | int32_t key, |
203 | 0 | const std::string& value) { |
204 | 0 | return QLStressTest::InsertRow(session, table_, key, value); |
205 | 0 | } |
206 | | |
207 | | CHECKED_STATUS WriteRow(const YBSessionPtr& session, |
208 | | int32_t key, |
209 | 0 | const std::string& value) { |
210 | 0 | return QLStressTest::WriteRow(session, table_, key, value); |
211 | 0 | } |
212 | | |
213 | 0 | YBqlReadOpPtr SelectRow(const YBSessionPtr& session, int32_t key) { |
214 | 0 | return QLStressTest::SelectRow(session, table_, key); |
215 | 0 | } |
216 | | |
217 | 0 | Result<QLValue> ReadRow(const YBSessionPtr& session, int32_t key) { |
218 | 0 | return QLStressTest::ReadRow(session, table_, key); |
219 | 0 | } |
220 | | |
221 | | TransactionManager CreateTxnManager(); |
222 | | |
223 | | void VerifyFlushedFrontiers(); |
224 | | |
225 | | void TestRetryWrites(bool restarts); |
226 | | |
227 | | bool CheckRetryableRequestsCountsAndLeaders(size_t total_leaders, size_t* total_entries); |
228 | | |
229 | | void AddWriter( |
230 | | std::string value_prefix, std::atomic<int>* key, TestThreadHolder* thread_holder, |
231 | | const std::chrono::nanoseconds& sleep_duration = std::chrono::nanoseconds(), |
232 | | bool allow_failures = false, TransactionManager* txn_manager = nullptr, |
233 | | double transactional_write_probability = 0.0); |
234 | | |
235 | | void TestWriteRejection(); |
236 | | |
237 | | TableHandle table_; |
238 | | |
239 | | int checkpoint_index_ = 0; |
240 | | }; |
241 | | |
242 | | /* |
243 | | * Create a lot of tables and check that each of them are usable (can read/write to them). |
244 | | * Test enough rows/keys to ensure that most tablets will be hit. |
245 | | */ |
246 | 0 | TEST_F(QLStressTest, LargeNumberOfTables) { |
247 | 0 | int num_tables = NonTsanVsTsan(20, 10); |
248 | 0 | int num_tablets_per_table = NonTsanVsTsan(3, 1); |
249 | 0 | auto session = NewSession(); |
250 | 0 | for (int i = 0; i < num_tables; i++) { |
251 | 0 | YBSchemaBuilder b; |
252 | 0 | InitSchemaBuilder(&b); |
253 | 0 | CompleteSchemaBuilder(&b); |
254 | 0 | TableHandle table; |
255 | 0 | client::YBTableName table_name( |
256 | 0 | YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table_" + std::to_string(i)); |
257 | 0 | ASSERT_OK(table.Create(table_name, num_tablets_per_table, client_.get(), &b)); |
258 | |
|
259 | 0 | int num_rows = num_tablets_per_table * 5; |
260 | 0 | for (int key = i; key < i + num_rows; key++) { |
261 | 0 | string value = "value_" + std::to_string(key); |
262 | 0 | ASSERT_OK(WriteRow(session, table, key, value)); |
263 | 0 | auto read_value = ASSERT_RESULT(ReadRow(session, table, key)); |
264 | 0 | ASSERT_EQ(read_value.string_value(), value) << read_value.ToString(); |
265 | 0 | } |
266 | 0 | } |
267 | 0 | } |
268 | | |
269 | | bool QLStressTest::CheckRetryableRequestsCountsAndLeaders( |
270 | 0 | size_t expected_leaders, size_t* total_entries) { |
271 | 0 | size_t total_leaders = 0; |
272 | 0 | *total_entries = 0; |
273 | 0 | bool result = true; |
274 | 0 | size_t replicated_limit = FLAGS_detect_duplicates_for_retryable_requests ? 1 : 0; |
275 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll); |
276 | 0 | for (const auto& peer : peers) { |
277 | 0 | auto leader = peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER; |
278 | 0 | if (!peer->tablet() || peer->tablet()->metadata()->table_id() != table_.table()->id()) { |
279 | 0 | continue; |
280 | 0 | } |
281 | 0 | size_t tablet_entries = peer->tablet()->TEST_CountRegularDBRecords(); |
282 | 0 | auto raft_consensus = down_cast<consensus::RaftConsensus*>(peer->consensus()); |
283 | 0 | auto request_counts = raft_consensus->TEST_CountRetryableRequests(); |
284 | 0 | LOG(INFO) << "T " << peer->tablet()->tablet_id() << " P " << peer->permanent_uuid() |
285 | 0 | << ", entries: " << tablet_entries |
286 | 0 | << ", running: " << request_counts.running |
287 | 0 | << ", replicated: " << request_counts.replicated |
288 | 0 | << ", leader: " << leader |
289 | 0 | << ", term: " << raft_consensus->LeaderTerm(); |
290 | 0 | if (leader) { |
291 | 0 | *total_entries += tablet_entries; |
292 | 0 | ++total_leaders; |
293 | 0 | } |
294 | | // Last write request could be rejected as duplicate, so followers would not be able to |
295 | | // cleanup replicated requests. |
296 | 0 | if (request_counts.running != 0 || (leader && request_counts.replicated > replicated_limit)) { |
297 | 0 | result = false; |
298 | 0 | } |
299 | 0 | } |
300 | |
|
301 | 0 | if (total_leaders != expected_leaders) { |
302 | 0 | LOG(INFO) << "Expected " << expected_leaders << " leaders, found " << total_leaders; |
303 | 0 | return false; |
304 | 0 | } |
305 | | |
306 | 0 | if (result && FLAGS_detect_duplicates_for_retryable_requests) { |
307 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
308 | 0 | for (const auto& peer : peers) { |
309 | 0 | if (peer->tablet()->metadata()->table_id() != table_.table()->id()) { |
310 | 0 | continue; |
311 | 0 | } |
312 | 0 | auto db = peer->tablet()->TEST_db(); |
313 | 0 | rocksdb::ReadOptions read_opts; |
314 | 0 | read_opts.query_id = rocksdb::kDefaultQueryId; |
315 | 0 | std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_opts)); |
316 | 0 | std::unordered_map<std::string, std::string> keys; |
317 | |
|
318 | 0 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
319 | 0 | Slice key = iter->key(); |
320 | 0 | EXPECT_OK(DocHybridTime::DecodeFromEnd(&key)); |
321 | 0 | auto emplace_result = keys.emplace(key.ToBuffer(), iter->key().ToBuffer()); |
322 | 0 | if (!emplace_result.second) { |
323 | 0 | LOG(ERROR) |
324 | 0 | << "Duplicate key: " << docdb::SubDocKey::DebugSliceToString(iter->key()) |
325 | 0 | << " vs " << docdb::SubDocKey::DebugSliceToString(emplace_result.first->second); |
326 | 0 | } |
327 | 0 | } |
328 | 0 | } |
329 | 0 | } |
330 | |
|
331 | 0 | return result; |
332 | 0 | } |
333 | | |
334 | 0 | TransactionManager QLStressTest::CreateTxnManager() { |
335 | 0 | server::ClockPtr clock(new server::HybridClock(WallClock())); |
336 | 0 | EXPECT_OK(clock->Init()); |
337 | 0 | return TransactionManager(client_.get(), clock, client::LocalTabletFilter()); |
338 | 0 | } |
339 | | |
340 | 0 | void QLStressTest::TestRetryWrites(bool restarts) { |
341 | 0 | const size_t kConcurrentWrites = 5; |
342 | | // Used only when table is transactional. |
343 | 0 | const double kTransactionalWriteProbability = 0.5; |
344 | |
|
345 | 0 | SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability); |
346 | |
|
347 | 0 | const bool transactional = table_.table()->schema().table_properties().is_transactional(); |
348 | 0 | boost::optional<TransactionManager> txn_manager; |
349 | 0 | if (transactional) { |
350 | 0 | txn_manager = CreateTxnManager(); |
351 | 0 | } |
352 | |
|
353 | 0 | TestThreadHolder thread_holder; |
354 | 0 | std::atomic<int32_t> key_source(0); |
355 | 0 | for (int i = 0; i != kConcurrentWrites; ++i) { |
356 | 0 | thread_holder.AddThreadFunctor( |
357 | 0 | [this, &key_source, &stop_requested = thread_holder.stop_flag(), |
358 | 0 | &txn_manager, kTransactionalWriteProbability] { |
359 | 0 | auto session = NewSession(); |
360 | 0 | while (!stop_requested.load(std::memory_order_acquire)) { |
361 | 0 | int32_t key = key_source.fetch_add(1, std::memory_order_acq_rel); |
362 | 0 | YBTransactionPtr txn; |
363 | 0 | if (txn_manager && |
364 | 0 | RandomActWithProbability(kTransactionalWriteProbability)) { |
365 | 0 | txn = std::make_shared<YBTransaction>(txn_manager.get_ptr()); |
366 | 0 | ASSERT_OK(txn->Init(IsolationLevel::SNAPSHOT_ISOLATION)); |
367 | 0 | session->SetTransaction(txn); |
368 | 0 | } else { |
369 | 0 | session->SetTransaction(nullptr); |
370 | 0 | } |
371 | |
|
372 | 0 | auto op = InsertRow(session, key, Format("value_$0", key)); |
373 | 0 | auto flush_status = session->FlushAndGetOpsErrors(); |
374 | 0 | const auto& status = flush_status.status; |
375 | 0 | if (status.ok()) { |
376 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK); |
377 | |
|
378 | 0 | if (txn) { |
379 | 0 | auto commit_status = txn->CommitFuture().get(); |
380 | 0 | if (!commit_status.ok()) { |
381 | 0 | LOG(INFO) << "Commit failed, key: " << key << ", txn: " << txn->id() |
382 | 0 | << ", commit failed: " << commit_status; |
383 | 0 | ASSERT_TRUE(commit_status.IsExpired()); |
384 | 0 | } |
385 | 0 | } |
386 | 0 | continue; |
387 | 0 | } |
388 | 0 | ASSERT_TRUE(status.IsIOError()) << "Status: " << AsString(status); |
389 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_RUNTIME_ERROR); |
390 | 0 | ASSERT_EQ(op->response().error_message(), "Duplicate request"); |
391 | 0 | } |
392 | 0 | }); |
393 | 0 | } |
394 | |
|
395 | 0 | if (restarts) { |
396 | 0 | thread_holder.AddThread(RestartsThread(cluster_.get(), 5s, &thread_holder.stop_flag())); |
397 | 0 | } |
398 | |
|
399 | 0 | thread_holder.WaitAndStop(restarts ? 60s : 15s); |
400 | |
|
401 | 0 | int written_keys = key_source.load(std::memory_order_acquire); |
402 | 0 | auto session = NewSession(); |
403 | 0 | for (int key = 0; key != written_keys; ++key) { |
404 | 0 | auto value = ASSERT_RESULT(ReadRow(session, key)); |
405 | 0 | ASSERT_EQ(value.string_value(), Format("value_$0", key)); |
406 | 0 | } |
407 | |
|
408 | 0 | size_t total_entries = 0; |
409 | 0 | size_t expected_leaders = table_.table()->GetPartitionCount(); |
410 | 0 | ASSERT_OK(WaitFor( |
411 | 0 | std::bind(&QLStressTest::CheckRetryableRequestsCountsAndLeaders, this, |
412 | 0 | expected_leaders, &total_entries), |
413 | 0 | 15s, "Retryable requests cleanup and leader wait")); |
414 | | |
415 | | // We have 2 entries per row. |
416 | 0 | if (FLAGS_detect_duplicates_for_retryable_requests) { |
417 | 0 | ASSERT_EQ(total_entries, written_keys * 2); |
418 | 0 | } else { |
419 | | // If duplicate request tracking is disabled, then total_entries should be greater than |
420 | | // written keys, otherwise test does not work. |
421 | 0 | ASSERT_GT(total_entries, written_keys * 2); |
422 | 0 | } |
423 | |
|
424 | 0 | ASSERT_GE(written_keys, RegularBuildVsSanitizers(100, 40)); |
425 | 0 | } |
426 | | |
427 | 0 | TEST_F(QLStressTest, RetryWrites) { |
428 | 0 | FLAGS_detect_duplicates_for_retryable_requests = true; |
429 | 0 | TestRetryWrites(false /* restarts */); |
430 | 0 | } |
431 | | |
432 | 0 | TEST_F(QLStressTest, RetryWritesWithRestarts) { |
433 | 0 | FLAGS_detect_duplicates_for_retryable_requests = true; |
434 | 0 | TestRetryWrites(true /* restarts */); |
435 | 0 | } |
436 | | |
437 | 0 | void SetTransactional(YBSchemaBuilder* builder) { |
438 | 0 | TableProperties table_properties; |
439 | 0 | table_properties.SetTransactional(true); |
440 | 0 | builder->SetTableProperties(table_properties); |
441 | 0 | } |
442 | | |
443 | | class QLTransactionalStressTest : public QLStressTest { |
444 | | public: |
445 | 2 | void SetUp() override { |
446 | 2 | FLAGS_transaction_rpc_timeout_ms = |
447 | 2 | std::chrono::duration_cast<std::chrono::milliseconds>(1min).count(); |
448 | 2 | FLAGS_transaction_max_missed_heartbeat_periods = 1000000; |
449 | 2 | FLAGS_retryable_request_range_time_limit_secs = 600; |
450 | 2 | ASSERT_NO_FATALS(QLStressTest::SetUp()); |
451 | 2 | } |
452 | | |
453 | 0 | void CompleteSchemaBuilder(YBSchemaBuilder* builder) override { |
454 | 0 | SetTransactional(builder); |
455 | 0 | } |
456 | | }; |
457 | | |
458 | 0 | TEST_F_EX(QLStressTest, RetryTransactionalWrites, QLTransactionalStressTest) { |
459 | 0 | FLAGS_detect_duplicates_for_retryable_requests = true; |
460 | 0 | TestRetryWrites(false /* restarts */); |
461 | 0 | } |
462 | | |
463 | 0 | TEST_F_EX(QLStressTest, RetryTransactionalWritesWithRestarts, QLTransactionalStressTest) { |
464 | 0 | FLAGS_detect_duplicates_for_retryable_requests = true; |
465 | 0 | TestRetryWrites(true /* restarts */); |
466 | 0 | } |
467 | | |
468 | 0 | TEST_F(QLStressTest, RetryWritesDisabled) { |
469 | 0 | FLAGS_detect_duplicates_for_retryable_requests = false; |
470 | 0 | TestRetryWrites(false /* restarts */); |
471 | 0 | } |
472 | | |
473 | | class QLStressTestIntValue : public QLStressTest { |
474 | | private: |
475 | 0 | void InitSchemaBuilder(YBSchemaBuilder* builder) override { |
476 | 0 | builder->AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull(); |
477 | 0 | builder->AddColumn(kValueColumn)->Type(INT64); |
478 | 0 | } |
479 | | }; |
480 | | |
481 | | // This test does 100 concurrent increments of the same row. |
482 | | // It is expected that resulting value will be equal to 100. |
483 | 0 | TEST_F_EX(QLStressTest, Increment, QLStressTestIntValue) { |
484 | 0 | const auto kIncrements = 100; |
485 | 0 | const auto kKey = 1; |
486 | |
|
487 | 0 | auto session = NewSession(); |
488 | 0 | { |
489 | 0 | auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
490 | 0 | auto* const req = op->mutable_request(); |
491 | 0 | QLAddInt32HashValue(req, kKey); |
492 | 0 | table_.AddInt64ColumnValue(req, kValueColumn, 0); |
493 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
494 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK); |
495 | 0 | } |
496 | |
|
497 | 0 | std::vector<YBqlWriteOpPtr> write_ops; |
498 | 0 | std::vector<std::shared_future<FlushStatus>> futures; |
499 | |
|
500 | 0 | auto value_column_id = table_.ColumnId(kValueColumn); |
501 | 0 | for (int i = 0; i != kIncrements; ++i) { |
502 | 0 | auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE); |
503 | 0 | auto* const req = op->mutable_request(); |
504 | 0 | QLAddInt32HashValue(req, kKey); |
505 | 0 | req->mutable_column_refs()->add_ids(value_column_id); |
506 | 0 | auto* column_value = req->add_column_values(); |
507 | 0 | column_value->set_column_id(value_column_id); |
508 | 0 | auto* bfcall = column_value->mutable_expr()->mutable_bfcall(); |
509 | 0 | bfcall->set_opcode(to_underlying(bfql::BFOpcode::OPCODE_AddI64I64_80)); |
510 | 0 | bfcall->add_operands()->set_column_id(value_column_id); |
511 | 0 | bfcall->add_operands()->mutable_value()->set_int64_value(1); |
512 | 0 | write_ops.push_back(op); |
513 | 0 | } |
514 | |
|
515 | 0 | for (const auto& op : write_ops) { |
516 | 0 | session->Apply(op); |
517 | 0 | futures.push_back(session->FlushFuture()); |
518 | 0 | } |
519 | |
|
520 | 0 | for (size_t i = 0; i != write_ops.size(); ++i) { |
521 | 0 | ASSERT_OK(futures[i].get().status); |
522 | 0 | ASSERT_EQ(write_ops[i]->response().status(), QLResponsePB::YQL_STATUS_OK); |
523 | 0 | } |
524 | |
|
525 | 0 | auto value = ASSERT_RESULT(ReadRow(session, kKey)); |
526 | 0 | ASSERT_EQ(value.int64_value(), kIncrements) << value.ToString(); |
527 | 0 | } |
528 | | |
529 | | class QLStressTestSingleTablet : public QLStressTest { |
530 | | private: |
531 | 0 | int NumTablets() override { |
532 | 0 | return 1; |
533 | 0 | } |
534 | | }; |
535 | | |
536 | | // This test has the following scenario: |
537 | | // Add some operations to the old leader, but don't add to other nodes. |
538 | | // Switch leadership to a new leader, but don't accept updates from new leader by old leader. |
539 | | // Also don't replicate no op by the new leader. |
540 | | // Switch leadership back to the old leader. |
541 | | // New leader should successfully accept old operations from old leader. |
542 | 0 | TEST_F_EX(QLStressTest, ShortTimeLeaderDoesNotReplicateNoOp, QLStressTestSingleTablet) { |
543 | 0 | auto session = NewSession(); |
544 | 0 | ASSERT_OK(WriteRow(session, 0, "value0")); |
545 | |
|
546 | 0 | auto leaders = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders); |
547 | 0 | ASSERT_EQ(1, leaders.size()); |
548 | 0 | auto old_leader = leaders[0]; |
549 | |
|
550 | 0 | auto followers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders); |
551 | 0 | ASSERT_EQ(2, followers.size()); |
552 | 0 | tablet::TabletPeerPtr temp_leader = followers[0]; |
553 | 0 | tablet::TabletPeerPtr always_follower = followers[1]; |
554 | |
|
555 | 0 | ASSERT_OK(WaitFor([old_leader, always_follower]() -> Result<bool> { |
556 | 0 | auto leader_op_id = old_leader->consensus()->GetLastReceivedOpId(); |
557 | 0 | auto follower_op_id = always_follower->consensus()->GetLastReceivedOpId(); |
558 | 0 | return follower_op_id == leader_op_id; |
559 | 0 | }, 5s, "Follower catch up")); |
560 | |
|
561 | 0 | for (const auto& follower : followers) { |
562 | 0 | down_cast<consensus::RaftConsensus*>(follower->consensus())->TEST_RejectMode( |
563 | 0 | consensus::RejectMode::kAll); |
564 | 0 | } |
565 | |
|
566 | 0 | InsertRow(session, 1, "value1"); |
567 | 0 | auto flush_future = session->FlushFuture(); |
568 | |
|
569 | 0 | InsertRow(session, 2, "value2"); |
570 | 0 | auto flush_future2 = session->FlushFuture(); |
571 | | |
572 | | // Give leader some time to receive operation. |
573 | | // TODO wait for specific event. |
574 | 0 | std::this_thread::sleep_for(1s); |
575 | |
|
576 | 0 | LOG(INFO) << "Step down old leader " << old_leader->permanent_uuid() |
577 | 0 | << " in favor of " << temp_leader->permanent_uuid(); |
578 | |
|
579 | 0 | ASSERT_OK(StepDown(old_leader, temp_leader->permanent_uuid(), ForceStepDown::kFalse)); |
580 | |
|
581 | 0 | down_cast<consensus::RaftConsensus*>(old_leader->consensus())->TEST_RejectMode( |
582 | 0 | consensus::RejectMode::kAll); |
583 | 0 | down_cast<consensus::RaftConsensus*>(temp_leader->consensus())->TEST_RejectMode( |
584 | 0 | consensus::RejectMode::kNone); |
585 | 0 | down_cast<consensus::RaftConsensus*>(always_follower->consensus())->TEST_RejectMode( |
586 | 0 | consensus::RejectMode::kNonEmpty); |
587 | |
|
588 | 0 | ASSERT_OK(WaitForLeaderOfSingleTablet( |
589 | 0 | cluster_.get(), temp_leader, 20s, "Waiting for new leader")); |
590 | | |
591 | | // Give new leader some time to request lease. |
592 | | // TODO wait for specific event. |
593 | 0 | std::this_thread::sleep_for(3s); |
594 | 0 | auto temp_leader_safe_time = ASSERT_RESULT( |
595 | 0 | temp_leader->tablet()->SafeTime(tablet::RequireLease::kTrue)); |
596 | 0 | LOG(INFO) << "Safe time: " << temp_leader_safe_time; |
597 | |
|
598 | 0 | LOG(INFO) << "Transferring leadership from " << temp_leader->permanent_uuid() |
599 | 0 | << " back to " << old_leader->permanent_uuid(); |
600 | 0 | ASSERT_OK(StepDown(temp_leader, old_leader->permanent_uuid(), ForceStepDown::kTrue)); |
601 | |
|
602 | 0 | ASSERT_OK(WaitForLeaderOfSingleTablet( |
603 | 0 | cluster_.get(), old_leader, 20s, "Waiting old leader to restore leadership")); |
604 | |
|
605 | 0 | down_cast<consensus::RaftConsensus*>(always_follower->consensus())->TEST_RejectMode( |
606 | 0 | consensus::RejectMode::kNone); |
607 | |
|
608 | 0 | ASSERT_OK(WriteRow(session, 3, "value3")); |
609 | |
|
610 | 0 | ASSERT_OK(flush_future.get().status); |
611 | 0 | ASSERT_OK(flush_future2.get().status); |
612 | 0 | } |
613 | | |
614 | | namespace { |
615 | | |
616 | 0 | void VerifyFlushedFrontier(const UserFrontierPtr& frontier, OpId* op_id) { |
617 | 0 | ASSERT_TRUE(frontier); |
618 | 0 | if (frontier) { |
619 | 0 | *op_id = down_cast<docdb::ConsensusFrontier*>(frontier.get())->op_id(); |
620 | 0 | ASSERT_GT(op_id->term, 0); |
621 | 0 | ASSERT_GT(op_id->index, 0); |
622 | 0 | } |
623 | 0 | } |
624 | | |
625 | | } // anonymous namespace |
626 | 0 | void QLStressTest::VerifyFlushedFrontiers() { |
627 | 0 | for (const auto& mini_tserver : cluster_->mini_tablet_servers()) { |
628 | 0 | auto peers = mini_tserver->server()->tablet_manager()->GetTabletPeers(); |
629 | 0 | for (const auto& peer : peers) { |
630 | 0 | rocksdb::DB* db = peer->tablet()->TEST_db(); |
631 | 0 | OpId op_id; |
632 | 0 | ASSERT_NO_FATALS(VerifyFlushedFrontier(db->GetFlushedFrontier(), &op_id)); |
633 | | |
634 | | // Also check that if we checkpoint this DB and open the checkpoint separately, the |
635 | | // flushed frontier non-zero as well. |
636 | 0 | std::string checkpoint_dir; |
637 | 0 | ASSERT_OK(Env::Default()->GetTestDirectory(&checkpoint_dir)); |
638 | 0 | checkpoint_dir += Format("/checkpoint_$0", checkpoint_index_); |
639 | 0 | checkpoint_index_++; |
640 | |
|
641 | 0 | ASSERT_OK(CreateCheckpoint(db, checkpoint_dir)); |
642 | |
|
643 | 0 | rocksdb::Options options; |
644 | 0 | auto tablet_options = TabletOptions(); |
645 | 0 | tablet_options.rocksdb_env = db->GetEnv(); |
646 | 0 | InitRocksDBOptions(&options, "", nullptr, tablet_options); |
647 | 0 | std::unique_ptr<rocksdb::DB> checkpoint_db; |
648 | 0 | rocksdb::DB* checkpoint_db_raw_ptr = nullptr; |
649 | |
|
650 | 0 | options.create_if_missing = false; |
651 | 0 | ASSERT_OK(rocksdb::DB::Open(options, checkpoint_dir, &checkpoint_db_raw_ptr)); |
652 | 0 | checkpoint_db.reset(checkpoint_db_raw_ptr); |
653 | 0 | OpId checkpoint_op_id; |
654 | 0 | ASSERT_NO_FATALS( |
655 | 0 | VerifyFlushedFrontier(checkpoint_db->GetFlushedFrontier(), &checkpoint_op_id)); |
656 | 0 | ASSERT_OK(Env::Default()->DeleteRecursively(checkpoint_dir)); |
657 | |
|
658 | 0 | ASSERT_LE(op_id, checkpoint_op_id); |
659 | 0 | } |
660 | 0 | } |
661 | 0 | } |
662 | | |
663 | 0 | TEST_F_EX(QLStressTest, FlushCompact, QLStressTestSingleTablet) { |
664 | 0 | std::atomic<int> key; |
665 | |
|
666 | 0 | TestThreadHolder thread_holder; |
667 | |
|
668 | 0 | AddWriter("value_", &key, &thread_holder); |
669 | |
|
670 | 0 | auto start_time = MonoTime::Now(); |
671 | 0 | const auto kTimeout = MonoDelta::FromSeconds(60); |
672 | 0 | int num_iter = 0; |
673 | 0 | while (MonoTime::Now() - start_time < kTimeout) { |
674 | 0 | ++num_iter; |
675 | 0 | std::this_thread::sleep_for(1s); |
676 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
677 | 0 | ASSERT_NO_FATALS(VerifyFlushedFrontiers()); |
678 | 0 | std::this_thread::sleep_for(1s); |
679 | 0 | auto compact_status = cluster_->CompactTablets(); |
680 | 0 | LOG_IF(INFO, !compact_status.ok()) << "Compaction failed: " << compact_status; |
681 | 0 | ASSERT_NO_FATALS(VerifyFlushedFrontiers()); |
682 | 0 | } |
683 | 0 | ASSERT_GE(num_iter, 5); |
684 | 0 | } |
685 | | |
// The scenario of this test is the following:
// We do writes in background.
// Isolate leader for 10 seconds.
// Restore connectivity.
// Check that old leader was able to catch up after the partition is healed.
TEST_F_EX(QLStressTest, OldLeaderCatchUpAfterNetworkPartition, QLStressTestSingleTablet) {
  // Combine batcher errors into a single status for the background writer.
  // NOTE(review): presumably keeps writer failures during the partition from
  // producing per-operation flakiness — confirm against batcher code.
  FLAGS_TEST_combine_batcher_errors = true;

  tablet::TabletPeer* leader_peer = nullptr;
  std::atomic<int> key(0);
  {
    TestThreadHolder thread_holder;

    // Background writer; `key` counts successfully written rows.
    AddWriter("value_", &key, &thread_holder);

    // Let writes accumulate so the leader has a non-trivial log.
    std::this_thread::sleep_for(5s * yb::kTimeMultiplier);

    // Find the tablet server hosting the leader replica of the single tablet.
    tserver::MiniTabletServer* leader = nullptr;
    for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
      auto current = cluster_->mini_tablet_server(i);
      auto peers = current->server()->tablet_manager()->GetTabletPeers();
      ASSERT_EQ(peers.size(), 1);
      if (peers.front()->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) {
        leader = current;
        leader_peer = peers.front().get();
        break;
      }
    }

    ASSERT_NE(leader, nullptr);

    auto pre_isolate_op_id = leader_peer->GetLatestLogEntryOpId();
    LOG(INFO) << "Isolate, last op id: " << pre_isolate_op_id << ", key: " << key;
    ASSERT_GE(pre_isolate_op_id.term, 1);
    // Log index should exceed the number of written keys (one entry per write
    // plus bookkeeping entries).
    ASSERT_GT(pre_isolate_op_id.index, key);
    leader->Isolate();
    std::this_thread::sleep_for(10s * yb::kTimeMultiplier);

    // While isolated the old leader must stay in the same term and may only
    // append the handful of operations that were already in flight.
    auto pre_restore_op_id = leader_peer->GetLatestLogEntryOpId();
    LOG(INFO) << "Restore, last op id: " << pre_restore_op_id << ", key: " << key;
    ASSERT_EQ(pre_restore_op_id.term, pre_isolate_op_id.term);
    ASSERT_GE(pre_restore_op_id.index, pre_isolate_op_id.index);
    ASSERT_LE(pre_restore_op_id.index, pre_isolate_op_id.index + 10);
    ASSERT_OK(leader->Reconnect());

    // Let writers run against the new leader for a while, then stop them.
    thread_holder.WaitAndStop(5s * yb::kTimeMultiplier);
  }

  // The old leader should replicate everything the cluster accepted meanwhile.
  ASSERT_OK(WaitFor([leader_peer, &key] {
    return leader_peer->GetLatestLogEntryOpId().index > key;
  }, 5s, "Old leader has enough operations"));

  auto finish_op_id = leader_peer->GetLatestLogEntryOpId();
  LOG(INFO) << "Finish, last op id: " << finish_op_id << ", key: " << key;
  // Term advanced past the pre-isolation term => a new leader was elected
  // during the partition.
  ASSERT_GT(finish_op_id.term, 1);
  ASSERT_GT(finish_op_id.index, key);
}
743 | | |
744 | 0 | TEST_F_EX(QLStressTest, SlowUpdateConsensus, QLStressTestSingleTablet) { |
745 | 0 | std::atomic<int> key(0); |
746 | |
|
747 | 0 | auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders); |
748 | 0 | ASSERT_EQ(peers.size(), 2); |
749 | |
|
750 | 0 | down_cast<consensus::RaftConsensus*>(peers[0]->consensus())->TEST_DelayUpdate(20s); |
751 | |
|
752 | 0 | TestThreadHolder thread_holder; |
753 | 0 | AddWriter(std::string(100_KB, 'X'), &key, &thread_holder, 100ms); |
754 | |
|
755 | 0 | thread_holder.WaitAndStop(30s); |
756 | |
|
757 | 0 | down_cast<consensus::RaftConsensus*>(peers[0]->consensus())->TEST_DelayUpdate(0s); |
758 | |
|
759 | 0 | int64_t max_peak_consumption = 0; |
760 | 0 | for (size_t i = 1; i <= cluster_->num_tablet_servers(); ++i) { |
761 | 0 | auto server_tracker = MemTracker::FindTracker(Format("server $0", i)); |
762 | 0 | auto call_tracker = MemTracker::FindTracker("Call", server_tracker); |
763 | 0 | auto inbound_rpc_tracker = MemTracker::FindTracker("Inbound RPC", call_tracker); |
764 | 0 | max_peak_consumption = std::max(max_peak_consumption, inbound_rpc_tracker->peak_consumption()); |
765 | 0 | } |
766 | 0 | LOG(INFO) << "Peak consumption: " << max_peak_consumption; |
767 | 0 | ASSERT_LE(max_peak_consumption, 150_MB); |
768 | 0 | } |
769 | | |
// Fixture that makes SST files pile up quickly so soft/hard-limit based write
// rejection can be exercised. kSoftLimit/kHardLimit parameterize the number of
// SST files at which rejection starts / writes stop.
template <int kSoftLimit, int kHardLimit>
class QLStressTestDelayWrite : public QLStressTestSingleTablet {
 public:
  void SetUp() override {
    // Tiny memtable: nearly every batch of writes produces a new SST file.
    FLAGS_db_write_buffer_size = 1_KB;
    FLAGS_sst_files_soft_limit = kSoftLimit;
    FLAGS_sst_files_hard_limit = kHardLimit;
    // Make automatic compactions reluctant so files accumulate and the limits
    // above are actually reached.
    FLAGS_rocksdb_level0_file_num_compaction_trigger = 6;
    FLAGS_rocksdb_universal_compaction_min_merge_width = 2;
    FLAGS_rocksdb_universal_compaction_size_ratio = 1000;
    QLStressTestSingleTablet::SetUp();
  }

  // Sessions get a longer timeout since writes are expected to be delayed or
  // rejected while file limits are hit.
  client::YBSessionPtr NewSession() override {
    auto result = QLStressTestSingleTablet::NewSession();
    result->SetTimeout(5s);
    return result;
  }
};
789 | | |
790 | | void QLStressTest::AddWriter( |
791 | | std::string value_prefix, std::atomic<int>* key, TestThreadHolder* thread_holder, |
792 | | const std::chrono::nanoseconds& sleep_duration, |
793 | 0 | bool allow_failures, TransactionManager* txn_manager, double transactional_write_probability) { |
794 | 0 | thread_holder->AddThreadFunctor([this, &stop = thread_holder->stop_flag(), key, sleep_duration, |
795 | 0 | value_prefix = std::move(value_prefix), allow_failures, |
796 | 0 | txn_manager, transactional_write_probability] { |
797 | 0 | auto session = NewSession(); |
798 | 0 | session->SetRejectionScoreSource(std::make_shared<RejectionScoreSource>()); |
799 | 0 | ASSERT_TRUE(txn_manager || transactional_write_probability == 0.0); |
800 | |
|
801 | 0 | while (!stop.load(std::memory_order_acquire)) { |
802 | 0 | YBTransactionPtr txn; |
803 | 0 | if (txn_manager && RandomActWithProbability(transactional_write_probability)) { |
804 | 0 | txn = std::make_shared<YBTransaction>(txn_manager); |
805 | 0 | ASSERT_OK(txn->Init(IsolationLevel::SNAPSHOT_ISOLATION)); |
806 | 0 | session->SetTransaction(txn); |
807 | 0 | } else { |
808 | 0 | session->SetTransaction(nullptr); |
809 | 0 | } |
810 | 0 | auto new_key = *key + 1; |
811 | 0 | auto status = WriteRow(session, new_key, value_prefix + std::to_string(new_key)); |
812 | 0 | if (status.ok() && txn) { |
813 | 0 | status = txn->CommitFuture().get(); |
814 | 0 | } |
815 | |
|
816 | 0 | if (!allow_failures) { |
817 | 0 | ASSERT_OK(status); |
818 | 0 | } else if (!status.ok()) { |
819 | 0 | LOG(WARNING) << "Write failed: " << status; |
820 | 0 | } |
821 | 0 | if (status.ok()) { |
822 | 0 | ++*key; |
823 | 0 | } |
824 | 0 | if (sleep_duration.count() > 0) { |
825 | 0 | std::this_thread::sleep_for(sleep_duration); |
826 | 0 | } |
827 | 0 | } |
828 | 0 | }); |
829 | 0 | } |
830 | | |
// Drives concurrent writers against a cluster configured (by the fixture) to
// reject writes once SST file limits are hit, and verifies that:
//  - writes keep making overall progress (never stuck for more than 20s),
//  - rejections actually occur (except under sanitizers, which are too slow),
//  - writes continue to succeed after rejections started to appear,
//  - all peers converge to the same committed OpId afterwards.
void QLStressTest::TestWriteRejection() {
  // Fewer writers in debug builds to keep runtime reasonable.
  constexpr int kWriters = IsDebug() ? 10 : 20;
  // Each writer owns a disjoint key range: writer i starts at i * kKeyBase.
  constexpr int kKeyBase = 10000;

  std::array<std::atomic<int>, kWriters> keys;

  const std::string value_prefix = std::string(1_KB, 'X');

  TestThreadHolder thread_holder;
  for (int i = 0; i != kWriters; ++i) {
    keys[i] = i * kKeyBase;
    // allow_failures: rejections are expected while file limits are exceeded.
    AddWriter(value_prefix, &keys[i], &thread_holder, 0s, true /* allow_failures */);
  }

  // Reader thread: picks a random writer and verifies its latest successfully
  // written row round-trips with the expected value.
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag(), &keys, &value_prefix] {
    auto session = NewSession();
    while (!stop.load(std::memory_order_acquire)) {
      int idx = RandomUniformInt(0, kWriters - 1);
      auto had_key = keys[idx].load(std::memory_order_acquire);
      if (had_key == kKeyBase * idx) {
        // This writer has not completed any write yet; try again shortly.
        std::this_thread::sleep_for(50ms);
        continue;
      }
      auto value = ASSERT_RESULT(ReadRow(session, had_key)).string_value();
      ASSERT_EQ(value, value_prefix + std::to_string(had_key));
    }
  });

  int last_keys_written = 0;
  int first_keys_written_after_rejections_started_to_appear = -1;
  auto last_keys_written_update_time = CoarseMonoClock::now();
  uint64_t last_rejections = 0;
  bool has_writes_after_rejections = false;
  for (;;) {
    std::this_thread::sleep_for(1s);
    // Total successful writes across all writers (offsets removed).
    int keys_written = 0;
    for (int i = 0; i != kWriters; ++i) {
      keys_written += keys[i].load() - kKeyBase * i;
    }
    LOG(INFO) << "Total keys written: " << keys_written;
    if (keys_written == last_keys_written) {
      // No progress this round: fail if writes have been stuck too long.
      ASSERT_LE(CoarseMonoClock::now() - last_keys_written_update_time, 20s);
      continue;
    }
    if (last_rejections != 0) {
      // Rejections were already observed: record the first progress point
      // after they appeared, then detect further progress beyond it.
      if (first_keys_written_after_rejections_started_to_appear < 0) {
        first_keys_written_after_rejections_started_to_appear = keys_written;
      } else if (keys_written > first_keys_written_after_rejections_started_to_appear) {
        has_writes_after_rejections = true;
      }
    }
    last_keys_written = keys_written;
    last_keys_written_update_time = CoarseMonoClock::now();

    // Sum the majority-SST-files rejection counter across every tablet peer.
    uint64_t total_rejections = 0;
    for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
      int64_t rejections = 0;
      auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
      for (const auto& peer : peers) {
        auto counter = METRIC_majority_sst_files_rejections.Instantiate(
            peer->tablet()->GetTabletMetricsEntity());
        rejections += counter->value();
      }
      total_rejections += rejections;
    }
    LOG(INFO) << "Total rejections: " << total_rejections;
    last_rejections = total_rejections;

    // Done once enough keys were written, rejections were seen (requirement
    // waived under sanitizers), and progress happened after rejections.
    if (keys_written >= RegularBuildVsSanitizers(1000, 100) &&
        (IsSanitizer() || total_rejections >= 10) &&
        has_writes_after_rejections) {
      break;
    }
  }

  thread_holder.Stop();

  // All peers must converge to the same last-committed OpId.
  ASSERT_OK(WaitFor([cluster = cluster_.get()] {
    auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    OpId first_op_id;
    for (const auto& peer : peers) {
      if (!peer->consensus()) {
        return false;
      }
      auto current = peer->consensus()->GetLastCommittedOpId();
      if (!first_op_id) {
        first_op_id = current;
      } else if (current != first_op_id) {
        return false;
      }
    }
    return true;
  }, 30s, "Waiting tablets to sync up"));
}
925 | | |
// Soft limit (4) below hard limit (10): writes get delayed/rejected but the
// hard stop threshold is unlikely to be sustained.
typedef QLStressTestDelayWrite<4, 10> QLStressTestDelayWrite_4_10;

TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite_4_10) {
  TestWriteRejection();
}
931 | | |
// Soft limit == hard limit to test write stop and recover after it.
typedef QLStressTestDelayWrite<6, 6> QLStressTestDelayWrite_6_6;

TEST_F_EX(QLStressTest, WriteStop, QLStressTestDelayWrite_6_6) {
  TestWriteRejection();
}
938 | | |
// Fixture with a tiny log cache and small log segments, so logs roll over and
// get garbage collected quickly, forcing a lagging replica into remote
// bootstrap instead of ordinary log catch-up.
class QLStressTestLongRemoteBootstrap : public QLStressTestSingleTablet {
 public:
  void SetUp() override {
    FLAGS_log_cache_size_limit_mb = 1;
    FLAGS_log_segment_size_bytes = 96_KB;
    QLStressTestSingleTablet::SetUp();
  }
};
947 | | |
// Shuts a replica down, writes and GCs logs until the replica can only catch
// up via (rate-limited, hence long) remote bootstrap, then verifies it catches
// up, follower log caches drain, and the replica stays in sync afterwards.
TEST_F_EX(QLStressTest, LongRemoteBootstrap, QLStressTestLongRemoteBootstrap) {
  FLAGS_log_min_seconds_to_retain = 1;
  // Throttle remote bootstrap so it takes a long time, as the test name implies.
  FLAGS_remote_bootstrap_rate_limit_bytes_per_sec = 1_MB;

  // Take one replica down; it will fall behind while writes continue.
  cluster_->mini_tablet_server(0)->Shutdown();

  ASSERT_OK(WaitFor([this] {
    auto leaders = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    if (leaders.empty()) {
      return false;
    }
    LOG(INFO) << "Tablet id: " << leaders.front()->tablet_id();
    return true;
  }, 30s, "Leader elected"));

  std::atomic<int> key(0);

  // Background writer with large values so the WAL grows quickly.
  TestThreadHolder thread_holder;
  constexpr size_t kValueSize = 32_KB;
  AddWriter(std::string(kValueSize, 'X'), &key, &thread_holder, 100ms);

  std::this_thread::sleep_for(20s); // Wait some time to have logs.

  // Flush and GC until early log entries are gone, making log-based catch-up
  // impossible for the downed replica.
  ASSERT_OK(WaitFor([this]() -> Result<bool> {
    RETURN_NOT_OK(cluster_->CleanTabletLogs());
    auto leaders = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    if (leaders.empty()) {
      return false;
    }

    RETURN_NOT_OK(leaders.front()->tablet()->Flush(tablet::FlushMode::kSync));
    RETURN_NOT_OK(leaders.front()->RunLogGC());

    // Check that first log was garbage collected, so remote bootstrap will be required.
    consensus::ReplicateMsgs replicates;
    int64_t starting_op_segment_seq_num;
    yb::SchemaPB schema;
    uint32_t schema_version;
    return !leaders.front()->log()->GetLogReader()->ReadReplicatesInRange(
        100, 101, 0, &replicates, &starting_op_segment_seq_num, &schema, &schema_version).ok();
  }, 30s, "Logs cleaned"));

  LOG(INFO) << "Bring replica back, keys written: " << key.load(std::memory_order_acquire);
  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());

  // Keep flushing and GC-ing logs while the replica catches up, so the
  // bootstrap has to cope with logs disappearing underneath it.
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
    while (!stop.load(std::memory_order_acquire)) {
      ASSERT_OK(cluster_->FlushTablets());
      ASSERT_OK(cluster_->CleanTabletLogs());
      std::this_thread::sleep_for(100ms);
    }
  });

  ASSERT_OK(WaitAllReplicasHaveIndex(cluster_.get(), key.load(std::memory_order_acquire), 40s));
  LOG(INFO) << "All replicas ready";

  // After catch-up, every follower's consensus log cache should drain to zero.
  ASSERT_OK(WaitFor([this] {
    bool result = true;
    auto followers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders);
    LOG(INFO) << "Num followers: " << followers.size();
    for (const auto& peer : followers) {
      auto log_cache_size = peer->raft_consensus()->LogCacheSize();
      LOG(INFO) << "T " << peer->tablet_id() << " P " << peer->permanent_uuid()
                << ", log cache size: " << log_cache_size;
      if (log_cache_size != 0) {
        result = false;
      }
    }
    return result;
  }, 5s, "All followers cleanup cache"));

  // Write some more values and check that replica still in touch.
  std::this_thread::sleep_for(5s);
  ASSERT_OK(WaitAllReplicasHaveIndex(cluster_.get(), key.load(std::memory_order_acquire), 1s));
}
1023 | | |
// Fixture for the dynamic compaction priority test: single tablet per table,
// tiny memtables, and a single rate-limited compaction thread so that
// compaction work from multiple tables has to contend and be prioritized.
class QLStressDynamicCompactionPriorityTest : public QLStressTest {
 public:
  void SetUp() override {
    FLAGS_allow_preempting_compactions = true;
    // Small memtable: frequent flushes generate many SST files.
    FLAGS_db_write_buffer_size = 16_KB;
    FLAGS_enable_ondisk_compression = false;
    // One background compaction thread with a tight rate limit makes
    // compactions slow and queued.
    FLAGS_rocksdb_max_background_compactions = 1;
    FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec = 160_KB;
    QLStressTest::SetUp();
  }

  int NumTablets() override {
    return 1;
  }

  // Simple schema: int32 hash key "h" plus the string value column.
  void InitSchemaBuilder(YBSchemaBuilder* builder) override {
    builder->AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull();
    builder->AddColumn(kValueColumn)->Type(STRING);
  }
};
1044 | | |
1045 | 0 | TEST_F_EX(QLStressTest, DynamicCompactionPriority, QLStressDynamicCompactionPriorityTest) { |
1046 | 0 | YBSchemaBuilder b; |
1047 | 0 | InitSchemaBuilder(&b); |
1048 | 0 | CompleteSchemaBuilder(&b); |
1049 | |
|
1050 | 0 | TableHandle table2; |
1051 | 0 | ASSERT_OK(table2.Create(YBTableName(kTableName.namespace_type(), |
1052 | 0 | kTableName.namespace_name(), |
1053 | 0 | kTableName.table_name() + "_2"), |
1054 | 0 | NumTablets(), client_.get(), &b)); |
1055 | |
|
1056 | 0 | TestThreadHolder thread_holder; |
1057 | 0 | thread_holder.AddThreadFunctor([this, &table2, &stop = thread_holder.stop_flag()] { |
1058 | 0 | auto session = NewSession(); |
1059 | 0 | int key = 1; |
1060 | 0 | std::string value(FLAGS_db_write_buffer_size, 'X'); |
1061 | 0 | int left_writes_to_current_table = 0; |
1062 | 0 | TableHandle* table = nullptr; |
1063 | 0 | while (!stop.load(std::memory_order_acquire)) { |
1064 | 0 | if (left_writes_to_current_table == 0) { |
1065 | 0 | table = RandomUniformBool() ? &table_ : &table2; |
1066 | 0 | left_writes_to_current_table = RandomUniformInt(1, std::max(1, key / 5)); |
1067 | 0 | } else { |
1068 | 0 | --left_writes_to_current_table; |
1069 | 0 | } |
1070 | 0 | const auto op = table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
1071 | 0 | auto* const req = op->mutable_request(); |
1072 | 0 | QLAddInt32HashValue(req, key); |
1073 | 0 | table_.AddStringColumnValue(req, kValueColumn, value); |
1074 | 0 | session->Apply(op); |
1075 | 0 | ASSERT_OK(session->Flush()); |
1076 | 0 | ASSERT_OK(CheckOp(op.get())); |
1077 | 0 | std::this_thread::sleep_for(100ms); |
1078 | 0 | ++key; |
1079 | 0 | } |
1080 | 0 | }); |
1081 | |
|
1082 | 0 | thread_holder.WaitAndStop(60s); |
1083 | |
|
1084 | 0 | auto delete_start = CoarseMonoClock::now(); |
1085 | 0 | ASSERT_OK(client_->DeleteTable(table_->id(), true)); |
1086 | 0 | MonoDelta delete_time(CoarseMonoClock::now() - delete_start); |
1087 | 0 | LOG(INFO) << "Delete time: " << delete_time; |
1088 | 0 | ASSERT_LE(delete_time, 10s); |
1089 | 0 | } |
1090 | | |
// Single-tablet stress fixture whose table is created as transactional, so
// writers can wrap inserts in distributed transactions.
class QLStressTestTransactionalSingleTablet : public QLStressTestSingleTablet {
  void CompleteSchemaBuilder(YBSchemaBuilder* builder) override {
    // Mark the table transactional in its schema properties.
    SetTransactional(builder);
  }
};
1096 | | |
1097 | | // Verify that we don't have too many write waiters. |
1098 | | // Uses FLAGS_TEST_max_write_waiters to fail debug check when there are too many waiters. |
1099 | 0 | TEST_F_EX(QLStressTest, RemoveIntentsDuringWrite, QLStressTestTransactionalSingleTablet) { |
1100 | 0 | FLAGS_TEST_max_write_waiters = 5; |
1101 | |
|
1102 | 0 | constexpr int kWriters = 10; |
1103 | 0 | constexpr int kKeyBase = 10000; |
1104 | |
|
1105 | 0 | std::array<std::atomic<int>, kWriters> keys; |
1106 | |
|
1107 | 0 | auto txn_manager = CreateTxnManager(); |
1108 | 0 | TestThreadHolder thread_holder; |
1109 | 0 | for (int i = 0; i != kWriters; ++i) { |
1110 | 0 | keys[i] = i * kKeyBase; |
1111 | 0 | AddWriter( |
1112 | 0 | "value_", &keys[i], &thread_holder, 0s, false /* allow_failures */, &txn_manager, 1.0); |
1113 | 0 | } |
1114 | |
|
1115 | 0 | thread_holder.WaitAndStop(3s); |
1116 | 0 | } |
1117 | | |
// Queues many async writes at the current leader, partitions it away so a new
// leader takes over, forces the new leader into empty-request backoff via
// reject mode, heals the partition, and verifies the queued writes resolve.
TEST_F_EX(QLStressTest, SyncOldLeader, QLStressTestSingleTablet) {
  FLAGS_raft_heartbeat_interval_ms = 100 * kTimeMultiplier;
  constexpr int kOldLeaderWriteKeys = 100;
  // Should be less than amount of pending operations at the old leader.
  // So it is much smaller than keys written to the old leader.
  constexpr int kNewLeaderWriteKeys = 10;

  TestThreadHolder thread_holder;

  // Pin the client's outbound IP base.
  // NOTE(review): presumably required so SetupConnectivity below can match and
  // cut the client/server links — confirm against SetupConnectivity.
  client_->messenger()->TEST_SetOutboundIpBase(ASSERT_RESULT(HostToAddress("127.0.0.1")));

  auto session = NewSession();
  // Perform write to make sure we have a leader.
  ASSERT_OK(WriteRow(session, 0, "value"));

  session->SetTimeout(10s);
  // Queue async writes at the current (soon to be old) leader without waiting
  // for the flushes to finish; their futures are resolved at the end.
  std::vector<std::future<FlushStatus>> futures;
  int key;
  for (key = 1; key <= kOldLeaderWriteKeys; ++key) {
    InsertRow(session, key, std::to_string(key));
    futures.push_back(session->FlushFuture());
  }

  // Cut the old leader off from every other tablet server.
  auto old_leader = ASSERT_RESULT(ServerWithLeaders(cluster_.get()));
  LOG(INFO) << "Isolate old leader: "
            << cluster_->mini_tablet_server(old_leader)->server()->permanent_uuid();
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
    if (i != old_leader) {
      ASSERT_OK(SetupConnectivity(cluster_.get(), i, old_leader, Connectivity::kOff));
    }
  }

  // Keep writing until a new leader has accepted kNewLeaderWriteKeys rows.
  int written_to_new_leader = 0;
  while (written_to_new_leader < kNewLeaderWriteKeys) {
    ++key;
    auto write_status = WriteRow(session, key, std::to_string(key));
    if (write_status.ok()) {
      ++written_to_new_leader;
    } else {
      // Some writes could fail, while operations are being send to the old leader.
      LOG(INFO) << "Write " << key << " failed: " << write_status;
    }
  }

  auto peers = cluster_->GetTabletPeers(old_leader);
  // Reject all non empty update consensuses, to activate consensus exponential backoff,
  // and get into situation where leader sends empty request.
  for (const auto& peer : peers) {
    peer->raft_consensus()->TEST_RejectMode(consensus::RejectMode::kNonEmpty);
  }

  // Heal the partition.
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
    if (i != old_leader) {
      ASSERT_OK(SetupConnectivity(cluster_.get(), i, old_leader, Connectivity::kOn));
    }
  }

  // Wait until old leader receive update consensus with empty ops.
  std::this_thread::sleep_for(5s * kTimeMultiplier);

  // Stop rejecting so normal replication resumes.
  for (const auto& peer : peers) {
    peer->raft_consensus()->TEST_RejectMode(consensus::RejectMode::kNone);
  }

  // Wait all writes to complete.
  for (auto& future : futures) {
    WARN_NOT_OK(future.get().status, "Write failed");
  }

  thread_holder.Stop();
}
1189 | | |
1190 | | } // namespace client |
1191 | | } // namespace yb |