YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cluster_itest_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/cluster_itest_util.h"
34
35
#include <stdint.h>
36
37
#include <algorithm>
38
#include <limits>
39
#include <memory>
40
#include <mutex>
41
#include <string>
42
#include <utility>
43
#include <vector>
44
45
#include <boost/optional.hpp>
46
#include <glog/stl_logging.h>
47
#include <gtest/gtest.h>
48
49
#include "yb/client/schema.h"
50
#include "yb/client/yb_table_name.h"
51
52
#include "yb/common/wire_protocol-test-util.h"
53
#include "yb/common/wire_protocol.h"
54
#include "yb/common/wire_protocol.pb.h"
55
56
#include "yb/consensus/consensus.proxy.h"
57
#include "yb/consensus/consensus_meta.h"
58
#include "yb/consensus/consensus_types.pb.h"
59
#include "yb/consensus/opid_util.h"
60
#include "yb/consensus/quorum_util.h"
61
62
#include "yb/gutil/strings/substitute.h"
63
64
#include "yb/integration-tests/external_mini_cluster.h"
65
66
#include "yb/master/master_client.proxy.h"
67
#include "yb/master/master_cluster.proxy.h"
68
69
#include "yb/rpc/rpc_fwd.h"
70
71
#include "yb/server/server_base.proxy.h"
72
73
#include "yb/tserver/tablet_server_test_util.h"
74
#include "yb/tserver/tserver_admin.proxy.h"
75
#include "yb/tserver/tserver_service.pb.h"
76
#include "yb/tserver/tserver_service.proxy.h"
77
78
#include "yb/util/enums.h"
79
#include "yb/util/format.h"
80
#include "yb/util/logging.h"
81
#include "yb/util/monotime.h"
82
#include "yb/util/net/net_fwd.h"
83
#include "yb/util/net/net_util.h"
84
#include "yb/util/result.h"
85
#include "yb/util/status.h"
86
#include "yb/util/status_format.h"
87
#include "yb/util/status_log.h"
88
#include "yb/util/strongly_typed_bool.h"
89
#include "yb/util/test_util.h"
90
91
namespace yb {
92
namespace itest {
93
94
using client::YBClient;
95
using client::YBSchema;
96
using client::YBSchemaBuilder;
97
using client::YBTable;
98
using client::YBTableName;
99
using consensus::CONSENSUS_CONFIG_ACTIVE;
100
using consensus::CONSENSUS_CONFIG_COMMITTED;
101
using consensus::ChangeConfigRequestPB;
102
using consensus::ChangeConfigResponsePB;
103
using consensus::ConsensusConfigType;
104
using consensus::ConsensusStatePB;
105
using consensus::CountVoters;
106
using consensus::GetConsensusStateRequestPB;
107
using consensus::GetConsensusStateResponsePB;
108
using consensus::GetLastOpIdRequestPB;
109
using consensus::GetLastOpIdResponsePB;
110
using consensus::LeaderStepDownRequestPB;
111
using consensus::LeaderStepDownResponsePB;
112
using consensus::RaftPeerPB;
113
using consensus::RunLeaderElectionResponsePB;
114
using consensus::RunLeaderElectionRequestPB;
115
using consensus::kInvalidOpIdIndex;
116
using consensus::LeaderLeaseCheckMode;
117
using consensus::LeaderLeaseStatus;
118
using master::ListTabletServersResponsePB;
119
using master::TabletLocationsPB;
120
using rpc::Messenger;
121
using rpc::RpcController;
122
using std::min;
123
using std::shared_ptr;
124
using std::string;
125
using std::unordered_map;
126
using std::vector;
127
using strings::Substitute;
128
using tserver::CreateTsClientProxies;
129
using tserver::ListTabletsResponsePB;
130
using tserver::DeleteTabletRequestPB;
131
using tserver::DeleteTabletResponsePB;
132
using tserver::TabletServerAdminServiceProxy;
133
using tserver::TabletServerErrorPB;
134
using tserver::TabletServerServiceProxy;
135
using tserver::WriteRequestPB;
136
using tserver::WriteResponsePB;
137
138
689
TServerDetails::TServerDetails() : registration(new master::TSRegistrationPB) {
139
689
}
140
141
457
TServerDetails::~TServerDetails() = default;
142
143
5.47k
const string& TServerDetails::uuid() const {
144
5.47k
  return instance_id.permanent_uuid();
145
5.47k
}
146
147
799
std::string TServerDetails::ToString() const {
148
799
  return Format("TabletServer: $0, Rpc address: $1", instance_id.permanent_uuid(),
149
799
                DesiredHostPort(registration->common(), CloudInfoPB()));
150
799
}
151
152
1
client::YBSchema SimpleIntKeyYBSchema() {
153
1
  YBSchema s;
154
1
  YBSchemaBuilder b;
155
1
  b.AddColumn("key")->Type(INT32)->NotNull()->PrimaryKey();
156
1
  CHECK_OK(b.Build(&s));
157
1
  return s;
158
1
}
159
160
Result<std::vector<OpId>> GetLastOpIdForEachReplica(
161
    const TabletId& tablet_id,
162
    const std::vector<TServerDetails*>& replicas,
163
    consensus::OpIdType opid_type,
164
    const MonoDelta& timeout,
165
188
    consensus::OperationType op_type) {
166
188
  struct Getter {
167
188
    const TabletId& tablet_id;
168
188
    consensus::OpIdType opid_type;
169
188
    consensus::OperationType op_type;
170
171
405
    Result<OpId> operator()(TServerDetails* ts, rpc::RpcController* controller) const {
172
405
      GetLastOpIdRequestPB opid_req;
173
405
      GetLastOpIdResponsePB opid_resp;
174
405
      opid_req.set_tablet_id(tablet_id);
175
405
      opid_req.set_dest_uuid(ts->uuid());
176
405
      opid_req.set_opid_type(opid_type);
177
405
      if (op_type != consensus::OperationType::UNKNOWN_OP) {
178
3
        opid_req.set_op_type(op_type);
179
3
      }
180
405
      RETURN_NOT_OK(ts->consensus_proxy->GetLastOpId(opid_req, &opid_resp, controller));
181
379
      return OpId::FromPB(opid_resp.opid());
182
405
    }
183
188
  };
184
185
188
  return GetForEachReplica(
186
188
      replicas, timeout,
187
188
      Getter{.tablet_id = tablet_id, .opid_type = opid_type, .op_type = op_type});
188
188
}
189
190
169
vector<TServerDetails*> TServerDetailsVector(const TabletServerMap& tablet_servers) {
191
169
  vector<TServerDetails*> result;
192
169
  result.reserve(tablet_servers.size());
193
519
  for (auto& pair : tablet_servers) {
194
519
    result.push_back(pair.second.get());
195
519
  }
196
169
  return result;
197
169
}
198
199
53
vector<TServerDetails*> TServerDetailsVector(const TabletServerMapUnowned& tablet_servers) {
200
53
  vector<TServerDetails*> result;
201
53
  result.reserve(tablet_servers.size());
202
118
  for (auto& pair : tablet_servers) {
203
118
    result.push_back(pair.second);
204
118
  }
205
53
  return result;
206
53
}
207
208
TabletServerMapUnowned CreateTabletServerMapUnowned(const TabletServerMap& tablet_servers,
209
13
                                                    const std::set<std::string>& exclude) {
210
13
  TabletServerMapUnowned result;
211
39
  for (auto& pair : tablet_servers) {
212
39
    if (exclude.find(pair.first) != exclude.end()) {
213
0
      continue;
214
0
    }
215
39
    result.emplace(pair.first, pair.second.get());
216
39
  }
217
13
  return result;
218
13
}
219
220
Status WaitForOpFromCurrentTerm(TServerDetails* replica,
221
                                const string& tablet_id,
222
                                consensus::OpIdType opid_type,
223
                                const MonoDelta& timeout,
224
7
                                OpId* opid) {
225
7
  const MonoTime kStart = MonoTime::Now();
226
7
  const MonoTime kDeadline = kStart + timeout;
227
228
7
  Status s;
229
31
  while (MonoTime::Now() < kDeadline) {
230
31
    ConsensusStatePB cstate;
231
31
    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, kDeadline - MonoTime::Now(),
232
31
                          &cstate);
233
31
    if (s.ok()) {
234
7
      Result<OpId> tmp_opid =
235
7
          GetLastOpIdForReplica(tablet_id, replica, opid_type, kDeadline - MonoTime::Now());
236
7
      if (tmp_opid) {
237
7
        if (tmp_opid->term == cstate.current_term()) {
238
7
          if (opid) {
239
1
            *opid = *tmp_opid;
240
1
          }
241
7
          return Status::OK();
242
7
        }
243
0
        s = STATUS(IllegalState, Substitute("Terms don't match. Current term: $0. Latest OpId: $1",
244
0
                                 cstate.current_term(), tmp_opid->ToString()));
245
0
      }
246
7
    }
247
24
    SleepFor(MonoDelta::FromMilliseconds(10));
248
24
  }
249
250
0
  return STATUS(TimedOut, Substitute("Timed out after $0 waiting for op from current term: $1",
251
7
                                     (MonoTime::Now() - kStart).ToString(),
252
7
                                     s.ToString()));
253
7
}
254
255
Status WaitForServersToAgree(const MonoDelta& timeout,
256
                             const TabletServerMap& tablet_servers,
257
                             const string& tablet_id,
258
                             int64_t minimum_index,
259
                             int64_t* actual_index,
260
28
                             MustBeCommitted must_be_committed) {
261
28
  return WaitForServersToAgree(timeout,
262
28
                               TServerDetailsVector(tablet_servers),
263
28
                               tablet_id,
264
28
                               minimum_index,
265
28
                               actual_index,
266
28
                               must_be_committed);
267
28
}
268
269
Status WaitForServersToAgree(const MonoDelta& timeout,
270
                             const TabletServerMapUnowned& tablet_servers,
271
                             const TabletId& tablet_id,
272
                             int64_t minimum_index,
273
                             int64_t* actual_index,
274
41
                             MustBeCommitted must_be_committed) {
275
41
  return WaitForServersToAgree(
276
41
      timeout, TServerDetailsVector(tablet_servers), tablet_id, minimum_index, actual_index,
277
41
      must_be_committed);
278
41
}
279
280
Status WaitForServersToAgree(const MonoDelta& timeout,
281
                             const vector<TServerDetails*>& servers,
282
                             const string& tablet_id,
283
                             int64_t minimum_index,
284
                             int64_t* actual_index,
285
69
                             MustBeCommitted must_be_committed) {
286
69
  auto deadline = CoarseMonoClock::Now() + timeout;
287
69
  if (actual_index != nullptr) {
288
2
    *actual_index = 0;
289
2
  }
290
291
69
  vector<consensus::OpIdType> opid_types{consensus::OpIdType::RECEIVED_OPID};
292
69
  if (must_be_committed) {
293
    // In this mode we require that last received and committed op ids from all servers converge
294
    // on the same value.
295
1
    opid_types.push_back(consensus::OpIdType::COMMITTED_OPID);
296
1
  }
297
298
69
  Status last_non_ok_status;
299
69
  vector<OpId> received_ids;
300
69
  vector<OpId> committed_ids;
301
302
142
  for (int attempt = 1; CoarseMonoClock::Now() < deadline; 
attempt++73
) {
303
140
    vector<OpId> ids;
304
305
140
    Status s;
306
143
    for (auto opid_type : opid_types) {
307
143
      auto ids_of_this_type = GetLastOpIdForEachReplica(tablet_id, servers, opid_type, timeout);
308
143
      if (!ids_of_this_type.ok()) {
309
26
        s = ids_of_this_type.status();
310
26
        break;
311
26
      }
312
117
      if (opid_type == consensus::OpIdType::RECEIVED_OPID) {
313
115
        received_ids = *ids_of_this_type;
314
115
      } else {
315
2
        committed_ids = *ids_of_this_type;
316
2
      }
317
117
      std::copy(ids_of_this_type->begin(), ids_of_this_type->end(), std::back_inserter(ids));
318
117
    }
319
320
140
    if (s.ok()) {
321
114
      int64_t cur_index = kInvalidOpIdIndex;
322
114
      bool any_behind = false;
323
114
      bool any_disagree = false;
324
234
      for (const OpId& id : ids) {
325
234
        if (cur_index == kInvalidOpIdIndex) {
326
114
          cur_index = id.index;
327
114
        }
328
234
        if (id.index != cur_index) {
329
22
          any_disagree = true;
330
22
          break;
331
22
        }
332
212
        if (id.index < minimum_index) {
333
25
          any_behind = true;
334
25
          break;
335
25
        }
336
212
      }
337
114
      if (!any_behind && 
!any_disagree89
) {
338
67
        LOG(INFO) << "All servers converged on OpIds: " << ids;
339
67
        if (actual_index != nullptr) {
340
2
          *actual_index = cur_index;
341
2
        }
342
67
        return Status::OK();
343
67
      }
344
114
    } else {
345
26
      LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString();
346
26
      last_non_ok_status = s;
347
26
    }
348
349
73
    LOG(INFO) << "Not converged past " << minimum_index << " yet: " << ids;
350
73
    SleepFor(MonoDelta::FromMilliseconds(min(attempt * 100, 1000)));
351
73
  }
352
2
  return STATUS_FORMAT(
353
69
      TimedOut,
354
69
      "All replicas of tablet $0 could not converge on an index of at least $1 after $2. "
355
69
      "must_be_committed=$3. Latest received ids: $3, committed ids: $4",
356
69
      tablet_id, minimum_index, timeout, must_be_committed, received_ids, committed_ids);
357
69
}
358
359
// Wait until all specified replicas have logged the given index.
360
Status WaitUntilAllReplicasHaveOp(const int64_t log_index,
361
                                  const string& tablet_id,
362
                                  const vector<TServerDetails*>& replicas,
363
                                  const MonoDelta& timeout,
364
1
                                  int64_t* actual_minimum_index) {
365
1
  MonoTime start = MonoTime::Now();
366
1
  MonoDelta passed = MonoDelta::FromMilliseconds(0);
367
3
  while (true) {
368
3
    auto op_ids = GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID, timeout);
369
3
    if (op_ids.ok()) {
370
3
      if (actual_minimum_index != nullptr) {
371
3
        *actual_minimum_index = std::numeric_limits<int64_t>::max();
372
3
      }
373
374
3
      bool any_behind = false;
375
9
      for (const OpId& op_id : *op_ids) {
376
9
        if (actual_minimum_index != nullptr) {
377
9
          *actual_minimum_index = std::min(*actual_minimum_index, op_id.index);
378
9
        }
379
380
9
        if (op_id.index < log_index) {
381
2
          any_behind = true;
382
2
          break;
383
2
        }
384
9
      }
385
3
      if (!any_behind) 
return Status::OK()1
;
386
3
    } else {
387
0
      LOG(WARNING) << "Got error getting last opid for each replica: " << op_ids.ToString();
388
0
    }
389
2
    passed = MonoTime::Now().GetDeltaSince(start);
390
2
    if (passed.MoreThan(timeout)) {
391
0
      break;
392
0
    }
393
2
    SleepFor(MonoDelta::FromMilliseconds(50));
394
2
  }
395
0
  string replicas_str;
396
0
  for (const TServerDetails* replica : replicas) {
397
0
    if (!replicas_str.empty()) replicas_str += ", ";
398
0
    replicas_str += "{ " + replica->ToString() + " }";
399
0
  }
400
0
  return STATUS(TimedOut, Substitute("Index $0 not available on all replicas after $1. "
401
1
                                              "Replicas: [ $2 ]",
402
1
                                              log_index, passed.ToString()));
403
1
}
404
405
Status WaitUntilNumberOfAliveTServersEqual(int n_tservers,
406
                                           const master::MasterClusterProxy& master_proxy,
407
1
                                           const MonoDelta& timeout) {
408
409
1
  master::ListTabletServersRequestPB req;
410
1
  master::ListTabletServersResponsePB resp;
411
1
  rpc::RpcController controller;
412
1
  controller.set_timeout(timeout);
413
414
  // The field primary_only means only tservers that are alive (tservers that have sent at least on
415
  // heartbeat in the last FLAG_tserver_unresponsive_timeout_ms milliseconds.)
416
1
  req.set_primary_only(true);
417
418
1
  MonoTime start = MonoTime::Now();
419
1
  MonoDelta passed = MonoDelta::FromMilliseconds(0);
420
87
  while (true) {
421
87
    Status s = master_proxy.ListTabletServers(req, &resp, &controller);
422
423
87
    if (s.ok() &&
424
87
        controller.status().ok() &&
425
87
        !resp.has_error()) {
426
87
      if (resp.servers_size() == n_tservers) {
427
1
        passed = MonoTime::Now().GetDeltaSince(start);
428
1
        return Status::OK();
429
1
      }
430
87
    } else {
431
0
      string error;
432
0
      if (!s.ok()) {
433
0
        error = s.ToString();
434
0
      } else if (!controller.status().ok()) {
435
0
        error = controller.status().ToString();
436
0
      } else {
437
0
        error = resp.error().ShortDebugString();
438
0
      }
439
0
      LOG(WARNING) << "Got error getting list of tablet servers: " << error;
440
0
    }
441
86
    passed = MonoTime::Now().GetDeltaSince(start);
442
86
    if (passed.MoreThan(timeout)) {
443
0
      break;
444
0
    }
445
86
    SleepFor(MonoDelta::FromMilliseconds(50));
446
86
    controller.Reset();
447
86
  }
448
0
  return STATUS(TimedOut, Substitute("Number of alive tservers not equal to $0 after $1 ms. ",
449
1
                                     n_tservers, timeout.ToMilliseconds()));
450
1
}
451
452
223
Result<TabletServerMap> CreateTabletServerMap(ExternalMiniCluster* cluster) {
453
223
  auto proxy = cluster->num_masters() > 1
454
223
      ? 
cluster->GetLeaderMasterProxy<master::MasterClusterProxy>()50
455
223
      : 
cluster->GetMasterProxy<master::MasterClusterProxy>()173
;
456
223
  return CreateTabletServerMap(proxy, &cluster->proxy_cache());
457
223
}
458
459
Result<TabletServerMap> CreateTabletServerMap(
460
223
    const master::MasterClusterProxy& proxy, rpc::ProxyCache* proxy_cache) {
461
223
  master::ListTabletServersRequestPB req;
462
223
  master::ListTabletServersResponsePB resp;
463
223
  rpc::RpcController controller;
464
465
223
  RETURN_NOT_OK(proxy.ListTabletServers(req, &resp, &controller));
466
223
  RETURN_NOT_OK(controller.status());
467
223
  if (resp.has_error()) {
468
0
    return STATUS(RemoteError, "Response had an error", resp.error().ShortDebugString());
469
0
  }
470
471
223
  TabletServerMap result;
472
689
  for (const ListTabletServersResponsePB::Entry& entry : resp.servers()) {
473
689
    HostPort host_port = HostPortFromPB(DesiredHostPort(
474
689
        entry.registration().common(), CloudInfoPB()));
475
476
689
    std::unique_ptr<TServerDetails> peer(new TServerDetails());
477
689
    peer->instance_id.CopyFrom(entry.instance_id());
478
689
    peer->registration->CopyFrom(entry.registration());
479
480
689
    CreateTsClientProxies(host_port,
481
689
                          proxy_cache,
482
689
                          &peer->tserver_proxy,
483
689
                          &peer->tserver_admin_proxy,
484
689
                          &peer->consensus_proxy,
485
689
                          &peer->generic_proxy);
486
487
689
    const auto& key = peer->instance_id.permanent_uuid();
488
689
    CHECK
(result.emplace(key, std::move(peer)).second) << "duplicate key: " << key0
;
489
689
  }
490
223
  return result;
491
223
}
492
493
Status GetConsensusState(const TServerDetails* replica,
494
                         const string& tablet_id,
495
                         consensus::ConsensusConfigType type,
496
                         const MonoDelta& timeout,
497
                         ConsensusStatePB* consensus_state,
498
3.80k
                         LeaderLeaseStatus* leader_lease_status) {
499
3.80k
  DCHECK_ONLY_NOTNULL(replica);
500
501
3.80k
  GetConsensusStateRequestPB req;
502
3.80k
  GetConsensusStateResponsePB resp;
503
3.80k
  RpcController controller;
504
3.80k
  controller.set_timeout(timeout);
505
3.80k
  req.set_dest_uuid(replica->uuid());
506
3.80k
  req.set_tablet_id(tablet_id);
507
3.80k
  req.set_type(type);
508
509
3.80k
  RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(req, &resp, &controller));
