YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/kv_table_ts_failover_write_if-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <memory>
16
#include <vector>
17
18
#include "yb/client/client-test-util.h"
19
#include "yb/client/client.h"
20
#include "yb/client/session.h"
21
#include "yb/client/table_creator.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/ql_value.h"
25
26
#include "yb/integration-tests/cluster_itest_util.h"
27
#include "yb/integration-tests/cluster_verifier.h"
28
#include "yb/integration-tests/external_mini_cluster.h"
29
#include "yb/integration-tests/yb_table_test_base.h"
30
31
#include "yb/util/format.h"
32
#include "yb/util/status_format.h"
33
#include "yb/util/test_util.h"
34
35
#include "yb/yql/cql/ql/util/statement_result.h"
36
37
DECLARE_bool(TEST_combine_batcher_errors);
38
39
namespace yb {
40
41
using client::YBSessionPtr;
42
using client::YBSchemaBuilder;
43
using client::YBqlReadOp;
44
using client::YBqlWriteOp;
45
using itest::TServerDetails;
46
using std::shared_ptr;
47
using std::unique_ptr;
48
49
using namespace std::literals;
50
51
namespace {
52
53
const auto kTestCommandsTimeOut = 30s;
54
const auto kHeartBeatInterval = 1s;
55
const auto kLeaderFailureMaxMissedHeartbeatPeriods = 3;
56
const auto kKeyColumnName = "k";
57
const auto kValueColumnName = "v";
58
59
16
std::string TsNameForIndex(size_t idx) {
60
16
  return Format("ts-$0", idx + 1);
61
16
}
62
63
} // namespace
64
65
class KVTableTsFailoverWriteIfTest : public integration_tests::YBTableTestBase {
66
 public:
67
31
  // Always true: the test pauses/resumes tablet servers and sets their flags through
  // external_mini_cluster().
  bool use_external_mini_cluster() override {
    return true;
  }
68
69
2
  // Always 1. Presumably consumed by the base fixture as the tablet count of the test table, so
  // that the whole scenario plays out on a single tablet -- confirm against YBTableTestBase.
  int num_tablets() override {
    return 1;
  }
70
71
1
  // Always false. Presumably keeps YSQL disabled in the cluster (this test only issues QL
  // read/write operations) -- confirm against YBTableTestBase.
  bool enable_ysql() override {
    return false;
  }
72
73
1
  // Appends two extra tablet server flags to opts: the RAFT heartbeat interval (in milliseconds,
  // from kHeartBeatInterval) and the maximum number of missed heartbeat periods before leader
  // failure (from kLeaderFailureMaxMissedHeartbeatPeriods).
  void CustomizeExternalMiniCluster(ExternalMiniClusterOptions* opts) override {
    const auto heartbeat_interval_ms = yb::ToString(ToMilliseconds(kHeartBeatInterval));
    const auto max_missed_periods = yb::ToString(kLeaderFailureMaxMissedHeartbeatPeriods);

    auto& tserver_flags = opts->extra_tserver_flags;
    tserver_flags.push_back("--raft_heartbeat_interval_ms=" + heartbeat_interval_ms);
    tserver_flags.push_back(
        "--leader_failure_max_missed_heartbeat_periods=" + max_missed_periods);
  }
79
80
1
  void SetUp() override {
81
1
    YBTableTestBase::SetUp();
82
1
    ts_map_ = ASSERT_RESULT(itest::CreateTabletServerMap(external_mini_cluster()));
83
1
    ts_details_.clear();
84
4
    for (size_t i = 0; i < external_mini_cluster()->num_tablet_servers(); ++i) {
85
3
      std::string ts_id = external_mini_cluster()->tablet_server(i)->uuid();
86
3
      LOG(INFO) << TsNameForIndex(i) << ": " << ts_id;
87
3
      TServerDetails* ts = ts_map_[ts_id].get();
88
3
      ASSERT_EQ(ts->uuid(), ts_id);
89
3
      ts_details_.push_back(ts);
90
3
    }
91
1
  }
92
93
2
  void SetValueAsync(const YBSessionPtr& session, int32_t key, int32_t value) {
94
2
    const auto insert = table_.NewInsertOp();
95
2
    auto* const req = insert->mutable_request();
96
2
    QLAddInt32HashValue(req, key);
97
2
    table_.AddInt32ColumnValue(req, kValueColumnName, value);
98
2
    string op_str = Format("$0: $1", key, value);
99
2
    LOG(INFO) << "Sending write: " << op_str;
100
2
    session->Apply(insert);
101
2
    session->FlushAsync([insert, op_str](client::FlushStatus* flush_status) {
102
2
      const auto& s = flush_status->status;
103
4
      ASSERT_TRUE(s.ok() || s.IsAlreadyPresent())
104
4
          << "Failed to flush write " << op_str << ". Error: " << s;
105
2
      if (s.ok()) {
106
4
        ASSERT_EQ(insert->response().status(), QLResponsePB::YQL_STATUS_OK)
107
4
            << "Failed to write " << op_str;
108
2
      }
109
2
      LOG(INFO) << "Written: " << op_str;
110
2
    });
111
2
  }
112
113
1
  // Builds (but does not apply) a conditional update on the test table:
  // set v = new_value for the row with k = key, if v = old_value.
  shared_ptr<YBqlWriteOp> CreateWriteIfOp(int32_t key, int32_t old_value, int32_t new_value) {
    const auto write_op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE);
    auto* const request = write_op->mutable_request();

