YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/ts_recovery-itest.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <string>
34
35
#include "yb/integration-tests/cluster_verifier.h"
36
#include "yb/integration-tests/external_mini_cluster.h"
37
#include "yb/integration-tests/test_workload.h"
38
39
#include "yb/rpc/rpc_controller.h"
40
41
#include "yb/tserver/tserver_admin.proxy.h"
42
43
#include "yb/util/size_literals.h"
44
#include "yb/util/test_util.h"
45
46
using std::string;
47
using namespace std::literals;
48
49
namespace yb {
50
51
// Test fixture for the tablet-server recovery integration tests in this file.
// It owns an ExternalMiniCluster that StartCluster() creates and TearDown()
// shuts down.
class TsRecoveryITest : public YBTest {
52
 public:
53
2
  // Shuts down the cluster if one was created, then runs the base-class teardown.
  void TearDown() override {
54
2
    if (cluster_) cluster_->Shutdown();
55
2
    YBTest::TearDown();
56
2
  }
57
58
 protected:
59
  // Creates cluster_ with `num_tablet_servers` tablet servers, passing
  // `extra_tserver_flags` through the cluster options, and starts it.
  // Defaults: no extra flags, a single tablet server.
  void StartCluster(const vector<string>& extra_tserver_flags = vector<string>(),
60
                    int num_tablet_servers = 1);
61
62
  // Cluster under test; empty until StartCluster() assigns it.
  std::unique_ptr<ExternalMiniCluster> cluster_;
63
};
64
65
// Builds the options for an ExternalMiniCluster, replaces cluster_ with a new
// cluster created from them, and asserts that the cluster starts.
//
// extra_tserver_flags: copied into the options' extra tserver flags.
// num_tablet_servers:  number of tablet servers to request.
//
// NOTE(review): ASSERT_OK presumably records a test failure and returns from
// this function on a bad status, which is why callers wrap the call in
// ASSERT_NO_FATALS — confirm against the macro definitions.
void TsRecoveryITest::StartCluster(const vector<string>& extra_tserver_flags,
66
2
                                   int num_tablet_servers) {
67
2
  ExternalMiniClusterOptions opts;
68
2
  opts.num_tablet_servers = num_tablet_servers;
69
2
  opts.extra_tserver_flags = extra_tserver_flags;
70
2
  // With fewer than three tablet servers, tell the masters to use a
  // replication factor of 1.
  if (num_tablet_servers < 3) {
71
2
    opts.extra_master_flags.push_back("--replication_factor=1");
72
2
  }
73
2
  cluster_.reset(new ExternalMiniCluster(opts));
74
2
  ASSERT_OK(cluster_->Start());
75
2
}
76
77
// Test that we replay from the recovery directory, if it exists.
78
1
// Scenario:
//   1. Start a one-tserver cluster with --TEST_fault_crash_during_log_replay=0.05.
//   2. Run a write workload until at least 200 rows have been inserted.
//   3. Restart the tablet server and expect the process to die.
//   4. Clear the tablet server's flags, restart it again, and verify the
//      cluster plus a row count of at least work.rows_inserted().
TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
79
1
  ASSERT_NO_FATALS(StartCluster({ "--TEST_fault_crash_during_log_replay=0.05" }));
80
81
1
  // Four writer threads, one row per batch, 100 ms write timeout; timeouts are
  // tolerated rather than treated as errors.
  TestWorkload work(cluster_.get());
82
1
  work.set_num_write_threads(4);
83
1
  work.set_write_batch_size(1);
84
1
  work.set_write_timeout_millis(100);
85
1
  work.set_timeout_allowed(true);
86
1
  work.Setup();
87
1
  work.Start();
88
20
  // Poll every 10 ms until 200 rows are in.
  // NOTE(review): this loop has no upper bound of its own.
  while (work.rows_inserted() < 200) {
89
19
    SleepFor(MonoDelta::FromMilliseconds(10));
90
19
  }
91
1
  work.StopAndJoin();
92
93
  // Now restart the server, which will result in log replay, which will crash
94
  // mid-replay with very high probability since we wrote at least 200 log
95
  // entries and we're injecting a fault 5% of the time.
96
1
  cluster_->tablet_server(0)->Shutdown();
97
98
  // Restart might crash very quickly and actually return a bad status, so we
99
  // ignore the result.
100
1
  WARN_NOT_OK(cluster_->tablet_server(0)->Restart(), "Restart failed");
101
102
  // Wait for the process to crash during log replay.
103
1
  // At most 3000 polls of 10 ms each, i.e. roughly 30 seconds of sleeping.
  for (int i = 0; i < 3000 && cluster_->tablet_server(0)->IsProcessAlive(); i++) {
104
0
    SleepFor(MonoDelta::FromMilliseconds(10));
105
0
  }
106
2
  ASSERT_FALSE(cluster_->tablet_server(0)->IsProcessAlive()) << "TS didn't crash!";
107
108
  // Now remove the crash flag, so the next replay will complete, and restart
109
  // the server once more.
110
1
  cluster_->tablet_server(0)->Shutdown();
111
1
  // clear() empties the whole flag list, not only the fault-injection flag.
  cluster_->tablet_server(0)->mutable_flags()->clear();
112
1
  ASSERT_OK(cluster_->tablet_server(0)->Restart());
113
114
1
  // The workload was stopped earlier, so work.rows_inserted() is the final
  // count; AT_LEAST is used for the comparison, retrying for up to 30 seconds.
  ClusterVerifier cluster_verifier(cluster_.get());
115
1
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
116
1
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCountWithRetries(work.table_name(),
117
1
                                       ClusterVerifier::AT_LEAST,
118
1
                                       work.rows_inserted(),
119
1
                                       MonoDelta::FromSeconds(30)));