510
3.66k
  if (resp.has_error()) {
511
181
    return StatusFromPB(resp.error().status());
512
181
  }
513
3.48k
  *consensus_state = resp.cstate();
514
3.48k
  if (leader_lease_status) {
515
2.72k
    *leader_lease_status = resp.has_leader_lease_status() ?
516
2.72k
        resp.leader_lease_status() :
517
2.72k
        
LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE0
; // Could be anything but HAS_LEASE.
518
2.72k
  }
519
3.48k
  return Status::OK();
520
3.66k
}
521
522
Status WaitUntilCommittedConfigNumVotersIs(size_t config_size,
523
                                           const TServerDetails* replica,
524
                                           const std::string& tablet_id,
525
122
                                           const MonoDelta& timeout) {
526
122
  return WaitUntilCommittedConfigMemberTypeIs(config_size, replica, tablet_id, timeout,
527
122
                                              consensus::PeerMemberType::VOTER);
528
122
}
529
530
Status WaitUntilCommittedConfigMemberTypeIs(size_t config_size,
531
                                            const TServerDetails* replica,
532
                                            const std::string& tablet_id,
533
                                            const MonoDelta& timeout,
534
128
                                            consensus::PeerMemberType member_type) {
535
128
  DCHECK_ONLY_NOTNULL(replica);
536
537
128
  MonoTime start = MonoTime::Now();
538
128
  MonoTime deadline = start + timeout;
539
540
128
  int backoff_exp = 0;
541
128
  const int kMaxBackoffExp = 7;
542
128
  Status s;
543
128
  ConsensusStatePB cstate;
544
919
  while (true) {
545
919
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
546
919
    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
547
919
                          remaining_timeout, &cstate);
548
919
    if (s.ok()) {
549
744
      if (CountMemberType(cstate.config(), member_type) == config_size) {
550
128
        return Status::OK();
551
128
      }
552
616
      LOG(INFO) << "Got " << yb::ToString(cstate) << " from " << replica->ToString();
553
616
    } else {
554
175
      LOG(INFO) << "Got " << s.ToString() << " from " << replica->ToString();
555
175
    }
556
557
791
    if (MonoTime::Now().GetDeltaSince(start).MoreThan(timeout)) {
558
0
      break;
559
0
    }
560
791
    SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp));
