/Users/deen/code/yugabyte-db/src/yb/consensus/mt-log-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <algorithm> |
34 | | #include <mutex> |
35 | | #include <vector> |
36 | | |
37 | | #include "yb/consensus/log-test-base.h" |
38 | | #include "yb/consensus/log_index.h" |
39 | | |
40 | | #include "yb/gutil/algorithm.h" |
41 | | #include "yb/gutil/ref_counted.h" |
42 | | #include "yb/gutil/strings/substitute.h" |
43 | | |
44 | | #include "yb/util/locks.h" |
45 | | #include "yb/util/metrics.h" |
46 | | #include "yb/util/random.h" |
47 | | #include "yb/util/status_log.h" |
48 | | #include "yb/util/stopwatch.h" |
49 | | #include "yb/util/thread.h" |
50 | | |
51 | | // TODO: Semantics of the Log and Appender thread interactions changed and now multi-threaded |
52 | | // writing is no longer allowed, or to be more precise, does no longer guarantee the ordering of |
53 | | // events being written, across threads. |
54 | | DEFINE_int32(num_writer_threads, 1, "Number of threads writing to the log"); |
55 | | DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread"); |
56 | | DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch"); |
57 | | |
58 | | namespace yb { |
59 | | namespace log { |
60 | | |
61 | | using std::vector; |
62 | | using consensus::ReplicateMsgPtr; |
63 | | |
64 | | namespace { |
65 | | |
66 | | class CustomLatchCallback : public RefCountedThreadSafe<CustomLatchCallback> { |
67 | | public: |
68 | | CustomLatchCallback(CountDownLatch* latch, vector<Status>* errors) |
69 | | : latch_(latch), |
70 | 2.00k | errors_(errors) { |
71 | 2.00k | } |
72 | | |
73 | 2.00k | void StatusCB(const Status& s) { |
74 | 2.00k | if (!s.ok()) { |
75 | 0 | errors_->push_back(s); |
76 | 0 | } |
77 | 2.00k | latch_->CountDown(); |
78 | 2.00k | } |
79 | | |
80 | 2.00k | StatusCallback AsStatusCallback() { |
81 | 2.00k | return Bind(&CustomLatchCallback::StatusCB, this); |
82 | 2.00k | } |
83 | | |
84 | | private: |
85 | | CountDownLatch* latch_; |
86 | | vector<Status>* errors_; |
87 | | }; |
88 | | |
89 | | } // anonymous namespace |
90 | | |
91 | | extern const char *kTestTablet; |
92 | | |
93 | | class MultiThreadedLogTest : public LogTestBase { |
94 | | public: |
95 | | MultiThreadedLogTest() |
96 | 1 | : random_(SeedRandom()) { |
97 | 1 | } |
98 | | |
99 | 1 | void SetUp() override { |
100 | 1 | LogTestBase::SetUp(); |
101 | 1 | } |
102 | | |
103 | 1 | void LogWriterThread(int thread_id) { |
104 | 1 | CountDownLatch latch(FLAGS_num_batches_per_thread); |
105 | 1 | vector<Status> errors; |
106 | 2.00k | for (int i = 0; i < FLAGS_num_batches_per_thread; i++) { |
107 | 2.00k | LogEntryBatch* entry_batch; |
108 | 2.00k | ReplicateMsgs batch_replicates; |
109 | 2.00k | int num_ops = static_cast<int>(random_.Normal( |
110 | 2.00k | static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0)); |
111 | 0 | DVLOG(1) << num_ops << " ops in this batch"; |
112 | 2.00k | num_ops = std::max(num_ops, 1); |
113 | 2.00k | { |
114 | 2.00k | std::lock_guard<simple_spinlock> lock_guard(lock_); |
115 | 10.9k | for (int j = 0; j < num_ops; j++) { |
116 | 8.93k | auto replicate = std::make_shared<ReplicateMsg>(); |
117 | 8.93k | auto index = current_index_++; |
118 | 8.93k | OpIdPB* op_id = replicate->mutable_id(); |
119 | 8.93k | op_id->set_term(0); |
120 | 8.93k | op_id->set_index(index); |
121 | | |
122 | 8.93k | replicate->set_op_type(WRITE_OP); |
123 | 8.93k | replicate->set_hybrid_time(clock_->Now().ToUint64()); |
124 | | |
125 | 8.93k | tserver::WriteRequestPB request; |
126 | 8.93k | AddTestRowInsert(narrow_cast<int32_t>(index), 0, "this is a test insert", &request); |
127 | 8.93k | request.set_tablet_id(kTestTablet); |
128 | 8.93k | batch_replicates.push_back(replicate); |
129 | 8.93k | } |
130 | | |
131 | 2.00k | auto entry_batch_pb = CreateBatchFromAllocatedOperations(batch_replicates); |
132 | | |
133 | 2.00k | log_->Reserve(REPLICATE, &entry_batch_pb, &entry_batch); |
134 | 2.00k | } // lock_guard scope |
135 | 2.00k | auto cb = new CustomLatchCallback(&latch, &errors); |
136 | 2.00k | ASSERT_OK(log_->TEST_AsyncAppendWithReplicates( |
137 | 2.00k | entry_batch, batch_replicates, cb->AsStatusCallback())); |
138 | 2.00k | } |
139 | 1 | LOG_TIMING(INFO, strings::Substitute("thread $0 waiting to append and sync $1 batches", |
140 | 1 | thread_id, FLAGS_num_batches_per_thread)) { |
141 | 1 | latch.Wait(); |
142 | 1 | } |
143 | 0 | for (const Status& status : errors) { |
144 | 0 | WARN_NOT_OK(status, "Unexpected failure during AsyncAppend"); |
145 | 0 | } |
146 | 1 | ASSERT_EQ(0, errors.size()); |
147 | 1 | } |
148 | | |
149 | 1 | void Run() { |
150 | 2 | for (int i = 0; i < FLAGS_num_writer_threads; i++) { |
151 | 1 | scoped_refptr<yb::Thread> new_thread; |
152 | 1 | CHECK_OK(yb::Thread::Create("test", "inserter", |
153 | 1 | &MultiThreadedLogTest::LogWriterThread, this, i, &new_thread)); |
154 | 1 | threads_.push_back(new_thread); |
155 | 1 | } |
156 | 1 | for (scoped_refptr<yb::Thread>& thread : threads_) { |
157 | 1 | ASSERT_OK(ThreadJoiner(thread.get()).Join()); |
158 | 1 | } |
159 | 1 | } |
160 | | private: |
161 | | ThreadSafeRandom random_; |
162 | | simple_spinlock lock_; |
163 | | vector<scoped_refptr<yb::Thread> > threads_; |
164 | | }; |
165 | | |
166 | 1 | TEST_F(MultiThreadedLogTest, TestAppends) { |
167 | 1 | BuildLog(); |
168 | 1 | auto start_current_id = current_index_; |
169 | 1 | LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)", |
170 | 1 | FLAGS_num_writer_threads * FLAGS_num_batches_per_thread, |
171 | 1 | FLAGS_num_batches_per_thread, FLAGS_num_writer_threads)) { |
172 | 1 | ASSERT_NO_FATALS(Run()); |
173 | 1 | } |
174 | 1 | ASSERT_OK(log_->Close()); |
175 | | |
176 | 1 | std::unique_ptr<LogReader> reader; |
177 | 1 | ASSERT_OK(LogReader::Open(fs_manager_->env(), nullptr, "Log reader: ", |
178 | 1 | fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), |
179 | 1 | nullptr, nullptr, &reader)); |
180 | 1 | SegmentSequence segments; |
181 | 1 | ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); |
182 | | |
183 | 1 | std::vector<int64_t> ids; |
184 | 1 | for (const SegmentSequence::value_type& entry : segments) { |
185 | 1 | auto read_entries = entry->ReadEntries(); |
186 | 1 | ASSERT_OK(read_entries.status); |
187 | 8.93k | for (const auto& entry : read_entries.entries) { |
188 | 8.93k | if (entry->type() == REPLICATE) { |
189 | 8.93k | ids.push_back(entry->replicate().id().index()); |
190 | 8.93k | } |
191 | 8.93k | } |
192 | 1 | } |
193 | 0 | DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops"; |
194 | 1 | ASSERT_EQ(current_index_ - start_current_id, ids.size()); |
195 | 1 | ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end())); |
196 | 1 | } |
197 | | |
198 | | } // namespace log |
199 | | } // namespace yb |