/Users/deen/code/yugabyte-db/src/yb/integration-tests/kv_table_ts_failover_write_if-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <memory> |
16 | | #include <vector> |
17 | | |
18 | | #include "yb/client/client-test-util.h" |
19 | | #include "yb/client/client.h" |
20 | | #include "yb/client/session.h" |
21 | | #include "yb/client/table_creator.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/ql_value.h" |
25 | | |
26 | | #include "yb/integration-tests/cluster_itest_util.h" |
27 | | #include "yb/integration-tests/cluster_verifier.h" |
28 | | #include "yb/integration-tests/external_mini_cluster.h" |
29 | | #include "yb/integration-tests/yb_table_test_base.h" |
30 | | |
31 | | #include "yb/util/format.h" |
32 | | #include "yb/util/status_format.h" |
33 | | #include "yb/util/test_util.h" |
34 | | |
35 | | #include "yb/yql/cql/ql/util/statement_result.h" |
36 | | |
37 | | DECLARE_bool(TEST_combine_batcher_errors); |
38 | | |
39 | | namespace yb { |
40 | | |
41 | | using client::YBSessionPtr; |
42 | | using client::YBSchemaBuilder; |
43 | | using client::YBqlReadOp; |
44 | | using client::YBqlWriteOp; |
45 | | using itest::TServerDetails; |
46 | | using std::shared_ptr; |
47 | | using std::unique_ptr; |
48 | | |
49 | | using namespace std::literals; |
50 | | |
51 | | namespace { |
52 | | |
53 | | const auto kTestCommandsTimeOut = 30s; |
54 | | const auto kHeartBeatInterval = 1s; |
55 | | const auto kLeaderFailureMaxMissedHeartbeatPeriods = 3; |
56 | | const auto kKeyColumnName = "k"; |
57 | | const auto kValueColumnName = "v"; |
58 | | |
59 | 16 | std::string TsNameForIndex(size_t idx) { |
60 | 16 | return Format("ts-$0", idx + 1); |
61 | 16 | } |
62 | | |
63 | | } // namespace |
64 | | |
65 | | class KVTableTsFailoverWriteIfTest : public integration_tests::YBTableTestBase { |
66 | | public: |
67 | 31 | bool use_external_mini_cluster() override { return true; } |
68 | | |
69 | 2 | int num_tablets() override { return 1; } |
70 | | |
71 | 1 | bool enable_ysql() override { return false; } |
72 | | |
73 | 1 | void CustomizeExternalMiniCluster(ExternalMiniClusterOptions* opts) override { |
74 | 1 | opts->extra_tserver_flags.push_back("--raft_heartbeat_interval_ms=" + |
75 | 1 | yb::ToString(ToMilliseconds(kHeartBeatInterval))); |
76 | 1 | opts->extra_tserver_flags.push_back("--leader_failure_max_missed_heartbeat_periods=" + |
77 | 1 | yb::ToString(kLeaderFailureMaxMissedHeartbeatPeriods)); |
78 | 1 | } |
79 | | |
80 | 1 | void SetUp() override { |
81 | 1 | YBTableTestBase::SetUp(); |
82 | 1 | ts_map_ = ASSERT_RESULT(itest::CreateTabletServerMap(external_mini_cluster())); |
83 | 1 | ts_details_.clear(); |
84 | 4 | for (size_t i = 0; i < external_mini_cluster()->num_tablet_servers(); ++i) { |
85 | 3 | std::string ts_id = external_mini_cluster()->tablet_server(i)->uuid(); |
86 | 3 | LOG(INFO) << TsNameForIndex(i) << ": " << ts_id; |
87 | 3 | TServerDetails* ts = ts_map_[ts_id].get(); |
88 | 3 | ASSERT_EQ(ts->uuid(), ts_id); |
89 | 3 | ts_details_.push_back(ts); |
90 | 3 | } |
91 | 1 | } |
92 | | |
93 | 2 | void SetValueAsync(const YBSessionPtr& session, int32_t key, int32_t value) { |
94 | 2 | const auto insert = table_.NewInsertOp(); |
95 | 2 | auto* const req = insert->mutable_request(); |
96 | 2 | QLAddInt32HashValue(req, key); |
97 | 2 | table_.AddInt32ColumnValue(req, kValueColumnName, value); |
98 | 2 | string op_str = Format("$0: $1", key, value); |
99 | 2 | LOG(INFO) << "Sending write: " << op_str; |
100 | 2 | session->Apply(insert); |
101 | 2 | session->FlushAsync([insert, op_str](client::FlushStatus* flush_status) { |
102 | 2 | const auto& s = flush_status->status; |
103 | 4 | ASSERT_TRUE(s.ok() || s.IsAlreadyPresent()) |
104 | 4 | << "Failed to flush write " << op_str << ". Error: " << s; |
105 | 2 | if (s.ok()) { |
106 | 4 | ASSERT_EQ(insert->response().status(), QLResponsePB::YQL_STATUS_OK) |
107 | 4 | << "Failed to write " << op_str; |
108 | 2 | } |
109 | 2 | LOG(INFO) << "Written: " << op_str; |
110 | 2 | }); |
111 | 2 | } |
112 | | |
113 | 1 | shared_ptr<YBqlWriteOp> CreateWriteIfOp(int32_t key, int32_t old_value, int32_t new_value) { |
114 | 1 | const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE); |
115 | 1 | auto* const req = op->mutable_request(); |
116 | | // Set v = new_value. |
117 | 1 | table_.AddInt32ColumnValue(req, kValueColumnName, new_value); |
118 | | // If v = old_value. |
119 | 1 | table_.SetInt32Condition( |
120 | 1 | req->mutable_if_expr()->mutable_condition(), kValueColumnName, QL_OP_EQUAL, old_value); |
121 | 1 | req->mutable_column_refs()->add_ids(table_.ColumnId(kValueColumnName)); |
122 | | // And k = key. |
123 | 1 | QLAddInt32HashValue(req, key); |
124 | 1 | return op; |
125 | 1 | } |
126 | | |
127 | 63 | boost::optional<int32_t> GetValue(const YBSessionPtr& session, int32_t key) { |
128 | 63 | const auto op = client::CreateReadOp(key, table_, kValueColumnName); |
129 | 63 | Status s = session->ApplyAndFlush(op); |
130 | 63 | if (!s.ok()) { |
131 | 0 | return boost::none; |
132 | 0 | } |
133 | 63 | auto rowblock = ql::RowsResult(op.get()).GetRowBlock(); |
134 | 63 | EXPECT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK); |
135 | 63 | if (rowblock->row_count() == 0) { |
136 | 0 | return boost::none; |
137 | 0 | } |
138 | 63 | EXPECT_EQ(1, rowblock->row_count()); |
139 | 63 | return rowblock->row(0).column(0).int32_value(); |
140 | 63 | } |
141 | | |
142 | 1 | void CreateTable() override { |
143 | 1 | if (!table_exists_) { |
144 | 1 | const auto table = table_name(); |
145 | 1 | ASSERT_OK(client_->CreateNamespaceIfNotExists(table.namespace_name(), |
146 | 1 | table.namespace_type())); |
147 | | |
148 | 1 | YBSchemaBuilder b; |
149 | 1 | b.AddColumn(kKeyColumnName)->Type(INT32)->NotNull()->HashPrimaryKey(); |
150 | 1 | b.AddColumn(kValueColumnName)->Type(INT32)->NotNull(); |
151 | 1 | ASSERT_OK(b.Build(&schema_)); |
152 | | |
153 | 1 | ASSERT_OK(NewTableCreator()->table_name(table_name()).schema(&schema_).Create()); |
154 | 1 | table_exists_ = true; |
155 | 1 | } |
156 | 1 | } |
157 | | |
158 | 1 | Result<int> GetTabletServerLeaderIndex(const std::string& tablet_id) { |
159 | 1 | TServerDetails* leader_ts_details; |
160 | 1 | Status s = FindTabletLeader(ts_map_, tablet_id, kTestCommandsTimeOut, &leader_ts_details); |
161 | 1 | RETURN_NOT_OK(s); |
162 | 1 | const int idx = external_mini_cluster()->tablet_server_index_by_uuid(leader_ts_details->uuid()); |
163 | 1 | if (idx < 0) { |
164 | 0 | return STATUS_FORMAT( |
165 | 0 | IllegalState, "Not found tablet server with uuid: $0", leader_ts_details->uuid()); |
166 | 0 | } |
167 | 1 | LOG(INFO) << "Tablet server leader detected: " << TsNameForIndex(idx); |
168 | 1 | return idx; |
169 | 1 | } |
170 | | |
171 | 1 | Result<int> GetTabletServerRaftLeaderIndex(const std::string& tablet_id) { |
172 | 1 | const MonoDelta kMinTimeout = 1s; |
173 | 1 | MonoTime start = MonoTime::Now(); |
174 | 1 | MonoTime deadline = start + 60s; |
175 | 1 | Status s; |
176 | 1 | int i = 0; |
177 | 4 | while (true) { |
178 | 4 | MonoDelta remaining_timeout = deadline - MonoTime::Now(); |
179 | 4 | TServerDetails* ts = ts_details_[i]; |
180 | 4 | consensus::ConsensusStatePB cstate; |
181 | 4 | s = itest::GetConsensusState(ts, |
182 | 4 | tablet_id, |
183 | 4 | consensus::ConsensusConfigType::CONSENSUS_CONFIG_ACTIVE, |
184 | 4 | std::min(remaining_timeout, kMinTimeout), |
185 | 4 | &cstate); |
186 | 4 | if (s.ok() && cstate.has_leader_uuid() && cstate.leader_uuid() == ts->uuid()) { |
187 | 1 | LOG(INFO) << "Tablet server RAFT leader detected: " << TsNameForIndex(i); |
188 | 1 | return i; |
189 | 1 | } |
190 | 3 | if (MonoTime::Now() > deadline) { |
191 | 0 | break; |
192 | 0 | } |
193 | 3 | SleepFor(100ms); |
194 | 3 | i = (i + 1) % ts_details_.size(); |
195 | 3 | } |
196 | 0 | return STATUS(NotFound, "No tablet server RAFT leader detected"); |
197 | 1 | } |
198 | | |
199 | 8 | void SetBoolFlag(size_t ts_idx, const std::string& flag, bool value) { |
200 | 8 | auto ts = external_mini_cluster()->tablet_server(ts_idx); |
201 | 8 | LOG(INFO) << "Setting " << flag << " to " << value << " on " << TsNameForIndex(ts_idx); |
202 | 8 | ASSERT_OK(external_mini_cluster()->SetFlag(ts, flag, yb::ToString(value))); |
203 | 8 | } |
204 | | |
205 | | itest::TabletServerMap ts_map_; |
206 | | |
207 | | // TServerDetails instances referred by ts_details_ are owned by ts_map_. |
208 | | vector<TServerDetails*> ts_details_; |
209 | | }; |
210 | | |
211 | | // Test for ENG-3471 - shouldn't run write-if when leader hasn't yet committed all pendings ops. |
212 | 1 | TEST_F(KVTableTsFailoverWriteIfTest, KillTabletServerDuringReplication) { |
213 | 1 | FLAGS_TEST_combine_batcher_errors = true; |
214 | | |
215 | 1 | const int32_t key = 0; |
216 | 1 | const int32_t initial_value = 10000; |
217 | 1 | const auto small_delay = 100ms; |
218 | | |
219 | 1 | const auto cluster = external_mini_cluster(); |
220 | 1 | const auto num_ts = cluster->num_tablet_servers(); |
221 | | |
222 | 1 | vector<string> tablet_ids; |
223 | 1 | do { |
224 | 1 | ASSERT_OK(itest::ListRunningTabletIds( |
225 | 1 | ts_map_.begin()->second.get(), kTestCommandsTimeOut, &tablet_ids)); |
226 | 1 | } while (tablet_ids.empty()); |
227 | 1 | const auto tablet_id = tablet_ids[0]; |
228 | | |
229 | 1 | const auto old_leader_ts_idx = ASSERT_RESULT(GetTabletServerLeaderIndex(tablet_id)); |
230 | 1 | auto* const old_leader_ts = cluster->tablet_server(old_leader_ts_idx); |
231 | | |
232 | | // Select replica to be a new leader. |
233 | 1 | const auto new_leader_ts_idx = (old_leader_ts_idx + 1) % num_ts; |
234 | 1 | const auto new_leader_name = TsNameForIndex(new_leader_ts_idx); |
235 | 1 | auto* const new_leader_ts = cluster->tablet_server(new_leader_ts_idx); |
236 | | |
237 | | // Select replica to be follower all the time. |
238 | 1 | const auto follower_replica_ts_idx = (new_leader_ts_idx + 1) % num_ts; |
239 | | |
240 | 1 | SetValueAsync(session_, key, initial_value); |
241 | | |
242 | | // Run concurrent reads. |
243 | 1 | std::atomic<bool> stop_requested(false); |
244 | 1 | std::atomic<int32_t> last_read_value(0); |
245 | 1 | std::thread reader([this, &stop_requested, &small_delay, &last_read_value] { |
246 | 1 | auto session = NewSession(); |
247 | 1 | MonoTime log_deadline(MonoTime::Min()); |
248 | | |
249 | 64 | while (!stop_requested) { |
250 | 63 | SleepFor(small_delay); |
251 | 63 | auto val = GetValue(session, key); |
252 | 63 | if (!val) { |
253 | 0 | continue; |
254 | 0 | } |
255 | 63 | const auto last_value = last_read_value.exchange(*val); |
256 | 63 | if (*val != last_value || MonoTime::Now() > log_deadline) { |
257 | 8 | LOG(INFO) << "Read value: " << key << ": " << *val; |
258 | 8 | log_deadline = MonoTime::Now() + 1s; |
259 | 8 | } |
260 | 63 | ASSERT_GE(*val, last_value); |
261 | 63 | } |
262 | 1 | }); |
263 | | |
264 | | // Make sure we read initial value. |
265 | 1 | ASSERT_OK(LoggedWaitFor( |
266 | 1 | [&last_read_value]{ return last_read_value == initial_value; }, 60s, |
267 | 1 | "Waiting to read initial value...", small_delay)); |
268 | | |
269 | | // Prevent follower_replica_ts_idx from being elected as a new leader. |
270 | 1 | SetBoolFlag(follower_replica_ts_idx, "TEST_follower_reject_update_consensus_requests", true); |
271 | | |
272 | 1 | { |
273 | 1 | LogWaiter log_waiter(new_leader_ts, "Pausing due to flag TEST_pause_update_replica"); |
274 | | |
275 | | // Pause Consensus Update RPC processing on new_leader_ts to delay initial_value + 2 replication |
276 | | // till new_leader_ts is elected as a new leader. |
277 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_replica", true); |
278 | | |
279 | | // Send write initial_value + 2, it won't be fully replicated until we resume UpdateReplica and |
280 | | // UpdateMajorityReplicated. |
281 | 1 | SetValueAsync(session_, key, initial_value + 2); |
282 | | |
283 | 1 | LOG(INFO) << Format("Waiting for Consensus Update RPC to arrive $0...", new_leader_name); |
284 | 1 | ASSERT_OK(log_waiter.WaitFor(60s)); |
285 | 1 | } |
286 | | |
287 | 1 | { |
288 | 1 | LogWaiter log_waiter(new_leader_ts, "Starting NORMAL_ELECTION..."); |
289 | | |
290 | | // Pausing leader and waiting, so replicas will detect leader failure. |
291 | 1 | LOG(INFO) << "Pausing " << TsNameForIndex(old_leader_ts_idx); |
292 | 1 | ASSERT_OK(old_leader_ts->Pause()); |
293 | | |
294 | 1 | LOG(INFO) << Format("Waiting for election to start on $0...", new_leader_name); |
295 | 1 | ASSERT_OK(log_waiter.WaitFor(60s)); |
296 | 1 | } |
297 | | |
298 | 1 | { |
299 | 1 | LogWaiter log_waiter( |
300 | 1 | new_leader_ts, "Pausing due to flag TEST_pause_update_majority_replicated"); |
301 | | |
302 | | // Pause applying write ops on new_leader_ts_idx, so initial_value + 2 won't be applied to DocDB |
303 | | // yet after going to RAFT log. |
304 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_majority_replicated", true); |
305 | | |
306 | | // Resume UpdateReplica on new_leader_ts_idx, so it can: |
307 | | // 1 - Trigger election (RaftConsensus::ReportFailureDetectedTask is waiting on lock inside |
308 | | // LOG_WITH_PREFIX). |
309 | | // 2 - Append initial_value + 2 to RAFT log. |
310 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_replica", false); |
311 | | |
312 | | // new_leader_ts_idx will become leader, but might not be ready to serve due to pending write |
313 | | // ops to apply. |
314 | 1 | { |
315 | 1 | const auto leader_idx = ASSERT_RESULT(GetTabletServerRaftLeaderIndex(tablet_id)); |
316 | 1 | ASSERT_EQ(leader_idx, new_leader_ts_idx); |
317 | 1 | } |
318 | | |
319 | | // We need follower_replica_ts_idx to be able to process Consensus Update RPC from new leader. |
320 | 1 | SetBoolFlag(follower_replica_ts_idx, "TEST_follower_reject_update_consensus_requests", false); |
321 | | |
322 | 1 | LOG(INFO) << Format( |
323 | 1 | "Waiting for $0 to append to RAFT log on $1...", initial_value + 2, new_leader_name); |
324 | 1 | ASSERT_OK(log_waiter.WaitFor(60s)); |
325 | 1 | } |
326 | | |
327 | | // Make following CAS to pause after doing read and evaluating if part. |
328 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_write_apply_after_if", true); |
329 | | |
330 | | // Send CAS initial_value -> initial_value + 1. |
331 | 1 | std::atomic<bool> cas_completed(false); |
332 | 1 | const auto cas_old_value = initial_value; |
333 | 1 | const auto cas_new_value = initial_value + 1; |
334 | 1 | const auto op = CreateWriteIfOp(key, cas_old_value, cas_new_value); |
335 | 1 | const auto op_str = Format("$0: $1 -> $2", key, cas_old_value, cas_new_value); |
336 | 1 | LOG(INFO) << "Sending CAS " << op_str; |
337 | 1 | auto session = NewSession(); |
338 | 1 | session->SetTimeout(15s); |
339 | 1 | client::FlushCallback callback = [&session, &op, &op_str, &cas_completed, |
340 | 2 | &callback](client::FlushStatus* flush_status) { |
341 | 2 | const auto& s = flush_status->status; |
342 | 2 | if (s.ok()) { |
343 | 1 | LOG(INFO) << "CAS operation completed: " << op_str; |
344 | 1 | cas_completed.store(true); |
345 | 1 | } else { |
346 | 1 | LOG(INFO) << "Error during CAS: " << s; |
347 | 1 | LOG(INFO) << "Retrying CAS: " << op_str; |
348 | 1 | session->Apply(op); |
349 | 1 | session->FlushAsync(callback); |
350 | 1 | } |
351 | 2 | }; |
352 | 1 | session->Apply(op); |
353 | 1 | session->FlushAsync(callback); |
354 | | |
355 | | // In case of bug ENG-3471 read part of CAS will be completed before appending pending ops to log, |
356 | | // so give it some time to CAS-read and prepare to put write (initial_value + 1) to RAFT log. |
357 | 1 | SleepFor(3s); |
358 | | |
359 | | // Let write (initial_value + 2) to be applied to DocDB. |
360 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_majority_replicated", false); |
361 | | |
362 | | // Waiting to read (initial_value + 2) from new leader. |
363 | 1 | { |
364 | 1 | const auto desc = Format("Waiting to read $0...", initial_value + 2); |
365 | 1 | LOG(INFO) << desc; |
366 | 1 | ASSERT_OK(WaitFor( |
367 | 1 | [&last_read_value]{ return last_read_value == initial_value + 2; }, 60s, desc, small_delay, |
368 | 1 | 1)); |
369 | 1 | } |
370 | | |
371 | | // Resume CAS processing. |
372 | 1 | SetBoolFlag(new_leader_ts_idx, "TEST_pause_write_apply_after_if", false); |
373 | | |
374 | 1 | { |
375 | 1 | const auto desc = "Waiting for CAS to complete..."; |
376 | 1 | LOG(INFO) << desc; |
377 | 1 | ASSERT_OK(WaitFor([&cas_completed]{ return cas_completed.load(); }, 60s, desc, small_delay, 1)); |
378 | 1 | } |
379 | | |
380 | | // Give reader thread some time to read CAS result in case CAS condition was met. |
381 | 1 | SleepFor(3s); |
382 | | |
383 | 1 | stop_requested.store(true); |
384 | 1 | LOG(INFO) << "Waiting for reader thread..."; |
385 | 1 | reader.join(); |
386 | 1 | LOG(INFO) << "Reader thread stopped."; |
387 | | |
388 | 1 | LOG(INFO) << "Resuming " << TsNameForIndex(old_leader_ts_idx); |
389 | 1 | ASSERT_OK(old_leader_ts->Resume()); |
390 | | |
391 | 1 | ClusterVerifier cluster_verifier(external_mini_cluster()); |
392 | 1 | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
393 | 1 | } |
394 | | |
395 | | } // namespace yb |