561
791
    backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
562
791
  }
563
0
  return STATUS(TimedOut, Substitute("Number of replicas of type $0 does not equal $1 after "
564
128
                                     "waiting for $2. Last consensus state: $3. Last status: $4",
565
128
                                     PeerMemberType_Name(member_type), config_size,
566
128
                                     timeout.ToString(), cstate.ShortDebugString(), s.ToString()));
567
128
}
568
569
template<class Context>
570
Status WaitUntilCommittedOpIdIndex(TServerDetails* replica,
571
                                   const string& tablet_id,
572
                                   const MonoDelta& timeout,
573
                                   CommittedEntryType type,
574
17
                                   Context context) {
575
17
  MonoTime start = MonoTime::Now();
576
17
  MonoTime deadline = start;
577
17
  deadline.AddDelta(timeout);
578
579
17
  bool config = type == CommittedEntryType::CONFIG;
580
17
  Status s;
581
17
  OpId op_id;
582
17
  ConsensusStatePB cstate;
583
18
  while (true) {
584
18
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
18
    int64_t op_index = -1;
587
18
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
18
    } else {
594
18
      auto last_op_id_result = GetLastOpIdForReplica(
595
18
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
18
      if (last_op_id_result.ok()) {
597
18
        op_id = *last_op_id_result;
598
18
        op_index = op_id.index;
599
18
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
18
    }
603
604
18
    if (s.ok() && context.Check(op_index)) {
605
17
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
17
      } else {
609
17
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
17
                  << replica->instance_id.permanent_uuid();
611
17
      }
612
17
      return Status::OK();
613
17
    }
614
1
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
1
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
1
    if (!config) {
628
1
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
1
                << context.Desired();
630
1
    }
631
1
    SleepFor(MonoDelta::FromMilliseconds(100));
632
1
  }
633
17
}
yb::Status yb::itest::WaitUntilCommittedOpIdIndex<yb::itest::WaitUntilCommittedOpIdIndexIsContext>(yb::itest::TServerDetails*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::MonoDelta const&, yb::itest::CommittedEntryType, yb::itest::WaitUntilCommittedOpIdIndexIsContext)
Line
Count
Source
574
16
                                   Context context) {
575
16
  MonoTime start = MonoTime::Now();
576
16
  MonoTime deadline = start;
577
16
  deadline.AddDelta(timeout);
578
579
16
  bool config = type == CommittedEntryType::CONFIG;
580
16
  Status s;
581
16
  OpId op_id;
582
16
  ConsensusStatePB cstate;
583
17
  while (true) {
584
17
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
17
    int64_t op_index = -1;
587
17
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
17
    } else {
594
17
      auto last_op_id_result = GetLastOpIdForReplica(
595
17
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
17
      if (last_op_id_result.ok()) {
597
17
        op_id = *last_op_id_result;
598
17
        op_index = op_id.index;
599
17
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
17
    }
603
604
17
    if (s.ok() && context.Check(op_index)) {
605
16
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
16
      } else {
609
16
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
16
                  << replica->instance_id.permanent_uuid();
611
16
      }
612
16
      return Status::OK();
613
16
    }
614
1
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
1
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
1
    if (!config) {
628
1
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
1
                << context.Desired();
630
1
    }
631
1
    SleepFor(MonoDelta::FromMilliseconds(100));
632
1
  }
633
16
}
yb::Status yb::itest::WaitUntilCommittedOpIdIndex<yb::itest::WaitUntilCommittedOpIdIndexIsGreaterThanContext>(yb::itest::TServerDetails*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::MonoDelta const&, yb::itest::CommittedEntryType, yb::itest::WaitUntilCommittedOpIdIndexIsGreaterThanContext)
Line
Count
Source
574
1
                                   Context context) {
575
1
  MonoTime start = MonoTime::Now();
576
1
  MonoTime deadline = start;
577
1
  deadline.AddDelta(timeout);
578
579
1
  bool config = type == CommittedEntryType::CONFIG;
580
1
  Status s;
581
1
  OpId op_id;
582
1
  ConsensusStatePB cstate;
583
1
  while (true) {
584
1
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
1
    int64_t op_index = -1;
587
1
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
1
    } else {
594
1
      auto last_op_id_result = GetLastOpIdForReplica(
595
1
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
1
      if (last_op_id_result.ok()) {
597
1
        op_id = *last_op_id_result;
598
1
        op_index = op_id.index;
599
1
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
1
    }
603
604
1
    if (s.ok() && context.Check(op_index)) {
605
1
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
1
      } else {
609
1
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
1
                  << replica->instance_id.permanent_uuid();
611
1
      }
612
1
      return Status::OK();
613
1
    }
614
0
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
0
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
0
    if (!config) {
628
0
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
0
                << context.Desired();
630
0
    }
631
0
    SleepFor(MonoDelta::FromMilliseconds(100));
632
0
  }
633
1
}
634
635
class WaitUntilCommittedOpIdIndexContext {
636
 public:
637
  explicit WaitUntilCommittedOpIdIndexContext(std::string desired)
638
17
      : desired_(std::move(desired)) {
639
17
  }
640
641
1
  const string& Desired() const {
642
1
    return desired_;
643
1
  }
644
 private:
645
  string desired_;
646
};
647
648
class WaitUntilCommittedOpIdIndexIsContext : public WaitUntilCommittedOpIdIndexContext {
649
 public:
650
  explicit WaitUntilCommittedOpIdIndexIsContext(int64_t value)
651
      : WaitUntilCommittedOpIdIndexContext(Substitute("equal $0", value)),
652
16
        value_(value) {
653
16
  }
654
655
17
  bool Check(int64_t current) {
656
17
    return value_ == current;
657
17
  }
658
 private:
659
  int64_t value_;
660
};
661
662
Status WaitForReplicasReportedToMaster(
663
    ExternalMiniCluster* cluster,
664
    int num_replicas, const string& tablet_id,
665
    const MonoDelta& timeout,
666
    WaitForLeader wait_for_leader,
667
    bool* has_leader,
668
12
    master::TabletLocationsPB* tablet_locations) {
669
12
  MonoTime deadline(MonoTime::Now() + timeout);
670
12
  while (true) {
671
12
    RETURN_NOT_OK(GetTabletLocations(cluster, tablet_id, timeout, tablet_locations));
672
12
    *has_leader = false;
673
12
    if (tablet_locations->replicas_size() == num_replicas) {
674
12
      for (const master::TabletLocationsPB_ReplicaPB& replica :
675
36
                    tablet_locations->replicas()) {
676
36
        if (replica.role() == PeerRole::LEADER) {
677
12
          *has_leader = true;
678
12
        }
679
36
      }
680
12
      if (wait_for_leader == DONT_WAIT_FOR_LEADER ||
681
12
          (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) {
682
12
        break;
683
12
      }
684
12
    }
685
0
    if (deadline < MonoTime::Now()) {
686
0
      return STATUS(TimedOut, Substitute("Timed out after waiting "
687
0
          "for tablet $1 expected to report master with $2 replicas, has_leader: $3",
688
0
          tablet_id, num_replicas, *has_leader));
689
0
    }
690
0
    SleepFor(MonoDelta::FromMilliseconds(20));
691
0
  }
692
12
  if (num_replicas != tablet_locations->replicas_size()) {
693
0
      return STATUS(NotFound, Substitute("Number of replicas for tablet $0 "
694
0
          "reported to master $1:$2",
695
0
          tablet_id, tablet_locations->replicas_size(),
696
0
          yb::ToString(*tablet_locations)));
697
0
  }
698
12
  if (wait_for_leader == WAIT_FOR_LEADER && !(*has_leader)) {
699
0
    return STATUS(NotFound, Substitute("Leader for tablet $0 not found on master, "
700
0
                                       "number of replicas $1:$2",
701
0
                                       tablet_id, tablet_locations->replicas_size(),
702
0
                                       yb::ToString(*tablet_locations)));
703
0
  }
704
12
  return Status::OK();
705
12
}
706
707
Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
708
                                     TServerDetails* replica,
709
                                     const string& tablet_id,
710
                                     const MonoDelta& timeout,
711
16
                                     CommittedEntryType type) {
712
16
  return WaitUntilCommittedOpIdIndex(
713
16
      replica,
714
16
      tablet_id,
715
16
      timeout,
716
16
      type,
717
16
      WaitUntilCommittedOpIdIndexIsContext(opid_index));
718
16
}
719
720
class WaitUntilCommittedOpIdIndexIsGreaterThanContext : public WaitUntilCommittedOpIdIndexContext {
721
 public:
722
  explicit WaitUntilCommittedOpIdIndexIsGreaterThanContext(int64_t* value)
723
      : WaitUntilCommittedOpIdIndexContext(Substitute("greater than $0", *value)),
724
1
        original_value_(*value), value_(value) {
725
726
1
  }
727
728
1
  bool Check(int64_t current) {
729
1
    if (current > *value_) {
730
1
      CHECK_EQ(*value_, original_value_);
731
1
      *value_ = current;
732
1
      return true;
733
1
    }
734
0
    return false;
735
1
  }
736
 private:
737
  int64_t original_value_;
738
  int64_t* const value_;
739
};
740
741
Status WaitUntilCommittedOpIdIndexIsGreaterThan(int64_t* index,
742
                                                TServerDetails* replica,
743
                                                const TabletId& tablet_id,
744
                                                const MonoDelta& timeout,
745
1
                                                CommittedEntryType type) {
746
1
  return WaitUntilCommittedOpIdIndex(
747
1
      replica,
748
1
      tablet_id,
749
1
      timeout,
750
1
      type,
751
1
      WaitUntilCommittedOpIdIndexIsGreaterThanContext(index));
752
1
}
753
754
Status WaitUntilCommittedOpIdIndexIsAtLeast(int64_t* index,
755
                                            TServerDetails* replica,
756
                                            const TabletId& tablet_id,
757
                                            const MonoDelta& timeout,
758
1
                                            CommittedEntryType type) {
759
1
  int64_t tmp_index = *index - 1;
760
1
  Status s = WaitUntilCommittedOpIdIndexIsGreaterThan(
761
1
      &tmp_index,
762
1
      replica,
763
1
      tablet_id,
764
1
      timeout,
765
1
      type);
766
1
  *index = tmp_index;
767
1
  return s;
768
1
}
769
770
Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica,
771
                                        const string& tablet_id,
