YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server-test-base.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/tablet_server-test-base.h"
15
16
#include "yb/client/yb_table_name.h"
17
18
#include "yb/common/ql_expr.h"
19
#include "yb/common/wire_protocol-test-util.h"
20
21
#include "yb/consensus/consensus.h"
22
#include "yb/consensus/consensus.proxy.h"
23
24
#include "yb/docdb/ql_rowwise_iterator_interface.h"
25
26
#include "yb/rpc/messenger.h"
27
#include "yb/rpc/proxy.h"
28
#include "yb/rpc/rpc_controller.h"
29
30
#include "yb/server/server_base.proxy.h"
31
32
#include "yb/tablet/local_tablet_writer.h"
33
#include "yb/tablet/tablet.h"
34
#include "yb/tablet/tablet_peer.h"
35
36
#include "yb/tserver/mini_tablet_server.h"
37
#include "yb/tserver/tablet_server.h"
38
#include "yb/tserver/tablet_server_test_util.h"
39
#include "yb/tserver/ts_tablet_manager.h"
40
#include "yb/tserver/tserver_admin.proxy.h"
41
#include "yb/tserver/tserver_service.proxy.h"
42
43
#include "yb/util/metrics.h"
44
#include "yb/util/status_log.h"
45
#include "yb/util/test_graph.h"
46
47
using namespace std::literals;
48
49
// Flags defined by this file. FLAGS_rpc_timeout is converted with MonoDelta::FromSeconds() when
// the helpers below set RPC controller timeouts.
DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
50
DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
51
// Flags defined elsewhere. The fixture constructor overrides enable_maintenance_manager,
// heartbeat_rpc_timeout_ms and (when left at its default) enable_data_block_fsync.
DECLARE_bool(durable_wal_write);
52
DECLARE_bool(enable_maintenance_manager);
53
DECLARE_bool(enable_data_block_fsync);
54
DECLARE_int32(heartbeat_rpc_timeout_ms);
55
56
// Metric entity type instantiated by the fixture constructor (METRIC_ENTITY_test).
METRIC_DEFINE_entity(test);
57
58
namespace yb {
59
60
namespace client {
61
class YBTableName;
62
}
63
64
namespace tserver {
65
66
// Builds the fixture around the simple test schema and the requested table type, instantiates the
// "ts_server-test" metric entity, and overrides a few process-wide flags for unit testing.
TabletServerTestBase::TabletServerTestBase(TableType table_type)
    : schema_(GetSimpleTestSchema()),
      table_type_(table_type),
      ts_test_metric_entity_(
          METRIC_ENTITY_test.Instantiate(&ts_test_metric_registry_, "ts_server-test")) {
  // Tests trigger maintenance operations themselves at predetermined times, so the maintenance
  // ops manager is switched off.
  FLAGS_enable_maintenance_manager = false;

  // Heartbeats keep being retried when a single master fails with a network error. A 1 second
  // timeout speeds up tests that purposefully point at masters that are not running.
  FLAGS_heartbeat_rpc_timeout_ms = 1000;

  // Keep unit tests fast by skipping data block fsync, but respect an explicitly set flag.
  const auto fsync_flag_info = google::GetCommandLineFlagInfoOrDie("enable_data_block_fsync");
  if (fsync_flag_info.is_default) {
    FLAGS_enable_data_block_fsync = false;
  }
}
86
87
116
// Nothing to release explicitly; members clean up after themselves.
TabletServerTestBase::~TabletServerTestBase() = default;
88
89
// Starts the tablet server, override to start it later.
90
150
void TabletServerTestBase::SetUp() {
91
150
  YBTest::SetUp();
92
93
150
  key_schema_ = schema_.CreateKeyProjection();
94
95
150
  client_messenger_ = ASSERT_RESULT(rpc::MessengerBuilder("Client").Build());
96
0
  proxy_cache_ = std::make_unique<rpc::ProxyCache>(client_messenger_.get());
97
150
}
98
99
116
void TabletServerTestBase::TearDown() {
100
116
  client_messenger_->Shutdown();
101
116
  tablet_peer_.reset();
102
116
  if (mini_server_) {
103
31
    mini_server_->Shutdown();
104
31
  }
105
116
}
106
107
31
void TabletServerTestBase::StartTabletServer() {
108
  // Start server with an invalid master address, so it never successfully
109
  // heartbeats, even if there happens to be a master running on this machine.
110
31
  auto mini_ts =
111
31
      MiniTabletServer::CreateMiniTabletServer(GetTestPath("TabletServerTest-fsroot"), 0);
112
31
  CHECK_OK(mini_ts);
113
31
  mini_server_ = std::move(*mini_ts);
114
31
  auto addr = std::make_shared<server::MasterAddresses>();
115
31
  addr->push_back({HostPort("255.255.255.255", 1)});
116
31
  mini_server_->options()->SetMasterAddresses(addr);
117
31
  CHECK_OK(mini_server_->Start());
118
119
  // Set up a tablet inside the server.
120
31
  CHECK_OK(mini_server_->AddTestTablet(
121
31
      kTableName.namespace_name(), kTableName.table_name(), kTabletId, schema_, table_type_));
122
31
  CHECK(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet_peer_));
123
124
  // Creating a tablet is async, we wait here instead of having to handle errors later.
125
31
  CHECK_OK(WaitForTabletRunning(kTabletId));
126
127
  // Connect to it.
128
31
  ResetClientProxies();
129
31
}
130
131
34
// Waits until the tablet 'tablet_id' hosted by the mini server is usable: consensus has to be
// running (up to 60 seconds), an election is emulated, and then for up to 10 seconds the tablet
// must leave any state transition while its peer reports LEADER_AND_READY.
// Returns the first non-OK status produced by any of these steps.
Status TabletServerTestBase::WaitForTabletRunning(const char *tablet_id) {
  auto* const ts_tablet_manager = mini_server_->server()->tablet_manager();

  std::shared_ptr<tablet::TabletPeer> peer;
  RETURN_NOT_OK(ts_tablet_manager->GetTabletPeer(tablet_id, &peer));

  // The disk can sometimes be really slow, so a generous timeout is used for consensus start.
  const auto consensus_start_timeout = MonoDelta::FromSeconds(60);
  RETURN_NOT_OK(peer->WaitUntilConsensusRunning(consensus_start_timeout));

  RETURN_NOT_OK(peer->consensus()->EmulateElection());

  const auto is_settled_leader = [ts_tablet_manager, peer, tablet_id]() {
    return !ts_tablet_manager->IsTabletInTransition(tablet_id) &&
           peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
  };
  return WaitFor(
      is_settled_leader, 10s, Format("Complete state transitions for tablet $0", tablet_id));
}
149
150
void TabletServerTestBase::UpdateTestRowRemote(int tid,
151
                                               int32_t row_idx,
152
                                               int32_t new_val,
153
0
                                               TimeSeries *ts) {
154
0
  WriteRequestPB req;
155
0
  req.set_tablet_id(kTabletId);
156
157
0
  WriteResponsePB resp;
158
0
  rpc::RpcController controller;
159
0
  controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
160
0
  string new_string_val(strings::Substitute("mutated$0", row_idx));
161
162
0
  AddTestRowUpdate(row_idx, new_val, new_string_val, &req);
163
0
  ASSERT_OK(proxy_->Write(req, &resp, &controller));
164
165
0
  SCOPED_TRACE(resp.DebugString());
166
0
  ASSERT_FALSE(resp.has_error())<< resp.ShortDebugString();
167
0
  ASSERT_EQ(0, resp.per_row_errors_size());
168
0
  if (ts) {
169
0
    ts->AddValue(1);
170
0
  }
171
0
}
172
173
33
void TabletServerTestBase::ResetClientProxies() {
174
33
  CreateTsClientProxies(HostPort::FromBoundEndpoint(mini_server_->bound_rpc_addr()),
175
33
                        proxy_cache_.get(),
176
33
                        &proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);
177
33
}
178
179
// Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
180
0
void TabletServerTestBase::InsertTestRowsDirect(int32_t start_row, int32_t num_rows) {
181
0
  tablet::LocalTabletWriter writer(tablet_peer_->tablet());
182
0
  QLWriteRequestPB req;
183
0
  for (int i = 0; i < num_rows; i++) {
184
0
    BuildTestRow(start_row + i, &req);
185
0
    CHECK_OK(writer.Write(&req));
186
0
  }
187
0
}
188
189
// Inserts 'num_rows' test rows remotely into the tablet (i.e via RPC)
190
// Rows are grouped in batches of 'count'/'num_batches' size.
191
// Batch size defaults to 1.
192
void TabletServerTestBase::InsertTestRowsRemote(int tid,
193
                                                int32_t first_row,
194
                                                int32_t count,
195
                                                int32_t num_batches,
196
                                                TabletServerServiceProxy* proxy,
197
                                                string tablet_id,
198
                                                vector<uint64_t>* write_hybrid_times_collector,
199
                                                TimeSeries *ts,
200
8.13k
                                                bool string_field_defined) {
201
8.13k
  const int kNumRetries = 10;
202
203
8.13k
  if (!proxy) {
204
8.13k
    proxy = proxy_.get();
205
8.13k
  }
206
207
8.13k
  if (num_batches == -1) {
208
8.13k
    num_batches = count;
209
8.13k
  }
210
211
8.13k
  WriteRequestPB req;
212
8.13k
  req.set_tablet_id(tablet_id);
213
214
8.13k
  WriteResponsePB resp;
215
8.13k
  rpc::RpcController controller;
216
217
8.13k
  uint64_t inserted_since_last_report = 0;
218
17.3k
  for (int i = 0; i < num_batches; 
++i9.21k
) {
219
9.21k
    for (int r = kNumRetries; 
r-- > 09.21k
;) {
220
      // reset the controller and the request
221
9.21k
      controller.Reset();
222
9.21k
      controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
223
9.21k
      req.clear_ql_write_batch();
224
225
9.21k
      auto first_row_in_batch = first_row + (i * count / num_batches);
226
9.21k
      auto last_row_in_batch = first_row_in_batch + count / num_batches;
227
228
18.4k
      for (int j = first_row_in_batch; j < last_row_in_batch; 
j++9.22k
) {
229
9.22k
        if (!string_field_defined) {
230
1
          AddTestRowInsert(j, j, &req);
231
9.22k
        } else {
232
9.22k
          AddTestRowInsert(j, j, strings::Substitute("original$0", j), &req);
233
9.22k
        }
234
9.22k
      }
235
9.21k
      CHECK_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller));
236
9.21k
      if (write_hybrid_times_collector) {
237
0
        write_hybrid_times_collector->push_back(resp.propagated_hybrid_time());
238
0
      }
239
240
9.22k
      if (
!resp.has_error()9.21k
&&
resp.per_row_errors_size() == 09.21k
) {
241
9.22k
        break;
242
9.22k
      }
243
244
18.4E
      if (r == 0) {
245
0
        LOG(FATAL) << "Failed to insert batch "
246
0
                   << first_row_in_batch << "-" << last_row_in_batch
247
0
                   << ": " << resp.DebugString();
248
18.4E
      } else {
249
18.4E
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
250
18.4E
      }
251
18.4E
    }
252
253
9.21k
    inserted_since_last_report += count / num_batches;
254
9.21k
    if ((inserted_since_last_report > 100) && 
ts0
) {
255
0
      ts->AddValue(static_cast<double>(inserted_since_last_report));
256
0
      inserted_since_last_report = 0;
257
0
    }
258
9.21k
  }
