/Users/deen/code/yugabyte-db/src/yb/integration-tests/load_generator.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/integration-tests/load_generator.h" |
15 | | |
16 | | #include <memory> |
17 | | #include <random> |
18 | | #include <thread> |
19 | | |
20 | | #include <boost/range/iterator_range.hpp> |
21 | | |
22 | | #include "yb/client/client.h" |
23 | | #include "yb/client/error.h" |
24 | | #include "yb/client/schema.h" |
25 | | #include "yb/client/session.h" |
26 | | #include "yb/client/table_handle.h" |
27 | | #include "yb/client/yb_op.h" |
28 | | |
29 | | #include "yb/common/common.pb.h" |
30 | | #include "yb/common/partial_row.h" |
31 | | #include "yb/common/ql_value.h" |
32 | | |
33 | | #include "yb/gutil/strings/split.h" |
34 | | #include "yb/gutil/strings/substitute.h" |
35 | | |
36 | | #include "yb/util/atomic.h" |
37 | | #include "yb/util/debug/leakcheck_disabler.h" |
38 | | #include "yb/util/net/sockaddr.h" |
39 | | #include "yb/util/result.h" |
40 | | #include "yb/util/status_log.h" |
41 | | |
42 | | #include "yb/yql/redis/redisserver/redis_client.h" |
43 | | |
44 | | using namespace std::literals; |
45 | | |
46 | | using std::atomic; |
47 | | using std::atomic_bool; |
48 | | using std::unique_ptr; |
49 | | |
50 | | using strings::Substitute; |
51 | | |
52 | | using std::shared_ptr; |
53 | | using yb::Status; |
54 | | using yb::ThreadPool; |
55 | | using yb::ThreadPoolBuilder; |
56 | | using yb::MonoDelta; |
57 | | using yb::MemoryOrder; |
58 | | using yb::ConditionVariable; |
59 | | using yb::Mutex; |
60 | | using yb::MutexLock; |
61 | | using yb::CountDownLatch; |
62 | | using yb::Slice; |
63 | | using yb::YBPartialRow; |
64 | | using yb::TableType; |
65 | | |
66 | | using yb::client::YBClient; |
67 | | using yb::client::YBError; |
68 | | using yb::client::YBNoOp; |
69 | | using yb::client::YBSession; |
70 | | using yb::client::YBTable; |
71 | | using yb::redisserver::RedisReply; |
72 | | |
73 | | DEFINE_bool(load_gen_verbose, |
74 | | false, |
75 | | "Custom verbose log messages for debugging the load test tool"); |
76 | | |
77 | | DEFINE_int32(load_gen_insertion_tracker_delay_ms, |
78 | | 50, |
79 | | "The interval (ms) at which the load generator's \"insertion tracker thread\" " |
80 | | "wakes in up "); |
81 | | |
82 | | DEFINE_int32(load_gen_scanner_open_retries, |
83 | | 10, |
84 | | "Number of times to re-try when opening a scanner"); |
85 | | |
86 | | DEFINE_int32(load_gen_wait_time_increment_step_ms, |
87 | | 100, |
88 | | "In retry loops used in the load test we increment the wait time by this number of " |
89 | | "milliseconds after every attempt."); |
90 | | |
91 | | namespace { |
92 | | |
93 | 0 | void ConfigureYBSession(YBSession* session) { |
94 | 0 | session->SetTimeout(60s); |
95 | 0 | } |
96 | | |
97 | 0 | string FormatWithSize(const string& s) { |
98 | 0 | return strings::Substitute("'$0' ($1 bytes)", s, s.size()); |
99 | 0 | } |
100 | | |
101 | | } // namespace |
102 | | |
103 | | namespace yb { |
104 | | namespace load_generator { |
105 | | |
// Formats x as exactly 16 lower-case hexadecimal digits, zero-padded on the left.
string FormatHexForLoadTestKey(uint64_t x) {
  // 16 hex digits for a 64-bit value plus the terminating NUL written by snprintf.
  char hex_digits[16 + 1];
  snprintf(hex_digits, sizeof(hex_digits), "%016" PRIx64, x);
  return string(hex_digits);
}
111 | | |
112 | 0 | size_t KeyIndexSet::NumElements() const { |
113 | 0 | MutexLock l(mutex_); |
114 | 0 | return set_.size(); |
115 | 0 | } |
116 | | |
117 | 0 | void KeyIndexSet::Insert(int64_t key) { |
118 | 0 | MutexLock l(mutex_); |
119 | 0 | set_.insert(key); |
120 | 0 | } |
121 | | |
122 | 0 | bool KeyIndexSet::Contains(int64_t key) const { |
123 | 0 | MutexLock l(mutex_); |
124 | 0 | return set_.find(key) != set_.end(); |
125 | 0 | } |
126 | | |
127 | 0 | bool KeyIndexSet::RemoveIfContains(int64_t key) { |
128 | 0 | MutexLock l(mutex_); |
129 | 0 | set<int64>::iterator it = set_.find(key); |
130 | 0 | if (it == set_.end()) { |
131 | 0 | return false; |
132 | 0 | } else { |
133 | 0 | set_.erase(it); |
134 | 0 | return true; |
135 | 0 | } |
136 | 0 | } |
137 | | |
138 | 0 | int64_t KeyIndexSet::GetRandomKey(std::mt19937_64* random_number_generator) const { |
139 | 0 | MutexLock l(mutex_); |
140 | | // The set iterator does not support indexing, so we probabilistically choose a random element |
141 | | // by iterating the set. |
142 | 0 | size_t n = set_.size(); |
143 | 0 | for (int64_t x : set_) { |
144 | 0 | if ((*random_number_generator)() % n == 0) return x; |
145 | 0 | --n; // Decrement the number of remaining elements we are considering. |
146 | 0 | } |
147 | | // This will only happen if the set is empty. |
148 | 0 | return -1; |
149 | 0 | } |
150 | | |
151 | 0 | ostream& operator <<(ostream& out, const KeyIndexSet &key_index_set) { |
152 | 0 | MutexLock l(key_index_set.mutex_); |
153 | 0 | out << "["; |
154 | 0 | bool first = true; |
155 | 0 | for (auto key : key_index_set.set_) { |
156 | 0 | if (!first) { |
157 | 0 | out << ", "; |
158 | 0 | } |
159 | 0 | first = false; |
160 | 0 | out << key; |
161 | 0 | } |
162 | 0 | out << "]"; |
163 | 0 | return out; |
164 | 0 | } |
165 | | |
166 | | // ------------------------------------------------------------------------------------------------ |
167 | | // SessionFactory |
168 | | // ------------------------------------------------------------------------------------------------ |
169 | | |
170 | | YBSessionFactory::YBSessionFactory(client::YBClient* client, client::TableHandle* table) |
171 | 0 | : client_(client), table_(table) {} |
172 | | |
173 | 0 | string YBSessionFactory::ClientId() { return client_->id().ToString(); } |
174 | | |
175 | 0 | SingleThreadedWriter* YBSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) { |
176 | 0 | return new YBSingleThreadedWriter(writer, client_, table_, idx); |
177 | 0 | } |
178 | | |
179 | 0 | SingleThreadedReader* YBSessionFactory::GetReader(MultiThreadedReader* reader, int idx) { |
180 | 0 | return new YBSingleThreadedReader(reader, client_, table_, idx); |
181 | 0 | } |
182 | | |
183 | 0 | SingleThreadedWriter* NoopSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) { |
184 | 0 | return new NoopSingleThreadedWriter(writer, client_, table_, idx); |
185 | 0 | } |
186 | | |
187 | | RedisSessionFactory::RedisSessionFactory(const string& redis_server_addresses) |
188 | 0 | : redis_server_addresses_(redis_server_addresses) {} |
189 | | |
190 | 0 | string RedisSessionFactory::ClientId() { return "redis_client"; } |
191 | | |
192 | 0 | SingleThreadedWriter* RedisSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) { |
193 | 0 | return new RedisSingleThreadedWriter(writer, redis_server_addresses_, idx); |
194 | 0 | } |
195 | | |
196 | 0 | SingleThreadedReader* RedisSessionFactory::GetReader(MultiThreadedReader* reader, int idx) { |
197 | 0 | return new RedisSingleThreadedReader(reader, redis_server_addresses_, idx); |
198 | 0 | } |
199 | | |
200 | 0 | SingleThreadedWriter* RedisNoopSessionFactory::GetWriter(MultiThreadedWriter* writer, int idx) { |
201 | 0 | return new RedisNoopSingleThreadedWriter(writer, redis_server_addresses_, idx); |
202 | 0 | } |
203 | | |
204 | | // ------------------------------------------------------------------------------------------------ |
205 | | // MultiThreadedAction |
206 | | // ------------------------------------------------------------------------------------------------ |
207 | | |
208 | | MultiThreadedAction::MultiThreadedAction( |
209 | | const string& description, int64_t num_keys, int64_t start_key, int num_action_threads, |
210 | | int num_extra_threads, const string& client_id, atomic_bool* stop_requested_flag, |
211 | | int value_size) |
212 | | : description_(description), |
213 | | num_keys_(num_keys), |
214 | | start_key_(start_key), |
215 | | num_action_threads_(num_action_threads), |
216 | | client_id_(client_id), |
217 | | running_threads_latch_(num_action_threads), |
218 | | stop_requested_(stop_requested_flag), |
219 | 0 | value_size_(value_size) { |
220 | 0 | CHECK_OK( |
221 | 0 | ThreadPoolBuilder(description) |
222 | 0 | .set_max_threads(num_action_threads_ + num_extra_threads) |
223 | 0 | .Build(&thread_pool_)); |
224 | 0 | } |
225 | | |
226 | 0 | MultiThreadedAction::~MultiThreadedAction() {} |
227 | | |
228 | 0 | string MultiThreadedAction::GetKeyByIndex(int64_t key_index) { |
229 | 0 | string key_index_str(Substitute("key$0", key_index)); |
230 | 0 | return Substitute( |
231 | 0 | "$0_$1_$2", FormatHexForLoadTestKey(std::hash<string>()(key_index_str)), key_index_str, |
232 | 0 | client_id_); |
233 | 0 | } |
234 | | |
235 | | // Creates a human-readable string with hex characters to be used as a value in our test. This is |
236 | | // deterministic based on key_index. |
237 | | DISABLE_UBSAN |
238 | 0 | string MultiThreadedAction::GetValueByIndex(int64_t key_index) { |
239 | 0 | string value; |
240 | 0 | int64_t x = key_index; |
241 | 0 | for (int i = 0; i < value_size_; ++i) { |
242 | 0 | int val = static_cast<int>(x & 0xf); |
243 | 0 | char c = static_cast<char>(val > 9 ? val - 10 + 'a' : val + '0'); |
244 | 0 | value.push_back(c); |
245 | | // Add pseudo-randomness by using the loop index. |
246 | 0 | x = (x >> 4) * 31 + i; |
247 | 0 | } |
248 | 0 | return value; |
249 | 0 | } |
250 | | |
251 | 0 | void MultiThreadedAction::Start() { |
252 | 0 | LOG(INFO) << "Starting " << num_action_threads_ << " " << description_ << " threads"; |
253 | 0 | CHECK_OK(thread_pool_->SubmitFunc(std::bind(&MultiThreadedAction::RunStatsThread, this))); |
254 | 0 | for (int i = 0; i < num_action_threads_; i++) { |
255 | 0 | CHECK_OK(thread_pool_->SubmitFunc(std::bind(&MultiThreadedAction::RunActionThread, this, i))); |
256 | 0 | } |
257 | 0 | } |
258 | | |
259 | 0 | void MultiThreadedAction::WaitForCompletion() { |
260 | 0 | thread_pool_->Wait(); |
261 | 0 | } |
262 | | |
263 | | // ------------------------------------------------------------------------------------------------ |
264 | | // MultiThreadedWriter |
265 | | // ------------------------------------------------------------------------------------------------ |
266 | | |
267 | | MultiThreadedWriter::MultiThreadedWriter( |
268 | | int64_t num_keys, int64_t start_key, int num_writer_threads, SessionFactory* session_factory, |
269 | | atomic_bool* stop_flag, int value_size, size_t max_num_write_errors) |
270 | | : MultiThreadedAction( |
271 | | "writers", num_keys, start_key, num_writer_threads, 2, session_factory->ClientId(), |
272 | | stop_flag, value_size), |
273 | | session_factory_(session_factory), |
274 | | next_key_(start_key), |
275 | | inserted_up_to_inclusive_(-1), |
276 | 0 | max_num_write_errors_(max_num_write_errors) {} |
277 | | |
278 | 0 | void MultiThreadedWriter::Start() { |
279 | 0 | MultiThreadedAction::Start(); |
280 | 0 | CHECK_OK( |
281 | 0 | thread_pool_->SubmitFunc(std::bind(&MultiThreadedWriter::RunInsertionTrackerThread, this))); |
282 | 0 | } |
283 | | |
284 | 0 | void MultiThreadedWriter::WaitForCompletion() { |
285 | 0 | MultiThreadedAction::WaitForCompletion(); |
286 | 0 | LOG(INFO) << "Inserted up to and including " << inserted_up_to_inclusive_.load(); |
287 | 0 | } |
288 | | |
289 | 0 | void MultiThreadedWriter::RunActionThread(int writer_index) { |
290 | 0 | unique_ptr<SingleThreadedWriter> writer(session_factory_->GetWriter(this, writer_index)); |
291 | 0 | writer->set_pause_flag(pause_flag_); |
292 | 0 | writer->Run(); |
293 | |
|
294 | 0 | LOG(INFO) << "Writer thread " << writer_index << " finished"; |
295 | 0 | running_threads_latch_.CountDown(); |
296 | 0 | } |
297 | | |
298 | 0 | void SingleThreadedWriter::Run() { |
299 | 0 | LOG(INFO) << "Writer thread " << writer_index_ << " started"; |
300 | 0 | ConfigureSession(); |
301 | 0 | while (!multi_threaded_writer_->IsStopRequested()) { |
302 | 0 | if (pause_flag_ && pause_flag_->load(std::memory_order_acquire)) { |
303 | 0 | std::this_thread::sleep_for(10ms); |
304 | 0 | continue; |
305 | 0 | } |
306 | 0 | int64_t key_index = multi_threaded_writer_->next_key_++; |
307 | 0 | if (key_index >= multi_threaded_writer_->num_keys_) { |
308 | 0 | break; |
309 | 0 | } |
310 | | |
311 | 0 | string key_str(multi_threaded_writer_->GetKeyByIndex(key_index)); |
312 | 0 | string value_str(multi_threaded_writer_->GetValueByIndex(key_index)); |
313 | |
|
314 | 0 | if (Write(key_index, key_str, value_str)) { |
315 | 0 | multi_threaded_writer_->inserted_keys_.Insert(key_index); |
316 | 0 | } else { |
317 | 0 | multi_threaded_writer_->failed_keys_.Insert(key_index); |
318 | 0 | HandleInsertionFailure(key_index, key_str); |
319 | 0 | if (multi_threaded_writer_->num_write_errors() > |
320 | 0 | multi_threaded_writer_->max_num_write_errors_) { |
321 | 0 | LOG(ERROR) << "Exceeded the maximum number of write errors " |
322 | 0 | << multi_threaded_writer_->max_num_write_errors_ << ", stopping the test."; |
323 | 0 | multi_threaded_writer_->Stop(); |
324 | 0 | break; |
325 | 0 | } |
326 | 0 | } |
327 | 0 | } |
328 | 0 | CloseSession(); |
329 | 0 | } |
330 | | |
331 | | void ConfigureRedisSessions( |
332 | 0 | const string& redis_server_addresses, vector<shared_ptr<RedisClient> >* clients) { |
333 | 0 | std::vector<string> addresses; |
334 | 0 | SplitStringUsing(redis_server_addresses, ",", &addresses); |
335 | 0 | for (auto& addr : addresses) { |
336 | 0 | auto remote = CHECK_RESULT(ParseEndpoint(addr, 6379)); |
337 | 0 | clients->push_back(std::make_shared<RedisClient>( |
338 | 0 | remote.address().to_string(), remote.port())); |
339 | 0 | } |
340 | 0 | } |
341 | | |
342 | 0 | void RedisSingleThreadedWriter::ConfigureSession() { |
343 | 0 | ConfigureRedisSessions(redis_server_addresses_, &clients_); |
344 | 0 | } |
345 | | |
346 | | bool RedisSingleThreadedWriter::Write( |
347 | 0 | int64_t key_index, const string& key_str, const string& value_str) { |
348 | 0 | bool success = false; |
349 | 0 | auto writer_index = writer_index_; |
350 | 0 | int64_t idx = key_index % clients_.size(); |
351 | 0 | clients_[idx]->Send( |
352 | 0 | {"SET", key_str, value_str}, [&success, key_index, writer_index](const RedisReply& reply) { |
353 | 0 | if ("OK" == reply.as_string()) { |
354 | 0 | VLOG(2) << "Writer " << writer_index << " Successfully inserted key #" << key_index |
355 | 0 | << " into redis "; |
356 | 0 | success = true; |
357 | 0 | } else { |
358 | 0 | VLOG(1) << "Failed Insersion key #" << key_index << reply.as_string(); |
359 | 0 | success = false; |
360 | 0 | } |
361 | 0 | }); |
362 | 0 | clients_[idx]->Commit(); |
363 | |
|
364 | 0 | return success; |
365 | 0 | } |
366 | | |
367 | 0 | void RedisSingleThreadedWriter::HandleInsertionFailure(int64_t key_index, const string& key_str) { |
368 | | // Nothing special to do for Redis failures. |
369 | 0 | } |
370 | | |
371 | 0 | void RedisSingleThreadedWriter::CloseSession() { |
372 | 0 | for (auto client : clients_) { |
373 | 0 | client->Disconnect(); |
374 | 0 | } |
375 | 0 | } |
376 | | |
377 | | bool RedisNoopSingleThreadedWriter::Write( |
378 | 0 | int64_t key_index, const string& key_str, const string& value_str) { |
379 | 0 | bool success = false; |
380 | 0 | auto writer_index = writer_index_; |
381 | 0 | int64_t idx = key_index % clients_.size(); |
382 | 0 | clients_[idx]->Send({"ECHO", "OK"}, [&success, key_index, writer_index](const RedisReply& reply) { |
383 | 0 | if ("OK" == reply.as_string()) { |
384 | 0 | VLOG(2) << "Writer " << writer_index << " Successfully inserted key #" << key_index |
385 | 0 | << " into redis "; |
386 | 0 | success = true; |
387 | 0 | } else { |
388 | 0 | VLOG(1) << "Failed Insersion key #" << key_index << reply.as_string(); |
389 | 0 | success = false; |
390 | 0 | } |
391 | 0 | }); |
392 | 0 | clients_[idx]->Commit(); |
393 | |
|
394 | 0 | return success; |
395 | 0 | } |
396 | | |
397 | 0 | void YBSingleThreadedWriter::ConfigureSession() { |
398 | 0 | session_ = client_->NewSession(); |
399 | 0 | ConfigureYBSession(session_.get()); |
400 | 0 | } |
401 | | |
402 | | bool YBSingleThreadedWriter::Write( |
403 | 0 | int64_t key_index, const string& key_str, const string& value_str) { |
404 | 0 | auto insert = table_->NewInsertOp(); |
405 | | // Generate a Put for key_str, value_str |
406 | 0 | QLAddStringHashValue(insert->mutable_request(), key_str); |
407 | 0 | table_->AddStringColumnValue(insert->mutable_request(), "v", value_str); |
408 | | // submit a the put to apply. |
409 | | // If successful, add to inserted |
410 | 0 | session_->Apply(insert); |
411 | 0 | const auto flush_status = session_->FlushAndGetOpsErrors(); |
412 | 0 | const auto& status = flush_status.status; |
413 | 0 | if (!status.ok()) { |
414 | 0 | for (const auto& error : flush_status.errors) { |
415 | | // It means that key was actually written successfully, but our retry failed because |
416 | | // it was detected as duplicate request. |
417 | 0 | if (error->status().IsAlreadyPresent()) { |
418 | 0 | return true; |
419 | 0 | } |
420 | 0 | LOG(WARNING) << "Error inserting key '" << key_str << "': " << error->status(); |
421 | 0 | } |
422 | | |
423 | 0 | LOG(WARNING) << "Error inserting key '" << key_str << "': " |
424 | 0 | << "Flush() failed (" << status << ")"; |
425 | 0 | return false; |
426 | 0 | } |
427 | 0 | if (insert->response().status() != QLResponsePB::YQL_STATUS_OK) { |
428 | 0 | LOG(WARNING) << "Error inserting key '" << key_str << "': " |
429 | 0 | << insert->response().error_message(); |
430 | 0 | return false; |
431 | 0 | } |
432 | | |
433 | 0 | multi_threaded_writer_->inserted_keys_.Insert(key_index); |
434 | 0 | VLOG(2) << "Successfully inserted key #" << key_index << " at hybrid_time " |
435 | 0 | << client_->GetLatestObservedHybridTime() << " or earlier"; |
436 | |
|
437 | 0 | return true; |
438 | 0 | } |
439 | | |
440 | 0 | void YBSingleThreadedWriter::HandleInsertionFailure(int64_t key_index, const string& key_str) { |
441 | | // Already handled in YBSingleThreadedWriter::Write. |
442 | 0 | } |
443 | | |
444 | 0 | void YBSingleThreadedWriter::CloseSession() { CHECK_OK(session_->Close()); } |
445 | | |
446 | 0 | void MultiThreadedWriter::RunStatsThread() { |
447 | 0 | MicrosecondsInt64 prev_time = GetMonoTimeMicros(); |
448 | 0 | int64_t prev_writes = 0; |
449 | 0 | while (!IsStopRequested() && running_threads_latch_.count() > 0) { |
450 | 0 | running_threads_latch_.WaitFor(MonoDelta::FromSeconds(5)); |
451 | 0 | int64_t num_writes = this->num_writes(); |
452 | 0 | MicrosecondsInt64 current_time = GetMonoTimeMicros(); |
453 | 0 | LOG(INFO) << "Wrote " << num_writes << " rows (" |
454 | 0 | << (num_writes - prev_writes) * 1000000.0 / (current_time - prev_time) |
455 | 0 | << " writes/sec), contiguous insertion point: " << inserted_up_to_inclusive_.load() |
456 | 0 | << ", write errors: " << failed_keys_.NumElements(); |
457 | 0 | prev_writes = num_writes; |
458 | 0 | prev_time = current_time; |
459 | 0 | } |
460 | 0 | } |
461 | | |
462 | 0 | void MultiThreadedWriter::RunInsertionTrackerThread() { |
463 | 0 | LOG(INFO) << "Insertion tracker thread started"; |
464 | 0 | int64_t current_key = 0; // the first key to be inserted |
465 | 0 | while (!IsStopRequested() && running_threads_latch_.count() > 0) { |
466 | 0 | while (failed_keys_.Contains(current_key) || inserted_keys_.RemoveIfContains(current_key)) { |
467 | 0 | VLOG(2) << "Advancing insertion tracker to key #" << current_key; |
468 | 0 | inserted_up_to_inclusive_.store(current_key); |
469 | 0 | current_key++; |
470 | 0 | } |
471 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_load_gen_insertion_tracker_delay_ms)); |
472 | 0 | } |
473 | 0 | LOG(INFO) << "Insertion tracker thread stopped"; |
474 | 0 | } |
475 | | |
476 | | // ------------------------------------------------------------------------------------------------ |
477 | | // SingleThreadedScanner |
478 | | // ------------------------------------------------------------------------------------------------ |
479 | | |
480 | 0 | SingleThreadedScanner::SingleThreadedScanner(client::TableHandle* table) : table_(table) {} |
481 | | |
482 | 0 | int64_t SingleThreadedScanner::CountRows() { |
483 | 0 | auto result = boost::size(client::TableRange(*table_)); |
484 | |
|
485 | 0 | LOG(INFO) << " num read rows = " << result; |
486 | 0 | return result; |
487 | 0 | } |
488 | | |
489 | | // ------------------------------------------------------------------------------------------------ |
490 | | // MultiThreadedReader |
491 | | // ------------------------------------------------------------------------------------------------ |
492 | | |
493 | | MultiThreadedReader::MultiThreadedReader(int64_t num_keys, int num_reader_threads, |
494 | | SessionFactory* session_factory, |
495 | | atomic<int64_t>* insertion_point, |
496 | | const KeyIndexSet* inserted_keys, |
497 | | const KeyIndexSet* failed_keys, atomic_bool* stop_flag, |
498 | | int value_size, int max_num_read_errors, |
499 | | MultiThreadedReaderOptions options) |
500 | | : MultiThreadedAction( |
501 | | "readers", num_keys, 0, num_reader_threads, 1, session_factory->ClientId(), |
502 | | stop_flag, value_size), |
503 | | session_factory_(session_factory), |
504 | | insertion_point_(insertion_point), |
505 | | inserted_keys_(inserted_keys), |
506 | | failed_keys_(failed_keys), |
507 | | num_reads_(0), |
508 | | num_read_errors_(0), |
509 | | max_num_read_errors_(max_num_read_errors), |
510 | 0 | options_(options) {} |
511 | | |
512 | 0 | void MultiThreadedReader::RunActionThread(int reader_index) { |
513 | 0 | unique_ptr<SingleThreadedReader> reader_loop(session_factory_->GetReader(this, reader_index)); |
514 | 0 | reader_loop->Run(); |
515 | |
|
516 | 0 | LOG(INFO) << "Reader thread " << reader_index << " finished"; |
517 | 0 | running_threads_latch_.CountDown(); |
518 | 0 | } |
519 | | |
520 | 0 | void MultiThreadedReader::RunStatsThread() { |
521 | 0 | MicrosecondsInt64 prev_time = GetMonoTimeMicros(); |
522 | 0 | int64_t prev_rows_read = 0; |
523 | 0 | while (!IsStopRequested() && running_threads_latch_.count() > 0) { |
524 | 0 | running_threads_latch_.WaitFor(MonoDelta::FromSeconds(5)); |
525 | 0 | MicrosecondsInt64 current_time = GetMonoTimeMicros(); |
526 | 0 | int64_t num_rows_read = num_reads_.load(); |
527 | 0 | LOG(INFO) << "Read " << num_rows_read << " rows (" |
528 | 0 | << (num_rows_read - prev_rows_read) * 1000000.0 / (current_time - prev_time) |
529 | 0 | << " reads/sec), read errors: " << num_read_errors_.load(); |
530 | 0 | prev_rows_read = num_rows_read; |
531 | 0 | prev_time = current_time; |
532 | 0 | } |
533 | 0 | } |
534 | | |
535 | 0 | void MultiThreadedReader::IncrementReadErrorCount(ReadStatus read_status) { |
536 | 0 | DCHECK(read_status != ReadStatus::kOk); |
537 | |
|
538 | 0 | if (++num_read_errors_ > max_num_read_errors_) { |
539 | 0 | LOG(ERROR) << "Exceeded the maximum number of read errors (" << max_num_read_errors_ << ")!"; |
540 | 0 | read_status_stopped_ = read_status; |
541 | 0 | Stop(); |
542 | 0 | } |
543 | 0 | for (const auto& option_and_status : |
544 | 0 | {std::make_pair(MultiThreadedReaderOption::kStopOnEmptyRead, ReadStatus::kNoRows), |
545 | 0 | std::make_pair(MultiThreadedReaderOption::kStopOnInvalidRead, ReadStatus::kInvalidRead), |
546 | 0 | std::make_pair(MultiThreadedReaderOption::kStopOnExtraRead, ReadStatus::kExtraRows)}) { |
547 | 0 | if (options_.Test(option_and_status.first) && read_status == option_and_status.second) { |
548 | 0 | LOG(ERROR) << "Stopping due to not allowed read status: " << AsString(read_status); |
549 | 0 | read_status_stopped_ = read_status; |
550 | 0 | Stop(); |
551 | 0 | return; |
552 | 0 | } |
553 | 0 | } |
554 | 0 | } |
555 | | |
556 | 0 | void RedisSingleThreadedReader::ConfigureSession() { |
557 | 0 | ConfigureRedisSessions(redis_server_addresses_, &clients_); |
558 | 0 | } |
559 | | |
560 | 0 | void RedisSingleThreadedReader::CloseSession() { |
561 | 0 | for (auto client : clients_) { |
562 | 0 | client->Disconnect(); |
563 | 0 | } |
564 | 0 | } |
565 | | |
566 | 0 | void YBSingleThreadedReader::ConfigureSession() { |
567 | 0 | session_ = client_->NewSession(); |
568 | 0 | ConfigureYBSession(session_.get()); |
569 | 0 | } |
570 | | |
571 | | bool NoopSingleThreadedWriter::Write( |
572 | 0 | int64_t key_index, const string& key_str, const string& value_str) { |
573 | 0 | YBNoOp noop(table_->table()); |
574 | 0 | std::unique_ptr<YBPartialRow> row(table_->schema().NewRow()); |
575 | 0 | CHECK_OK(row->SetBinary("k", key_str)); |
576 | 0 | Status s = noop.Execute(client_, *row); |
577 | 0 | if (s.ok()) { |
578 | 0 | return true; |
579 | 0 | } |
580 | 0 | LOG(ERROR) << "NoOp failed" << s.CodeAsString(); |
581 | 0 | return false; |
582 | 0 | } |
583 | | |
584 | | ReadStatus YBSingleThreadedReader::PerformRead( |
585 | 0 | int64_t key_index, const string& key_str, const string& expected_value_str) { |
586 | 0 | uint64_t read_ts = client_->GetLatestObservedHybridTime(); |
587 | |
|
588 | 0 | for (int i = 1;; ++i) { |
589 | 0 | auto read_op = table_->NewReadOp(); |
590 | 0 | QLAddStringHashValue(read_op->mutable_request(), key_str); |
591 | 0 | table_->AddColumns({"k", "v"}, read_op->mutable_request()); |
592 | 0 | auto status = session_->ApplyAndFlush(read_op); |
593 | 0 | boost::optional<QLRowBlock> row_block; |
594 | 0 | if (status.ok()) { |
595 | 0 | auto result = read_op->MakeRowBlock(); |
596 | 0 | if (!result.ok()) { |
597 | 0 | status = std::move(result.status()); |
598 | 0 | } else { |
599 | 0 | row_block = std::move(*result); |
600 | 0 | } |
601 | 0 | } |
602 | 0 | if (!status.ok()) { |
603 | 0 | LOG(ERROR) << "Failed to read: " << status << ", re-trying."; |
604 | 0 | if (i >= FLAGS_load_gen_scanner_open_retries) { |
605 | 0 | CHECK_OK(status); |
606 | 0 | } |
607 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_load_gen_wait_time_increment_step_ms * i)); |
608 | 0 | continue; |
609 | 0 | } |
610 | 0 | if (row_block->row_count() == 0) { |
611 | 0 | LOG(ERROR) << "No rows found for key #" << key_index |
612 | 0 | << " (read hybrid_time: " << read_ts << ")"; |
613 | 0 | return ReadStatus::kNoRows; |
614 | 0 | } |
615 | 0 | if (row_block->row_count() != 1) { |
616 | 0 | LOG(ERROR) << "Found an invalid number of rows for key #" << key_index << ": " |
617 | 0 | << row_block->row_count() << " (expected to find 1 row), read hybrid_time: " |
618 | 0 | << read_ts; |
619 | 0 | multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError); |
620 | 0 | return ReadStatus::kOtherError; |
621 | 0 | } |
622 | 0 | auto row = row_block->rows()[0]; |
623 | 0 | if (row.column(0).binary_value() != key_str) { |
624 | 0 | LOG(ERROR) << "Invalid key returned by the read operation: '" << row.column(0).binary_value() |
625 | 0 | << "', expected: '" << key_str << "', read hybrid_time: " << read_ts; |
626 | 0 | multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError); |
627 | 0 | return ReadStatus::kOtherError; |
628 | 0 | } |
629 | 0 | auto returned_value = row.column(1).binary_value(); |
630 | 0 | if (returned_value != expected_value_str) { |
631 | 0 | LOG(ERROR) << "Invalid value returned by the read operation for key '" << key_str |
632 | 0 | << "': " << FormatWithSize(returned_value) |
633 | 0 | << ", expected: " << FormatWithSize(expected_value_str) |
634 | 0 | << ", read hybrid_time: " << read_ts; |
635 | 0 | multi_threaded_reader_->IncrementReadErrorCount(ReadStatus::kOtherError); |
636 | 0 | return ReadStatus::kOtherError; |
637 | 0 | } |
638 | 0 | break; |
639 | 0 | } |
640 | | |
641 | 0 | return ReadStatus::kOk; |
642 | 0 | } |
643 | | |
644 | 0 | void YBSingleThreadedReader::CloseSession() { CHECK_OK(session_->Close()); } |
645 | | |
646 | | ReadStatus RedisSingleThreadedReader::PerformRead( |
647 | 0 | int64_t key_index, const string& key_str, const string& expected_value_str) { |
648 | 0 | string value_str; |
649 | 0 | int64_t idx = key_index % clients_.size(); |
650 | 0 | clients_[idx]->Send({"GET", key_str}, [&value_str](const RedisReply& reply) { |
651 | 0 | value_str = reply.as_string(); |
652 | 0 | }); |
653 | 0 | VLOG(3) << "Trying to read key #" << key_index << " from redis " |
654 | 0 | << " key : " << key_str; |
655 | 0 | clients_[idx]->Commit(); |
656 | |
|
657 | 0 | if (expected_value_str != value_str) { |
658 | 0 | LOG(INFO) << "Read the wrong value for #" << key_index << " from redis " |
659 | 0 | << " key : " << key_str << " value : " << value_str |
660 | 0 | << " expected : " << expected_value_str; |
661 | 0 | return ReadStatus::kOtherError; |
662 | 0 | } |
663 | | |
664 | 0 | VLOG(2) << "Reader " << reader_index_ << " Successfully read key #" << key_index << " from redis " |
665 | 0 | << " key : " << key_str << " value : " << value_str; |
666 | 0 | return ReadStatus::kOk; |
667 | 0 | } |
668 | | |
669 | 0 | void SingleThreadedReader::Run() { |
670 | 0 | std::mt19937_64 random_number_generator(reader_index_); |
671 | |
|
672 | 0 | LOG(INFO) << "Reader thread " << reader_index_ << " started"; |
673 | 0 | ConfigureSession(); |
674 | | |
675 | | // Wait until at least one row has been inserted (keys are numbered starting from 1). |
676 | 0 | while (!multi_threaded_reader_->IsStopRequested() && |
677 | 0 | multi_threaded_reader_->insertion_point_->load() < 0) { |
678 | 0 | VLOG(1) << "Reader thread " << reader_index_ << " Sleeping until load() >= 0"; |
679 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
680 | 0 | } |
681 | |
|
682 | 0 | while (!multi_threaded_reader_->IsStopRequested()) { |
683 | 0 | const int64_t key_index = NextKeyIndexToRead(&random_number_generator); |
684 | |
|
685 | 0 | ++multi_threaded_reader_->num_reads_; |
686 | 0 | const string key_str(multi_threaded_reader_->GetKeyByIndex(key_index)); |
687 | 0 | const string expected_value_str(multi_threaded_reader_->GetValueByIndex(key_index)); |
688 | 0 | const ReadStatus read_status = PerformRead(key_index, key_str, expected_value_str); |
689 | | |
690 | | // Read operation returning zero rows is treated as a read error. |
691 | | // See: https://yugabyte.atlassian.net/browse/ENG-1272 |
692 | 0 | if (read_status == ReadStatus::kNoRows) { |
693 | 0 | multi_threaded_reader_->IncrementReadErrorCount(read_status); |
694 | 0 | } |
695 | 0 | } |
696 | |
|
697 | 0 | CloseSession(); |
698 | 0 | } |
699 | | |
700 | 0 | int64_t SingleThreadedReader::NextKeyIndexToRead(std::mt19937_64* random_number_generator) const { |
701 | 0 | int64_t key_index = 0; |
702 | 0 | VLOG(3) << "Reader thread " << reader_index_ << " waiting to load insertion point"; |
703 | 0 | int64_t written_up_to = multi_threaded_reader_->insertion_point_->load(); |
704 | 0 | do { |
705 | 0 | VLOG(3) << "Reader thread " << reader_index_ << " coin toss"; |
706 | 0 | switch ((*random_number_generator)() % 3) { |
707 | 0 | case 0: |
708 | | // Read the latest value that the insertion tracker knows we've written up to. |
709 | 0 | key_index = written_up_to; |
710 | 0 | break; |
711 | 0 | case 1: |
712 | | // Read one of the keys that have been successfully inserted but have not been processed |
713 | | // by the insertion tracker thread yet. |
714 | 0 | key_index = |
715 | 0 | multi_threaded_reader_->inserted_keys_->GetRandomKey(random_number_generator); |
716 | 0 | if (key_index == -1) { |
717 | | // The set is empty. |
718 | 0 | key_index = written_up_to; |
719 | 0 | } |
720 | 0 | break; |
721 | | |
722 | 0 | default: |
723 | | // We're assuming the total number of keys is < RAND_MAX (~2 billion) here. |
724 | 0 | key_index = (*random_number_generator)() % (written_up_to + 1); |
725 | 0 | break; |
726 | 0 | } |
727 | | // Ensure we don't try to read a key for which a write failed. |
728 | 0 | } while (multi_threaded_reader_->failed_keys_->Contains(key_index) && |
729 | 0 | !multi_threaded_reader_->IsStopRequested()); |
730 | | |
731 | 0 | VLOG(1) << "Reader thread " << reader_index_ << " saw written_up_to=" << written_up_to |
732 | 0 | << " and picked key #" << key_index; |
733 | 0 | return key_index; |
734 | 0 | } |
735 | | |
736 | | } // namespace load_generator |
737 | | } // namespace yb |