772
                                        const MonoDelta& timeout,
773
2.84k
                                        LeaderLeaseCheckMode lease_check_mode) {
774
2.84k
  ConsensusStatePB cstate;
775
2.84k
  LeaderLeaseStatus leader_lease_status;
776
2.84k
  Status s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE,
777
2.84k
                               timeout, &cstate, &leader_lease_status);
778
2.84k
  if (PREDICT_FALSE(!s.ok())) {
779
118
    VLOG(1) << "Error getting consensus state from replica: "
780
0
            << replica->instance_id.permanent_uuid();
781
118
    return STATUS(NotFound, "Error connecting to replica", s.ToString());
782
118
  }
783
2.72k
  const string& replica_uuid = replica->instance_id.permanent_uuid();
784
2.72k
  if (cstate.has_leader_uuid() && cstate.leader_uuid() == replica_uuid &&
785
2.72k
      
(338
lease_check_mode == LeaderLeaseCheckMode::DONT_NEED_LEASE338
||
786
338
       
leader_lease_status == consensus::LeaderLeaseStatus::HAS_LEASE337
)) {
787
173
    return Status::OK();
788
173
  }
789
2.55k
  VLOG
(1) << "Replica not leader of config: " << replica->instance_id.permanent_uuid()0
;
790
2.55k
  return STATUS_FORMAT(IllegalState,
791
2.72k
      "Replica found but not leader; lease check mode: $0", lease_check_mode);