120
1
}
121
122
1
// Scenario:
//   1. Start a one-tserver cluster with small log segments / write buffer and
//      --TEST_log_fault_after_segment_allocation_min_replicate_index=10.
//   2. Run a write workload with 1 KB payloads while repeatedly sending LOG_GC
//      FlushTablets requests, until the tablet server process is no longer alive
//      (presumably because the injected fault fires — confirm against the flag's
//      implementation).
//   3. Remove the fault flag, restart the tablet server, and verify the cluster.
//
// NOTE(review): the trailing "g" in the test name looks like a typo of
// "PreAllocation"; it is left as is because renaming changes test filters.
TEST_F(TsRecoveryITest, CrashAfterLogSegmentPreAllocationg) {
123
1
  ASSERT_NO_FATALS(StartCluster({
124
1
      "--log_segment_size_bytes=2000",
125
1
      "--log_min_seconds_to_retain=0",
126
1
      "--retryable_request_timeout_secs=0",
127
1
      "--db_write_buffer_size=2000",
128
1
      "--TEST_log_fault_after_segment_allocation_min_replicate_index=10" }));
129
130
1
  auto& tserver = *cluster_->tablet_server(0);
131
132
1
  // Four writer threads, 100 ms write timeout, timeouts tolerated, 1 KB payloads.
  TestWorkload work(cluster_.get());
133
1
  work.set_num_write_threads(4);
134
1
  work.set_write_timeout_millis(100);
135
1
  work.set_timeout_allowed(true);
136
1
  work.set_payload_bytes(1_KB);
137
1
  work.Setup();
138
1
  work.Start();
139
1
  auto proxy = cluster_->GetProxy<tserver::TabletServerAdminServiceProxy>(&tserver);
140
8
  // Keep requesting log GC on all tablets every 10 ms for as long as the
  // tablet server process is alive.
  // NOTE(review): this loop has no iteration or time limit of its own; if the
  // process never exits, the test relies on an outer timeout.
  while (tserver.IsProcessAlive()) {
141
7
    tserver::FlushTabletsRequestPB req;
142
7
    req.set_dest_uuid(tserver.uuid());
143
7
    req.set_all_tablets(true);
144
7
    req.set_operation(tserver::FlushTabletsRequestPB::LOG_GC);
145
7
    tserver::FlushTabletsResponsePB resp;
146
7
    rpc::RpcController controller;
147
7
    controller.set_timeout(30s);
148
7
    // A failed RPC is only reported via WARN_NOT_OK; the loop keeps going.
    WARN_NOT_OK(proxy.FlushTablets(req, &resp, &controller), "FlushTablets failed");
149
7
    SleepFor(MonoDelta::FromMilliseconds(10));
150
7
  }
151
1
  work.StopAndJoin();
152
153
11
  // Drop every tserver flag whose text contains the fault-injection flag name,
  // leaving the other flags in place for the restart.
  EraseIf([](const auto& flag) {
154
11
    return flag.find("TEST_log_fault_after_segment_allocation_min_replicate_index") !=
155
11
           std::string::npos;
156
11
  }, tserver.mutable_flags());
157
158
1
  ASSERT_OK(tserver.Restart());
159
160
1
  ClusterVerifier cluster_verifier(cluster_.get());
161
1
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
162
1
}
163
164
}  // namespace yb