    // Assignment part: v = new_value.
    table_.AddInt32ColumnValue(request, kValueColumnName, new_value);

    // Condition part: v = old_value. The value column is also registered in the request's
    // column refs.
    auto* const condition = request->mutable_if_expr()->mutable_condition();
    table_.SetInt32Condition(condition, kValueColumnName, QL_OP_EQUAL, old_value);
    request->mutable_column_refs()->add_ids(table_.ColumnId(kValueColumnName));

    // Row selection part: k = key.
    QLAddInt32HashValue(request, key);

    return write_op;
  }
126
127
63
  // Reads the value column of the row with the given key through the session.
  //
  // Returns boost::none when applying/flushing the read fails or when no row is returned;
  // otherwise returns the int32 value of the first column of the first row. A non-OK response
  // status or a row count other than 1 is reported as a non-fatal test failure.
  boost::optional<int32_t> GetValue(const YBSessionPtr& session, int32_t key) {
    const auto read_op = client::CreateReadOp(key, table_, kValueColumnName);
    const Status flush_status = session->ApplyAndFlush(read_op);
    if (!flush_status.ok()) {
      return boost::none;
    }

    auto rows = ql::RowsResult(read_op.get()).GetRowBlock();
    EXPECT_EQ(read_op->response().status(), QLResponsePB::YQL_STATUS_OK);

    const auto num_rows = rows->row_count();
    if (num_rows == 0) {
      return boost::none;
    }
    EXPECT_EQ(1, num_rows);
    return rows->row(0).column(0).int32_value();
  }
141
142
1
  void CreateTable() override {
143
1
    if (!table_exists_) {
144
1
      const auto table = table_name();
145
1
      ASSERT_OK(client_->CreateNamespaceIfNotExists(table.namespace_name(),
146
1
                                                    table.namespace_type()));
147
148
1
      YBSchemaBuilder b;
149
1
      b.AddColumn(kKeyColumnName)->Type(INT32)->NotNull()->HashPrimaryKey();
150
1
      b.AddColumn(kValueColumnName)->Type(INT32)->NotNull();
151
1
      ASSERT_OK(b.Build(&schema_));
152
153
1
      ASSERT_OK(NewTableCreator()->table_name(table_name()).schema(&schema_).Create());
154
1
      table_exists_ = true;
155
1
    }
156
1
  }
157
158
1
  // Finds the leader of tablet_id via FindTabletLeader (bounded by kTestCommandsTimeOut) and
  // returns its index in the external mini cluster.
  //
  // Returns the FindTabletLeader error as is, or IllegalState when the cluster reports a negative
  // index for the leader's uuid.
  Result<int> GetTabletServerLeaderIndex(const std::string& tablet_id) {
    TServerDetails* leader = nullptr;
    RETURN_NOT_OK(FindTabletLeader(ts_map_, tablet_id, kTestCommandsTimeOut, &leader));

    const int leader_idx = external_mini_cluster()->tablet_server_index_by_uuid(leader->uuid());
    if (leader_idx >= 0) {
      LOG(INFO) << "Tablet server leader detected: " << TsNameForIndex(leader_idx);
      return leader_idx;
    }
    return STATUS_FORMAT(
        IllegalState, "Not found tablet server with uuid: $0", leader->uuid());
  }
170
171
1
  // Polls the tablet servers in ts_details_ round-robin, starting from index 0, until one of them
  // reports itself as the leader in the active consensus config of tablet_id.
  //
  // Each GetConsensusState call is limited to 1 second (or to the time left until the overall
  // 60 second deadline, whichever is smaller), with a 100ms pause between attempts.
  //
  // Returns the index (within ts_details_) of the server that reported itself as leader, or
  // NotFound if there are no servers to poll or none reported itself as leader before the
  // deadline.
  Result<int> GetTabletServerRaftLeaderIndex(const std::string& tablet_id) {
    // Indexing an empty ts_details_ below would be undefined behavior.
    if (ts_details_.empty()) {
      return STATUS(NotFound, "No tablet servers to poll for RAFT leader");
    }

    // Upper bound for a single GetConsensusState call.
    const MonoDelta kMaxRpcTimeout = 1s;
    const MonoTime deadline = MonoTime::Now() + 60s;
    int i = 0;
    while (true) {
      // Check the deadline before issuing the RPC, so the timeout passed below is always
      // positive.
      const MonoTime now = MonoTime::Now();
      if (!(deadline > now)) {
        break;
      }
      const MonoDelta remaining_timeout = deadline - now;

      TServerDetails* ts = ts_details_[i];
      consensus::ConsensusStatePB cstate;
      const Status s = itest::GetConsensusState(
          ts,
          tablet_id,
          consensus::ConsensusConfigType::CONSENSUS_CONFIG_ACTIVE,
          std::min(remaining_timeout, kMaxRpcTimeout),
          &cstate);
      if (s.ok() && cstate.has_leader_uuid() && cstate.leader_uuid() == ts->uuid()) {
        LOG(INFO) << "Tablet server RAFT leader detected: " << TsNameForIndex(i);
        return i;
      }

      SleepFor(100ms);
      i = static_cast<int>((i + 1) % ts_details_.size());
    }
    return STATUS(NotFound, "No tablet server RAFT leader detected");
  }
198
199
8
  void SetBoolFlag(size_t ts_idx, const std::string& flag, bool value) {
200
8
    auto ts = external_mini_cluster()->tablet_server(ts_idx);
201
8
    LOG(INFO) << "Setting " << flag << " to " << value << " on " << TsNameForIndex(ts_idx);
202
8
    ASSERT_OK(external_mini_cluster()->SetFlag(ts, flag, yb::ToString(value)));
203
8
  }
204
205
  itest::TabletServerMap ts_map_;
206
207
  // TServerDetails instances referred by ts_details_ are owned by ts_map_.
208
  vector<TServerDetails*> ts_details_;
209
};
210
211
// Test for ENG-3471 - shouldn't run write-if when leader hasn't yet committed all pending ops.
212
1
// Scenario:
// 1. Write initial_value and start a background reader which asserts that the values it reads
//    never decrease.
// 2. Send a write of initial_value + 2 while its replication to the future leader is paused.
// 3. Pause the old leader, so the chosen replica starts an election and becomes the new leader
//    while applying of the pending write is paused on it.
// 4. Send a CAS initial_value -> initial_value + 1 to the new leader, then let the pending write
//    and the CAS proceed.
// 5. Stop the reader, resume the old leader and verify the cluster.
TEST_F(KVTableTsFailoverWriteIfTest, KillTabletServerDuringReplication) {
  FLAGS_TEST_combine_batcher_errors = true;

  const int32_t key = 0;
  const int32_t initial_value = 10000;
  const auto small_delay = 100ms;

  const auto cluster = external_mini_cluster();
  const auto num_ts = cluster->num_tablet_servers();

  vector<string> tablet_ids;
  do {
    ASSERT_OK(itest::ListRunningTabletIds(
        ts_map_.begin()->second.get(), kTestCommandsTimeOut, &tablet_ids));
  } while (tablet_ids.empty());
  const auto tablet_id = tablet_ids[0];

  const auto old_leader_ts_idx = ASSERT_RESULT(GetTabletServerLeaderIndex(tablet_id));
  auto* const old_leader_ts = cluster->tablet_server(old_leader_ts_idx);

  // Select replica to be a new leader.
  const auto new_leader_ts_idx = (old_leader_ts_idx + 1) % num_ts;
  const auto new_leader_name = TsNameForIndex(new_leader_ts_idx);
  auto* const new_leader_ts = cluster->tablet_server(new_leader_ts_idx);

  // Select replica to be follower all the time.
  const auto follower_replica_ts_idx = (new_leader_ts_idx + 1) % num_ts;

  SetValueAsync(session_, key, initial_value);

  // Run concurrent reads.
  std::atomic<bool> stop_requested(false);
  std::atomic<int32_t> last_read_value(0);
  std::thread reader([this, &stop_requested, &small_delay, &last_read_value] {
    auto session = NewSession();
    MonoTime log_deadline(MonoTime::Min());

    while (!stop_requested) {
      SleepFor(small_delay);
      auto val = GetValue(session, key);
      if (!val) {
        continue;
      }
      const auto last_value = last_read_value.exchange(*val);
      if (*val != last_value || MonoTime::Now() > log_deadline) {
        LOG(INFO) << "Read value: " << key << ": " << *val;
        log_deadline = MonoTime::Now() + 1s;
      }
      ASSERT_GE(*val, last_value);
    }
  });

  // Stops and joins the reader on every exit path. Without this guard, any fatal ASSERT_* below
  // would return from the test while `reader` is still joinable, and std::thread's destructor
  // would call std::terminate(). It is declared after the state the reader references, so that
  // state is still alive while the guard joins.
  struct ReaderJoiner {
    std::atomic<bool>& stop_flag;
    std::thread& thread;

    ~ReaderJoiner() {
      stop_flag.store(true);
      if (thread.joinable()) {
        thread.join();
      }
    }
  } reader_joiner{stop_requested, reader};

  // Make sure we read initial value.
  ASSERT_OK(LoggedWaitFor(
      [&last_read_value]{ return last_read_value == initial_value; }, 60s,
      "Waiting to read initial value...", small_delay));

  // Prevent follower_replica_ts_idx from being elected as a new leader.
  SetBoolFlag(follower_replica_ts_idx, "TEST_follower_reject_update_consensus_requests", true);

  {
    LogWaiter log_waiter(new_leader_ts, "Pausing due to flag TEST_pause_update_replica");

    // Pause Consensus Update RPC processing on new_leader_ts to delay initial_value + 2 replication
    // till new_leader_ts is elected as a new leader.
    SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_replica", true);

    // Send write initial_value + 2, it won't be fully replicated until we resume UpdateReplica and
    // UpdateMajorityReplicated.
    SetValueAsync(session_, key, initial_value + 2);

    LOG(INFO) << Format("Waiting for Consensus Update RPC to arrive $0...", new_leader_name);
    ASSERT_OK(log_waiter.WaitFor(60s));
  }

  {
    LogWaiter log_waiter(new_leader_ts, "Starting NORMAL_ELECTION...");

    // Pausing leader and waiting, so replicas will detect leader failure.
    LOG(INFO) << "Pausing " << TsNameForIndex(old_leader_ts_idx);
    ASSERT_OK(old_leader_ts->Pause());

    LOG(INFO) << Format("Waiting for election to start on $0...", new_leader_name);
    ASSERT_OK(log_waiter.WaitFor(60s));
  }

  {
    LogWaiter log_waiter(
        new_leader_ts, "Pausing due to flag TEST_pause_update_majority_replicated");

    // Pause applying write ops on new_leader_ts_idx, so initial_value + 2 won't be applied to DocDB
    // yet after going to RAFT log.
    SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_majority_replicated", true);

    // Resume UpdateReplica on new_leader_ts_idx, so it can:
    // 1 - Trigger election (RaftConsensus::ReportFailureDetectedTask is waiting on lock inside
    // LOG_WITH_PREFIX).
    // 2 - Append initial_value + 2 to RAFT log.
    SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_replica", false);

    // new_leader_ts_idx will become leader, but might not be ready to serve due to pending write
    // ops to apply.
    {
      const auto leader_idx = ASSERT_RESULT(GetTabletServerRaftLeaderIndex(tablet_id));
      ASSERT_EQ(leader_idx, new_leader_ts_idx);
    }

    // We need follower_replica_ts_idx to be able to process Consensus Update RPC from new leader.
    SetBoolFlag(follower_replica_ts_idx, "TEST_follower_reject_update_consensus_requests", false);

    LOG(INFO) << Format(
        "Waiting for $0 to append to RAFT log on $1...", initial_value + 2, new_leader_name);
    ASSERT_OK(log_waiter.WaitFor(60s));
  }

  // Make following CAS to pause after doing read and evaluating if part.
  SetBoolFlag(new_leader_ts_idx, "TEST_pause_write_apply_after_if", true);

  // Send CAS initial_value -> initial_value + 1.
  std::atomic<bool> cas_completed(false);
  const auto cas_old_value = initial_value;
  const auto cas_new_value = initial_value + 1;
  const auto op = CreateWriteIfOp(key, cas_old_value, cas_new_value);
  const auto op_str = Format("$0: $1 -> $2", key, cas_old_value, cas_new_value);
  LOG(INFO) << "Sending CAS " << op_str;
  auto session = NewSession();
  session->SetTimeout(15s);
  // On failure the callback re-applies the same op and flushes again with itself as the callback.
  // NOTE(review): the callback captures locals of this test by reference, so it relies on the
  // flush completing before the test body returns -- confirm this holds on early-failure paths.
  client::FlushCallback callback = [&session, &op, &op_str, &cas_completed,
                                    &callback](client::FlushStatus* flush_status) {
    const auto& s = flush_status->status;
    if (s.ok()) {
      LOG(INFO) << "CAS operation completed: " << op_str;
      cas_completed.store(true);
    } else {
      LOG(INFO) << "Error during CAS: " << s;
      LOG(INFO) << "Retrying CAS: " << op_str;
      session->Apply(op);
      session->FlushAsync(callback);
    }
  };
  session->Apply(op);
  session->FlushAsync(callback);

  // In case of bug ENG-3471 read part of CAS will be completed before appending pending ops to log,
  // so give it some time to CAS-read and prepare to put write (initial_value + 1) to RAFT log.
  SleepFor(3s);

  // Let write (initial_value + 2) to be applied to DocDB.
  SetBoolFlag(new_leader_ts_idx, "TEST_pause_update_majority_replicated", false);

  // Waiting to read (initial_value + 2) from new leader.
  {
    const auto desc = Format("Waiting to read $0...", initial_value + 2);
    LOG(INFO) << desc;
    ASSERT_OK(WaitFor(
        [&last_read_value]{ return last_read_value == initial_value + 2; }, 60s, desc, small_delay,
        1));
  }

  // Resume CAS processing.
  SetBoolFlag(new_leader_ts_idx, "TEST_pause_write_apply_after_if", false);

  {
    const auto desc = "Waiting for CAS to complete...";
    LOG(INFO) << desc;
    ASSERT_OK(WaitFor([&cas_completed]{ return cas_completed.load(); }, 60s, desc, small_delay, 1));
  }

  // Give reader thread some time to read CAS result in case CAS condition was met.
  SleepFor(3s);

  // Normal-path shutdown of the reader; reader_joiner then finds the thread already joined.
  stop_requested.store(true);
  LOG(INFO) << "Waiting for reader thread...";
  reader.join();
  LOG(INFO) << "Reader thread stopped.";

  LOG(INFO) << "Resuming " << TsNameForIndex(old_leader_ts_idx);
  ASSERT_OK(old_leader_ts->Resume());

  ClusterVerifier cluster_verifier(external_mini_cluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
}
394
395
}  // namespace yb