792
2.72k
}
793
794
Status WaitUntilLeader(const TServerDetails* replica,
795
                       const string& tablet_id,
796
                       const MonoDelta& timeout,
797
25
                       const LeaderLeaseCheckMode lease_check_mode) {
798
25
  MonoTime start = MonoTime::Now();
799
25
  MonoTime deadline = start;
800
25
  deadline.AddDelta(timeout);
801
802
25
  int backoff_exp = 0;
803
25
  const int kMaxBackoffExp = 7;
804
25
  Status s;
805
280
  while (true) {
806
280
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
807
280
    s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id, remaining_timeout,
808
280
                                         lease_check_mode);
809
280
    if (s.ok()) {
810
17
      return Status::OK();
811
17
    }
812
813
263
    if (MonoTime::Now().GetDeltaSince(start).MoreThan(timeout)) {
814
8
      break;
815
8
    }
816
255
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
817
255
    backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
818
255
  }
819
8
  return STATUS(TimedOut, Substitute("Replica $0 is not leader after waiting for $1: $2",
820
25
                                     replica->ToString(), timeout.ToString(), s.ToString()));
821
25
}
822
823
Status FindTabletLeader(const TabletServerMap& tablet_servers,
824
                        const string& tablet_id,
825
                        const MonoDelta& timeout,
826
112
                        TServerDetails** leader) {
827
112
  return FindTabletLeader(TServerDetailsVector(tablet_servers), tablet_id, timeout, leader);
828
112
}
829
830
Status FindTabletLeader(const TabletServerMapUnowned& tablet_servers,
831
                        const string& tablet_id,
832
                        const MonoDelta& timeout,
833
12
                        TServerDetails** leader) {
834
12
  return FindTabletLeader(TServerDetailsVector(tablet_servers), tablet_id, timeout, leader);
835
12
}
836
837
Status FindTabletLeader(const vector<TServerDetails*>& tservers,
838
                        const string& tablet_id,
839
                        const MonoDelta& timeout,
840
124
                        TServerDetails** leader) {
841
124
  MonoTime start = MonoTime::Now();
842
124
  MonoTime deadline = start;
843
124
  deadline.AddDelta(timeout);
844
124
  Status s;
845
124
  int i = 0;
846
2.30k
  while (true) {
847
2.30k
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
848
2.30k
    s = GetReplicaStatusAndCheckIfLeader(tservers[i], tablet_id, remaining_timeout);
849
2.30k
    if (s.ok()) {
850
123
      *leader = tservers[i];
851
123
      return Status::OK();
852
123
    }
853
854
2.18k
    if (deadline.ComesBefore(MonoTime::Now())) 
break1
;
855
2.18k
    i = (i + 1) % tservers.size();
856
2.18k
    if (i == 0) {
857
685
      SleepFor(MonoDelta::FromMilliseconds(10));
858
685
    }
859
2.18k
  }
860
1
  return STATUS(TimedOut, Substitute("Unable to find leader of tablet $0 after $1. "
861
124
                                     "Status message: $2", tablet_id,
862
124
                                     MonoTime::Now().GetDeltaSince(start).ToString(),
863
124
                                     s.ToString()));
864
124
}
865
866
Status FindTabletFollowers(const TabletServerMapUnowned& tablet_servers,
867
                           const string& tablet_id,
868
                           const MonoDelta& timeout,
869
6
                           vector<TServerDetails*>* followers) {
870
6
  TServerDetails* leader;
871
6
  RETURN_NOT_OK(FindTabletLeader(tablet_servers, tablet_id, timeout, &leader));
872
18
  
for (const auto& entry : tablet_servers)6
{
873
18
    TServerDetails* ts = entry.second;
874
18
    if (ts->uuid() != leader->uuid()) {
875
12
      followers->push_back(ts);
876
12
    }
877
18
  }
878
6
  return Status::OK();
879
6
}
880
881
Status StartElection(const TServerDetails* replica,
882
                     const string& tablet_id,
883
                     const MonoDelta& timeout,
884
100
                     consensus::TEST_SuppressVoteRequest suppress_vote_request) {
885
100
  RunLeaderElectionRequestPB req;
886
100
  req.set_dest_uuid(replica->uuid());
887
100
  req.set_tablet_id(tablet_id);
888
100
  req.set_suppress_vote_request(suppress_vote_request);
889
100
  RunLeaderElectionResponsePB resp;
890
100
  RpcController rpc;
891
100
  rpc.set_timeout(timeout);
892
100
  RETURN_NOT_OK(replica->consensus_proxy->RunLeaderElection(req, &resp, &rpc));
893
100
  if (resp.has_error()) {
894
0
    return StatusFromPB(resp.error().status())
895
0
      .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code())));