259
260
8.13k
  if (ts) {
261
0
    ts->AddValue(static_cast<double>(inserted_since_last_report));
262
0
  }
263
8.13k
}
264
265
// Delete specified test row range.
266
void TabletServerTestBase::DeleteTestRowsRemote(int32_t first_row,
267
                                                int32_t count,
268
                                                TabletServerServiceProxy* proxy,
269
1
                                                string tablet_id) {
270
1
  if (!proxy) {
271
1
    proxy = proxy_.get();
272
1
  }
273
274
1
  WriteRequestPB req;
275
1
  WriteResponsePB resp;
276
1
  rpc::RpcController controller;
277
278
1
  req.set_tablet_id(tablet_id);
279
280
2
  for (int32_t rowid = first_row; rowid < first_row + count; 
rowid++1
) {
281
1
    AddTestRowDelete(rowid, &req);
282
1
  }
283
284
1
  SCOPED_TRACE(req.DebugString());
285
1
  ASSERT_OK(proxy_->Write(req, &resp, &controller));
286
1
  SCOPED_TRACE(resp.DebugString());
287
2
  ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
288
1
}
289
290
0
void TabletServerTestBase::BuildTestRow(int index, QLWriteRequestPB* req) {
291
0
  req->add_hashed_column_values()->mutable_value()->set_int32_value(index);
292
0
  auto column_value = req->add_column_values();
293
0
  column_value->set_column_id(kFirstColumnId + 1);
294
0
  column_value->mutable_expr()->mutable_value()->set_int32_value(index * 2);
295
0
  column_value = req->add_column_values();
296
0
  column_value->set_column_id(kFirstColumnId + 2);
297
0
  column_value->mutable_expr()->mutable_value()->set_string_value(
298
0
      StringPrintf("hello %d", index));
299
0
}
300
301
5
void TabletServerTestBase::ShutdownTablet() {
302
5
  if (mini_server_.get()) {
303
    // The tablet peer must be destroyed before the TS, otherwise data
304
    // blocks may be destroyed after their owning block manager.
305
4
    tablet_peer_.reset();
306
4
    mini_server_->Shutdown();
307
4
    mini_server_.reset();
308
4
  }
309
5
}
310
311
4
// Shuts down the current mini tablet server (if any) and starts a new one created with the same
// test path, which is expected to reopen the tablet created by StartTabletServer(). Afterwards
// the tablet peer is looked up again, the client proxies are reconnected and the tablet is
// awaited until it is running.
//
// Returns a non-OK status if the server cannot be created or started, if the tablet is not found
// (NotFound), or if waiting for the tablet fails.
Status TabletServerTestBase::ShutdownAndRebuildTablet() {
  ShutdownTablet();

  // Start server. Creation failures are reported to the caller like every other step of this
  // function instead of aborting the whole test process.
  auto mini_ts =
      MiniTabletServer::CreateMiniTabletServer(GetTestPath("TabletServerTest-fsroot"), 0);
  if (!mini_ts.ok()) {
    return mini_ts.status();
  }
  mini_server_ = std::move(*mini_ts);

  // As in StartTabletServer(), use a master address that can never be reached.
  auto addr = std::make_shared<server::MasterAddresses>();
  addr->push_back({HostPort("255.255.255.255", 1)});
  mini_server_->options()->SetMasterAddresses(addr);

  // This should open the tablet created on StartTabletServer().
  RETURN_NOT_OK(mini_server_->Start());
  RETURN_NOT_OK(mini_server_->WaitStarted());

  if (!mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet_peer_)) {
    return STATUS(NotFound, "Tablet was not found");
  }

  // Connect to it.
  ResetClientProxies();

  // Opening a tablet is async, we wait here instead of having to handle errors later.
  return WaitForTabletRunning(kTabletId);
}
336
337
// Verifies that a set of expected rows (key, value) is present in the tablet.
338
5
void TabletServerTestBase::VerifyRows(const Schema& schema, const vector<KeyValue>& expected) {
339
5
  auto iter = tablet_peer_->tablet()->NewRowIterator(schema);
340
5
  ASSERT_OK(iter);
341
342
5
  int count = 0;
343
5
  QLTableRow row;
344
16
  while (ASSERT_RESULT((**iter).HasNext())) {
345
16
    ASSERT_OK_FAST((**iter).NextRow(&row));
346
16
    ++count;
347
16
  }
348
5
  ASSERT_EQ(count, expected.size());
349
5
}
350
351
// Table name and tablet id of the test tablet; StartTabletServer() passes both to
// AddTestTablet(), and the remote update helper addresses its write to kTabletId.
const client::YBTableName TabletServerTestBase::kTableName(
352
    YQL_DATABASE_CQL, "my_keyspace", "test-table");
353
const char* TabletServerTestBase::kTabletId = "test-tablet";
354
355
} // namespace tserver
356
} // namespace yb