896
0
  }
897
100
  return Status::OK();
898
100
}
899
900
Status RequestVote(const TServerDetails* replica,
901
                   const std::string& tablet_id,
902
                   const std::string& candidate_uuid,
903
                   int64_t candidate_term,
904
                   const OpIdPB& last_logged_opid,
905
                   boost::optional<bool> ignore_live_leader,
906
                   boost::optional<bool> is_pre_election,
907
0
                   const MonoDelta& timeout) {
908
0
  RSTATUS_DCHECK(
909
0
      last_logged_opid.IsInitialized(), Uninitialized, "Last logged op id is uninitialized");
910
0
  consensus::VoteRequestPB req;
911
0
  req.set_dest_uuid(replica->uuid());
912
0
  req.set_tablet_id(tablet_id);
913
0
  req.set_candidate_uuid(candidate_uuid);
914
0
  req.set_candidate_term(candidate_term);
915
0
  *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid;
916
0
  if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader);
917
0
  if (is_pre_election) req.set_preelection(*is_pre_election);
918
0
  consensus::VoteResponsePB resp;
919
0
  RpcController rpc;
920
0
  rpc.set_timeout(timeout);
921
0
  RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc));
922
0
  if (resp.has_vote_granted() && resp.vote_granted())
923
0
    return Status::OK();
924
0
  if (resp.has_error())
925
0
    return StatusFromPB(resp.error().status());
926
0
  if (resp.has_consensus_error())
927
0
    return StatusFromPB(resp.consensus_error().status());
928
0
  return STATUS(IllegalState, "Unknown error (vote not granted)");
929
0
}
930
931
Status LeaderStepDown(
932
    const TServerDetails* replica,
933
    const string& tablet_id,
934
    const TServerDetails* new_leader,
935
    const MonoDelta& timeout,
936
    const bool disable_graceful_transition,
937
13
    TabletServerErrorPB* error) {
938
13
  LeaderStepDownRequestPB req;
939
13
  req.set_dest_uuid(replica->uuid());
940
13
  req.set_tablet_id(tablet_id);
941
13
  if (disable_graceful_transition) {
942
1
    req.set_disable_graceful_transition(disable_graceful_transition);
943
1
  }
944
13
  if (new_leader) {
945
0
    req.set_new_leader_uuid(new_leader->uuid());
946
0
  }
947
13
  LeaderStepDownResponsePB resp;
948
13
  RpcController rpc;
949
13
  rpc.set_timeout(timeout);
950
13
  RETURN_NOT_OK(replica->consensus_proxy->LeaderStepDown(req, &resp, &rpc));
951
12
  if (resp.has_error()) {
952
1
    if (error != nullptr) {
953
1
      *error = resp.error();
954
1
    }
955
1
    return StatusFromPB(resp.error().status())
956
1
      .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code())));
957
1
  }
958
377
  
return WaitFor([&]() -> Result<bool> 11
{
959
377
    rpc.Reset();
960
377
    GetConsensusStateRequestPB state_req;
961
377
    state_req.set_dest_uuid(replica->uuid());
962
377
    state_req.set_tablet_id(tablet_id);
963
377
    GetConsensusStateResponsePB state_resp;
964
377
    RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(state_req, &state_resp, &rpc));
965
377
    return state_resp.cstate().leader_uuid() != replica->uuid();
966
377
  }, timeout, "Leader change");
967
12
}
968
969
Status WriteSimpleTestRow(const TServerDetails* replica,
970
                          const std::string& tablet_id,
971
                          int32_t key,
972
                          int32_t int_val,
973
                          const string& string_val,
974
13.2k
                          const MonoDelta& timeout) {
975
13.2k
  WriteRequestPB req;
976
13.2k
  WriteResponsePB resp;
977
13.2k
  RpcController rpc;
978
13.2k
  rpc.set_timeout(timeout);
979
980
13.2k
  req.set_tablet_id(tablet_id);
981
982
13.2k
  AddTestRowInsert(key, int_val, string_val, &req);
983
984
13.2k
  RETURN_NOT_OK(replica->tserver_proxy->Write(req, &resp, &rpc));
985
13.2k
  if (resp.has_error()) {
986
8.51k
    return StatusFromPB(resp.error().status());
987
8.51k
  }
988
4.77k
  return Status::OK();
989
13.2k
}
990
991
namespace {
992
  Status SendAddRemoveServerRequest(const TServerDetails* leader,
993
                                    const ChangeConfigRequestPB& req,
994
                                    ChangeConfigResponsePB* resp,
995
                                    RpcController* rpc,
996
                                    const MonoDelta& timeout,
997
                                    TabletServerErrorPB::Code* error_code,
998
40
                                    bool retry) {
999
40
    Status status = Status::OK();
1000
40
    MonoTime start = MonoTime::Now();
1001
40
    do {
1002
40
      RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, resp, rpc));
1003
35
      if (!resp->has_error()) {
1004
28
        break;
1005
28
      }
1006
7
      if (error_code) 
*error_code = resp->error().code()2
;
1007
7
      status = StatusFromPB(resp->error().status());
1008
7
      if (!retry) {
1009
7
        break;
1010
7
      }
1011
0
      if (resp->error().code() != TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) {
1012
0
        break;
1013
0
      }
1014
0
      rpc->Reset();
1015
0
    } while (MonoTime::Now().GetDeltaSince(start).LessThan(timeout));
1016
35
    return status;
1017
40
  }
1018
} // namespace
1019
1020
Status AddServer(const TServerDetails* leader,
1021
                 const std::string& tablet_id,
1022
                 const TServerDetails* replica_to_add,
1023
                 consensus::PeerMemberType member_type,
1024
                 const boost::optional<int64_t>& cas_config_opid_index,
1025
                 const MonoDelta& timeout,
1026
                 TabletServerErrorPB::Code* error_code,
1027
15
                 bool retry) {
1028
15
  ChangeConfigRequestPB req;
1029
15
  ChangeConfigResponsePB resp;
1030
15
  RpcController rpc;
1031
15
  rpc.set_timeout(timeout);
1032
1033
15
  req.set_dest_uuid(leader->uuid());
1034
15
  req.set_tablet_id(tablet_id);
1035
15
  req.set_type(consensus::ADD_SERVER);
1036
15
  RaftPeerPB* peer = req.mutable_server();
1037
15
  peer->set_permanent_uuid(replica_to_add->uuid());
1038
15
  peer->set_member_type(member_type);
1039
15
  CopyRegistration(replica_to_add->registration->common(), peer);
1040
15
  if (cas_config_opid_index) {
1041
2
    req.set_cas_config_opid_index(*cas_config_opid_index);
1042
2
  }
1043
1044
15
  return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry);
1045
15
}
1046
1047
Status RemoveServer(const TServerDetails* leader,
1048
                    const std::string& tablet_id,
1049
                    const TServerDetails* replica_to_remove,
1050
                    const boost::optional<int64_t>& cas_config_opid_index,
1051
                    const MonoDelta& timeout,
1052
                    TabletServerErrorPB::Code* error_code,
1053
25
                    bool retry) {
1054
25
  ChangeConfigRequestPB req;
1055
25
  ChangeConfigResponsePB resp;
1056
25
  RpcController rpc;
1057
25
  rpc.set_timeout(timeout);
1058
1059
25
  req.set_dest_uuid(leader->uuid());
1060
25
  req.set_tablet_id(tablet_id);
1061
25
  req.set_type(consensus::REMOVE_SERVER);
1062
25
  if (cas_config_opid_index) {
1063
6
    req.set_cas_config_opid_index(*cas_config_opid_index);
1064
6
  }
1065
25
  RaftPeerPB* peer = req.mutable_server();
1066
25
  peer->set_permanent_uuid(replica_to_remove->uuid());
1067
1068
25
  return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry);
1069
25
}
1070
1071
Status ListTablets(const TServerDetails* ts,
1072
                   const MonoDelta& timeout,
1073
188
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
1074
188
  tserver::ListTabletsRequestPB req;
1075
188
  tserver::ListTabletsResponsePB resp;
1076
188
  RpcController rpc;
1077
188
  rpc.set_timeout(timeout);
1078
1079
188
  RETURN_NOT_OK(ts->tserver_proxy->ListTablets(req, &resp, &rpc));
1080
188
  if (resp.has_error()) {
1081
0
    return StatusFromPB(resp.error().status());
1082
0
  }
1083
1084
188
  tablets->assign(resp.status_and_schema().begin(), resp.status_and_schema().end());
1085
188
  return Status::OK();
1086
188
}
1087
1088
Status ListRunningTabletIds(const TServerDetails* ts,
1089
                            const MonoDelta& timeout,
1090
1
                            vector<string>* tablet_ids) {
1091
1
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
1092
1
  RETURN_NOT_OK(ListTablets(ts, timeout, &tablets));
1093
1
  tablet_ids->clear();
1094
1
  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
1095
1
    if (t.tablet_status().state() == tablet::RUNNING) {
1096
1
      tablet_ids->push_back(t.tablet_status().tablet_id());
1097
1
    }
1098
1
  }
1099
1
  return Status::OK();
1100
1
}
1101
1102
Status GetTabletLocations(ExternalMiniCluster* cluster,
1103
                          const string& tablet_id,
1104
                          const MonoDelta& timeout,
1105
83
                          master::TabletLocationsPB* tablet_locations) {
1106
83
  master::GetTabletLocationsResponsePB resp;
1107
83
  master::GetTabletLocationsRequestPB req;
1108
83
  *req.add_tablet_ids() = tablet_id;
1109
83
  rpc::RpcController rpc;
1110
83
  rpc.set_timeout(timeout);
1111
83
  RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTabletLocations(
1112
83
      req, &resp, &rpc));
1113
83
  if (resp.has_error()) {
1114
0
    return StatusFromPB(resp.error().status());
1115
0
  }
1116
83
  if (resp.errors_size() > 0) {
1117
0
    CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString();
1118
0
    return StatusFromPB(resp.errors(0).status());
1119
0
  }
1120
83
  CHECK_EQ
(1, resp.tablet_locations_size()) << resp.ShortDebugString()0
;
1121
83
  *tablet_locations = resp.tablet_locations(0);
1122
83
  return Status::OK();
1123
83
}
1124
1125
Status GetTableLocations(ExternalMiniCluster* cluster,
1126
                         const YBTableName& table_name,
1127
                         const MonoDelta& timeout,
1128
                         const RequireTabletsRunning require_tablets_running,
1129
118
                         master::GetTableLocationsResponsePB* table_locations) {
1130
118
  master::GetTableLocationsRequestPB req;
1131
118
  table_name.SetIntoTableIdentifierPB(req.mutable_table());
1132
118
  req.set_require_tablets_running(require_tablets_running);
1133
118
  req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
1134
118
  rpc::RpcController rpc;
1135
118
  rpc.set_timeout(timeout);
1136
118
  RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTableLocations(
1137
118
      req, table_locations, &rpc));
1138
118
  if (table_locations->has_error()) {
1139
0
    return StatusFromPB(table_locations->error().status());
1140
0
  }
1141
118
  return Status::OK();
1142
118
}
1143
1144
Status WaitForNumVotersInConfigOnMaster(
1145
    ExternalMiniCluster* cluster,
1146
    const std::string& tablet_id,
1147
    int num_voters,
1148
0
    const MonoDelta& timeout) {
1149
0
  Status s;
1150
0
  MonoTime deadline = MonoTime::Now();
1151
0
  deadline.AddDelta(timeout);
1152
0
  int num_voters_found = 0;
1153
0
  while (true) {
1154
0
    TabletLocationsPB tablet_locations;
1155
0
    MonoDelta time_remaining = deadline.GetDeltaSince(MonoTime::Now());
1156
0
    s = GetTabletLocations(cluster, tablet_id, time_remaining, &tablet_locations);
1157
0
    if (s.ok()) {
1158
0
      num_voters_found = 0;
1159
0
      for (const TabletLocationsPB::ReplicaPB& r : tablet_locations.replicas()) {
1160
0
        if (r.role() == PeerRole::LEADER || r.role() == PeerRole::FOLLOWER) num_voters_found++;
1161
0
      }
1162
0
      if (num_voters_found == num_voters) break;
1163
0
    }
1164
0
    if (deadline.ComesBefore(MonoTime::Now())) break;
1165
0
    SleepFor(MonoDelta::FromMilliseconds(10));
1166
0
  }
1167
0
  RETURN_NOT_OK(s);
1168
0
  if (num_voters_found != num_voters) {
1169
0
    return STATUS(IllegalState,
1170
0
        Substitute("Did not find exactly $0 voters, found $1 voters",
1171
0
                   num_voters, num_voters_found));
1172
0
  }
1173
0
  return Status::OK();
1174
0
}
1175
1176
Status WaitForNumTabletsOnTS(TServerDetails* ts,
1177
                             size_t count,
1178
                             const MonoDelta& timeout,
1179
8
                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
1180
8
  Status s;
1181
8
  MonoTime deadline = MonoTime::Now();
1182
8
  deadline.AddDelta(timeout);
1183
8
  while (true) {
1184
8
    s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets);
1185
8
    if (s.ok() && tablets->size() == count) break;
1186
0
    if (deadline.ComesBefore(MonoTime::Now())) break;
1187
0
    SleepFor(MonoDelta::FromMilliseconds(10));
1188
0
  }
1189
8
  RETURN_NOT_OK(s);
1190
8
  if (tablets->size() != count) {
1191
0
    return STATUS(IllegalState,
1192
0
        Substitute("Did not find exactly $0 tablets, found $1 tablets",
1193
0
                   count, tablets->size()));
1194
0
  }
1195
8
  return Status::OK();
1196
8
}
1197
1198
Status WaitUntilTabletInState(TServerDetails* ts,
1199
                              const std::string& tablet_id,
1200
                              tablet::RaftGroupStatePB state,
1201
                              const MonoDelta& timeout,
1202
179
                              const MonoDelta& list_tablets_timeout) {
1203
179
  MonoTime start = MonoTime::Now();
1204
179
  MonoTime deadline = start;
1205
179
  deadline.AddDelta(timeout);
1206
179
  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
1207
179
  Status s;
1208
179
  tablet::RaftGroupStatePB last_state = tablet::UNKNOWN;
1209
179
  while (true) {
1210
179
    s = ListTablets(ts, list_tablets_timeout, &tablets);
1211
179
    if (s.ok()) {
1212
179
      bool seen = false;
1213
1.58k
      for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
1214
1.58k
        if (t.tablet_status().tablet_id() == tablet_id) {
1215
179
          seen = true;
1216
179
          last_state = t.tablet_status().state();
1217
179
          if (last_state == state) {
1218
179
            return Status::OK();
1219
179
          }
1220
179
        }
1221
1.58k
      }
1222
0
      if (!seen) {
1223
0
        s = STATUS(NotFound, "Tablet " + tablet_id + " not found");
1224
0
      }
1225
0
    }
1226
0
    if (deadline.ComesBefore(MonoTime::Now())) {
1227
0
      break;
1228
0
    }
1229
0
    SleepFor(MonoDelta::FromMilliseconds(10));
1230
0
  }
1231
0
  return STATUS(TimedOut, Substitute("T $0 P $1: Tablet not in $2 state after $3: "
1232
179
                                     "Tablet state: $4, Status message: $5",
1233
179
                                     tablet_id, ts->uuid(),
1234
179
                                     tablet::RaftGroupStatePB_Name(state),
1235
179
                                     MonoTime::Now().GetDeltaSince(start).ToString(),
1236
179
                                     tablet::RaftGroupStatePB_Name(last_state), s.ToString()));
1237
179
}
1238
1239
// Wait until the specified tablet is in RUNNING state.
1240
Status WaitUntilTabletRunning(TServerDetails* ts,
1241
                              const std::string& tablet_id,
1242
177
                              const MonoDelta& timeout) {
1243
177
  return WaitUntilTabletInState(ts, tablet_id, tablet::RUNNING, timeout);
1244
177
}
1245
1246
Status DeleteTablet(const TServerDetails* ts,
1247
                    const std::string& tablet_id,
1248
                    const tablet::TabletDataState delete_type,
1249
                    const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
1250
                    const MonoDelta& timeout,
1251
11
                    tserver::TabletServerErrorPB::Code* error_code) {
1252
11
  DeleteTabletRequestPB req;
1253
11
  DeleteTabletResponsePB resp;
1254
11
  RpcController rpc;
1255
11
  rpc.set_timeout(timeout);
1256
1257
11
  req.set_dest_uuid(ts->uuid());
1258
11
  req.set_tablet_id(tablet_id);
1259
11
  req.set_delete_type(delete_type);
1260
11
  if (cas_config_opid_index_less_or_equal) {
1261
0
    req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal);
1262
0
  }
1263
1264
11
  RETURN_NOT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc));
1265
11
  if (resp.has_error()) {
1266
0
    if (error_code) {
1267
0
      *error_code = resp.error().code();
1268
0
    }
1269
0
    return StatusFromPB(resp.error().status());
1270
0
  }
1271
11
  return Status::OK();
1272
11
}
1273
1274
Status StartRemoteBootstrap(const TServerDetails* ts,
1275
                            const string& tablet_id,
1276
                            const string& bootstrap_source_uuid,
1277
                            const HostPort& bootstrap_source_addr,
1278
                            int64_t caller_term,
1279
0
                            const MonoDelta& timeout) {
1280
0
  consensus::StartRemoteBootstrapRequestPB req;
1281
0
  consensus::StartRemoteBootstrapResponsePB resp;
1282
0
  RpcController rpc;
1283
0
  rpc.set_timeout(timeout);
1284
1285
0
  req.set_dest_uuid(ts->uuid());
1286
0
  req.set_tablet_id(tablet_id);
1287
0
  req.set_bootstrap_peer_uuid(bootstrap_source_uuid);
1288
0
  HostPortToPB(bootstrap_source_addr, req.mutable_source_private_addr()->Add());
1289
0
  req.set_caller_term(caller_term);
1290
1291
0
  RETURN_NOT_OK(ts->consensus_proxy->StartRemoteBootstrap(req, &resp, &rpc));
1292
0
  if (resp.has_error()) {
1293
0
    return StatusFromPB(resp.error().status());
1294
0
  }
1295
0
  return Status::OK();
1296
0
}
1297
1298
Status GetLastOpIdForMasterReplica(
1299
    const shared_ptr<consensus::ConsensusServiceProxy>& consensus_proxy,
1300
    const string& tablet_id,
1301
    const string& dest_uuid,
1302
    const consensus::OpIdType opid_type,
1303
    const MonoDelta& timeout,
1304
42
    OpIdPB* opid) {
1305
42
  GetLastOpIdRequestPB opid_req;
1306
42
  GetLastOpIdResponsePB opid_resp;
1307
42
  RpcController controller;
1308
42
  controller.Reset();
1309
42
  controller.set_timeout(timeout);
1310
1311
42
  opid_req.set_dest_uuid(dest_uuid);
1312
42
  opid_req.set_tablet_id(tablet_id);
1313
42
  opid_req.set_opid_type(opid_type);
1314
1315
42
  Status s = consensus_proxy->GetLastOpId(opid_req, &opid_resp, &controller);
1316
42
  if (!s.ok()) {
1317
0
    return STATUS(InvalidArgument, Substitute(
1318
0
        "Failed to fetch opid type $0 from master uuid $1 with error : $2",
1319
0
        opid_type, dest_uuid, s.ToString()));
1320
0
  }
1321
42
  if (opid_resp.has_error()) {
1322
0
    return StatusFromPB(opid_resp.error().status());
1323
0
  }
1324
1325
42
  *opid = opid_resp.opid();
1326
1327
42
  return Status::OK();
1328
42
}
1329
1330
Result<OpId> GetLastOpIdForReplica(
1331
    const TabletId& tablet_id,
1332
    TServerDetails* replica,
1333
    consensus::OpIdType opid_type,
1334
39
    const MonoDelta& timeout) {
1335
39
  return VERIFY_RESULT(GetLastOpIdForEachReplica(tablet_id, {replica}, opid_type, timeout))[0];
1336
39
}
1337
1338
} // namespace itest
1339
} // namespace yb