YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/cluster_itest_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/integration-tests/cluster_itest_util.h"
34
35
#include <stdint.h>
36
37
#include <algorithm>
38
#include <limits>
39
#include <memory>
40
#include <mutex>
41
#include <string>
42
#include <utility>
43
#include <vector>
44
45
#include <boost/optional.hpp>
46
#include <glog/stl_logging.h>
47
#include <gtest/gtest.h>
48
49
#include "yb/client/schema.h"
50
#include "yb/client/yb_table_name.h"
51
52
#include "yb/common/wire_protocol-test-util.h"
53
#include "yb/common/wire_protocol.h"
54
#include "yb/common/wire_protocol.pb.h"
55
56
#include "yb/consensus/consensus.proxy.h"
57
#include "yb/consensus/consensus_meta.h"
58
#include "yb/consensus/consensus_types.pb.h"
59
#include "yb/consensus/opid_util.h"
60
#include "yb/consensus/quorum_util.h"
61
62
#include "yb/gutil/strings/substitute.h"
63
64
#include "yb/integration-tests/external_mini_cluster.h"
65
66
#include "yb/master/master_client.proxy.h"
67
#include "yb/master/master_cluster.proxy.h"
68
69
#include "yb/rpc/rpc_fwd.h"
70
71
#include "yb/server/server_base.proxy.h"
72
73
#include "yb/tserver/tablet_server_test_util.h"
74
#include "yb/tserver/tserver_admin.proxy.h"
75
#include "yb/tserver/tserver_service.pb.h"
76
#include "yb/tserver/tserver_service.proxy.h"
77
78
#include "yb/util/enums.h"
79
#include "yb/util/format.h"
80
#include "yb/util/logging.h"
81
#include "yb/util/monotime.h"
82
#include "yb/util/net/net_fwd.h"
83
#include "yb/util/net/net_util.h"
84
#include "yb/util/result.h"
85
#include "yb/util/status.h"
86
#include "yb/util/status_format.h"
87
#include "yb/util/status_log.h"
88
#include "yb/util/strongly_typed_bool.h"
89
#include "yb/util/test_util.h"
90
91
namespace yb {
92
namespace itest {
93
94
using client::YBClient;
95
using client::YBSchema;
96
using client::YBSchemaBuilder;
97
using client::YBTable;
98
using client::YBTableName;
99
using consensus::CONSENSUS_CONFIG_ACTIVE;
100
using consensus::CONSENSUS_CONFIG_COMMITTED;
101
using consensus::ChangeConfigRequestPB;
102
using consensus::ChangeConfigResponsePB;
103
using consensus::ConsensusConfigType;
104
using consensus::ConsensusStatePB;
105
using consensus::CountVoters;
106
using consensus::GetConsensusStateRequestPB;
107
using consensus::GetConsensusStateResponsePB;
108
using consensus::GetLastOpIdRequestPB;
109
using consensus::GetLastOpIdResponsePB;
110
using consensus::LeaderStepDownRequestPB;
111
using consensus::LeaderStepDownResponsePB;
112
using consensus::RaftPeerPB;
113
using consensus::RunLeaderElectionResponsePB;
114
using consensus::RunLeaderElectionRequestPB;
115
using consensus::kInvalidOpIdIndex;
116
using consensus::LeaderLeaseCheckMode;
117
using consensus::LeaderLeaseStatus;
118
using master::ListTabletServersResponsePB;
119
using master::TabletLocationsPB;
120
using rpc::Messenger;
121
using rpc::RpcController;
122
using std::min;
123
using std::shared_ptr;
124
using std::string;
125
using std::unordered_map;
126
using std::vector;
127
using strings::Substitute;
128
using tserver::CreateTsClientProxies;
129
using tserver::ListTabletsResponsePB;
130
using tserver::DeleteTabletRequestPB;
131
using tserver::DeleteTabletResponsePB;
132
using tserver::TabletServerAdminServiceProxy;
133
using tserver::TabletServerErrorPB;
134
using tserver::TabletServerServiceProxy;
135
using tserver::WriteRequestPB;
136
using tserver::WriteResponsePB;
137
138
453
// Builds an empty descriptor. The registration protobuf is allocated up front so that it can be
// populated through the pointer (e.g. with CopyFrom) right after construction.
TServerDetails::TServerDetails()
    : registration(new master::TSRegistrationPB()) {}

// Out-of-line defaulted destructor.
TServerDetails::~TServerDetails() = default;
142
143
4.93k
// Returns the permanent UUID stored in this server's instance id.
const string& TServerDetails::uuid() const {
  const string& permanent_uuid = instance_id.permanent_uuid();
  return permanent_uuid;
}
146
147
8
std::string TServerDetails::ToString() const {
148
8
  return Format("TabletServer: $0, Rpc address: $1", instance_id.permanent_uuid(),
149
8
                DesiredHostPort(registration->common(), CloudInfoPB()));
150
8
}
151
152
1
// Returns a schema with a single non-null INT32 primary key column named "key".
// Aborts (CHECK_OK) if the schema cannot be built.
client::YBSchema SimpleIntKeyYBSchema() {
  YBSchemaBuilder builder;
  builder.AddColumn("key")->Type(INT32)->NotNull()->PrimaryKey();

  YBSchema schema;
  CHECK_OK(builder.Build(&schema));
  return schema;
}
159
160
// Asks every server in 'replicas' for its last OpId of kind 'opid_type' for tablet 'tablet_id'.
// When 'op_type' is anything other than UNKNOWN_OP it is forwarded in the request as well.
// Iterating over the replicas and applying 'timeout' is delegated to GetForEachReplica.
Result<std::vector<OpId>> GetLastOpIdForEachReplica(
    const TabletId& tablet_id,
    const std::vector<TServerDetails*>& replicas,
    consensus::OpIdType opid_type,
    const MonoDelta& timeout,
    consensus::OperationType op_type) {
  // Functor that issues one GetLastOpId RPC against a single tablet server.
  struct LastOpIdFetcher {
    const TabletId& tablet_id;
    consensus::OpIdType opid_type;
    consensus::OperationType op_type;

    Result<OpId> operator()(TServerDetails* ts, rpc::RpcController* controller) const {
      GetLastOpIdRequestPB request;
      request.set_tablet_id(tablet_id);
      request.set_dest_uuid(ts->uuid());
      request.set_opid_type(opid_type);
      // The operation type is optional: leave it unset unless a specific one was requested.
      if (op_type != consensus::OperationType::UNKNOWN_OP) {
        request.set_op_type(op_type);
      }

      GetLastOpIdResponsePB response;
      RETURN_NOT_OK(ts->consensus_proxy->GetLastOpId(request, &response, controller));
      return OpId::FromPB(response.opid());
    }
  };

  return GetForEachReplica(replicas, timeout, LastOpIdFetcher{tablet_id, opid_type, op_type});
}
189
190
165
// Flattens an owning tablet server map into a vector of raw TServerDetails pointers, in the map's
// iteration order. The pointed-to objects stay owned by 'tablet_servers'.
vector<TServerDetails*> TServerDetailsVector(const TabletServerMap& tablet_servers) {
  vector<TServerDetails*> servers;
  servers.reserve(tablet_servers.size());
  std::transform(
      tablet_servers.begin(), tablet_servers.end(), std::back_inserter(servers),
      [](const TabletServerMap::value_type& entry) { return entry.second.get(); });
  return servers;
}
198
199
39
// Collects the TServerDetails pointers stored in a non-owning tablet server map into a vector,
// in the map's iteration order.
vector<TServerDetails*> TServerDetailsVector(const TabletServerMapUnowned& tablet_servers) {
  vector<TServerDetails*> servers;
  servers.reserve(tablet_servers.size());
  std::transform(
      tablet_servers.begin(), tablet_servers.end(), std::back_inserter(servers),
      [](const TabletServerMapUnowned::value_type& entry) { return entry.second; });
  return servers;
}
207
208
// Builds a non-owning view of 'tablet_servers', leaving out every entry whose key appears in
// 'exclude'. The returned map holds raw pointers to objects still owned by 'tablet_servers'.
TabletServerMapUnowned CreateTabletServerMapUnowned(const TabletServerMap& tablet_servers,
                                                    const std::set<std::string>& exclude) {
  TabletServerMapUnowned unowned;
  for (const auto& entry : tablet_servers) {
    const bool excluded = exclude.count(entry.first) != 0;
    if (!excluded) {
      unowned.emplace(entry.first, entry.second.get());
    }
  }
  return unowned;
}
219
220
// Polls 'replica' every 10ms until the term of its last OpId (of kind 'opid_type') for
// 'tablet_id' equals the current term reported by its active consensus state, or until 'timeout'
// has elapsed.
//
// On success returns OK and, when 'opid' is non-null, stores the matching OpId there.
// On timeout returns TimedOut; the message carries the status of the last failed step (consensus
// state lookup, last-OpId lookup, or term mismatch).
Status WaitForOpFromCurrentTerm(TServerDetails* replica,
                                const string& tablet_id,
                                consensus::OpIdType opid_type,
                                const MonoDelta& timeout,
                                OpId* opid) {
  const MonoTime kStart = MonoTime::Now();
  const MonoTime kDeadline = kStart + timeout;

  Status s;
  while (MonoTime::Now() < kDeadline) {
    ConsensusStatePB cstate;
    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, kDeadline - MonoTime::Now(),
                          &cstate);
    if (s.ok()) {
      Result<OpId> tmp_opid =
          GetLastOpIdForReplica(tablet_id, replica, opid_type, kDeadline - MonoTime::Now());
      if (tmp_opid) {
        if (tmp_opid->term == cstate.current_term()) {
          if (opid) {
            *opid = *tmp_opid;
          }
          return Status::OK();
        }
        s = STATUS(IllegalState, Substitute("Terms don't match. Current term: $0. Latest OpId: $1",
                                 cstate.current_term(), tmp_opid->ToString()));
      } else {
        // Remember why the OpId lookup failed so the timeout message does not report "OK".
        s = tmp_opid.status();
      }
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  return STATUS(TimedOut, Substitute("Timed out after $0 waiting for op from current term: $1",
                                     (MonoTime::Now() - kStart).ToString(),
                                     s.ToString()));
}
254
255
// Convenience overload for an owning tablet server map: flattens the map into a vector of
// servers and forwards every other argument unchanged to the vector-based overload.
Status WaitForServersToAgree(const MonoDelta& timeout,
                             const TabletServerMap& tablet_servers,
                             const string& tablet_id,
                             int64_t minimum_index,
                             int64_t* actual_index,
                             MustBeCommitted must_be_committed) {
  const vector<TServerDetails*> servers = TServerDetailsVector(tablet_servers);
  return WaitForServersToAgree(
      timeout, servers, tablet_id, minimum_index, actual_index, must_be_committed);
}
268
269
// Convenience overload for a non-owning tablet server map: flattens the map into a vector of
// servers and forwards every other argument unchanged to the vector-based overload.
Status WaitForServersToAgree(const MonoDelta& timeout,
                             const TabletServerMapUnowned& tablet_servers,
                             const TabletId& tablet_id,
                             int64_t minimum_index,
                             int64_t* actual_index,
                             MustBeCommitted must_be_committed) {
  const vector<TServerDetails*> servers = TServerDetailsVector(tablet_servers);
  return WaitForServersToAgree(
      timeout, servers, tablet_id, minimum_index, actual_index, must_be_committed);
}
279
280
// Waits until all 'servers' report the same last OpId index for 'tablet_id' and that index is at
// least 'minimum_index'.
//
// Only last-received OpIds are compared by default; when 'must_be_committed' is set the
// last-committed OpIds must converge on the same index as well.
// 'actual_index', when non-null, is zeroed on entry and set to the agreed index on success.
// Retries with a growing sleep (100ms per attempt, capped at 1s) until 'timeout' has elapsed,
// then returns TimedOut describing the latest ids seen and the last RPC error, if any.
Status WaitForServersToAgree(const MonoDelta& timeout,
                             const vector<TServerDetails*>& servers,
                             const string& tablet_id,
                             int64_t minimum_index,
                             int64_t* actual_index,
                             MustBeCommitted must_be_committed) {
  auto deadline = CoarseMonoClock::Now() + timeout;
  if (actual_index != nullptr) {
    *actual_index = 0;
  }

  vector<consensus::OpIdType> opid_types{consensus::OpIdType::RECEIVED_OPID};
  if (must_be_committed) {
    // In this mode we require that last received and committed op ids from all servers converge
    // on the same value.
    opid_types.push_back(consensus::OpIdType::COMMITTED_OPID);
  }

  Status last_non_ok_status;
  vector<OpId> received_ids;
  vector<OpId> committed_ids;

  for (int attempt = 1; CoarseMonoClock::Now() < deadline; attempt++) {
    // Ids of every requested kind from every server, gathered during this attempt.
    vector<OpId> ids;

    Status s;
    for (auto opid_type : opid_types) {
      auto ids_of_this_type = GetLastOpIdForEachReplica(tablet_id, servers, opid_type, timeout);
      if (!ids_of_this_type.ok()) {
        s = ids_of_this_type.status();
        break;
      }
      // Keep the latest ids per kind for the timeout message.
      if (opid_type == consensus::OpIdType::RECEIVED_OPID) {
        received_ids = *ids_of_this_type;
      } else {
        committed_ids = *ids_of_this_type;
      }
      std::copy(ids_of_this_type->begin(), ids_of_this_type->end(), std::back_inserter(ids));
    }

    if (s.ok()) {
      int64_t cur_index = kInvalidOpIdIndex;
      bool any_behind = false;
      bool any_disagree = false;
      for (const OpId& id : ids) {
        // The first id seen becomes the reference index all the others must match.
        if (cur_index == kInvalidOpIdIndex) {
          cur_index = id.index;
        }
        if (id.index != cur_index) {
          any_disagree = true;
          break;
        }
        if (id.index < minimum_index) {
          any_behind = true;
          break;
        }
      }
      if (!any_behind && !any_disagree) {
        LOG(INFO) << "All servers converged on OpIds: " << ids;
        if (actual_index != nullptr) {
          *actual_index = cur_index;
        }
        return Status::OK();
      }
    } else {
      LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString();
      last_non_ok_status = s;
    }

    LOG(INFO) << "Not converged past " << minimum_index << " yet: " << ids;
    SleepFor(MonoDelta::FromMilliseconds(min(attempt * 100, 1000)));
  }
  // Each placeholder maps to exactly one argument, in order.
  return STATUS_FORMAT(
      TimedOut,
      "All replicas of tablet $0 could not converge on an index of at least $1 after $2. "
      "must_be_committed=$3. Latest received ids: $4, committed ids: $5. Last error: $6",
      tablet_id, minimum_index, timeout, must_be_committed, received_ids, committed_ids,
      last_non_ok_status);
}
358
359
// Wait until all specified replicas have logged the given index.
360
Status WaitUntilAllReplicasHaveOp(const int64_t log_index,
361
                                  const string& tablet_id,
362
                                  const vector<TServerDetails*>& replicas,
363
                                  const MonoDelta& timeout,
364
1
                                  int64_t* actual_minimum_index) {
365
1
  MonoTime start = MonoTime::Now();
366
1
  MonoDelta passed = MonoDelta::FromMilliseconds(0);
367
3
  while (true) {
368
3
    auto op_ids = GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID, timeout);
369
3
    if (op_ids.ok()) {
370
3
      if (actual_minimum_index != nullptr) {
371
3
        *actual_minimum_index = std::numeric_limits<int64_t>::max();
372
3
      }
373
374
3
      bool any_behind = false;
375
5
      for (const OpId& op_id : *op_ids) {
376
5
        if (actual_minimum_index != nullptr) {
377
5
          *actual_minimum_index = std::min(*actual_minimum_index, op_id.index);
378
5
        }
379
380
5
        if (op_id.index < log_index) {
381
2
          any_behind = true;
382
2
          break;
383
2
        }
384
5
      }
385
3
      if (!any_behind) return Status::OK();
386
0
    } else {
387
0
      LOG(WARNING) << "Got error getting last opid for each replica: " << op_ids.ToString();
388
0
    }
389
2
    passed = MonoTime::Now().GetDeltaSince(start);
390
2
    if (passed.MoreThan(timeout)) {
391
0
      break;
392
0
    }
393
2
    SleepFor(MonoDelta::FromMilliseconds(50));
394
2
  }
395
0
  string replicas_str;
396
0
  for (const TServerDetails* replica : replicas) {
397
0
    if (!replicas_str.empty()) replicas_str += ", ";
398
0
    replicas_str += "{ " + replica->ToString() + " }";
399
0
  }
400
0
  return STATUS(TimedOut, Substitute("Index $0 not available on all replicas after $1. "
401
1
                                              "Replicas: [ $2 ]",
402
1
                                              log_index, passed.ToString()));
403
1
}
404
405
// Repeatedly lists tablet servers through 'master_proxy' until the number of returned servers
// equals 'n_tservers'. Polls every 50ms and gives up with TimedOut once more than 'timeout' has
// passed; RPC and response errors are logged and retried.
Status WaitUntilNumberOfAliveTServersEqual(int n_tservers,
                                           const master::MasterClusterProxy& master_proxy,
                                           const MonoDelta& timeout) {
  master::ListTabletServersRequestPB request;
  // The field primary_only means only tservers that are alive (tservers that have sent at least
  // one heartbeat in the last FLAG_tserver_unresponsive_timeout_ms milliseconds.)
  request.set_primary_only(true);

  master::ListTabletServersResponsePB response;
  rpc::RpcController rpc;
  rpc.set_timeout(timeout);

  const MonoTime start_time = MonoTime::Now();
  for (;;) {
    const Status rpc_status = master_proxy.ListTabletServers(request, &response, &rpc);

    const bool succeeded = rpc_status.ok() && rpc.status().ok() && !response.has_error();
    if (succeeded) {
      if (response.servers_size() == n_tservers) {
        return Status::OK();
      }
    } else {
      // Report the first failing layer: call status, controller status, then response error.
      string error;
      if (!rpc_status.ok()) {
        error = rpc_status.ToString();
      } else if (!rpc.status().ok()) {
        error = rpc.status().ToString();
      } else {
        error = response.error().ShortDebugString();
      }
      LOG(WARNING) << "Got error getting list of tablet servers: " << error;
    }

    if (MonoTime::Now().GetDeltaSince(start_time).MoreThan(timeout)) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(50));
    rpc.Reset();
  }
  return STATUS(TimedOut, Substitute("Number of alive tservers not equal to $0 after $1 ms. ",
                                     n_tservers, timeout.ToMilliseconds()));
}
451
452
147
// Builds a tablet server map for 'cluster'. With more than one master the leader master's proxy
// is used; otherwise the cluster's plain master proxy is used.
Result<TabletServerMap> CreateTabletServerMap(ExternalMiniCluster* cluster) {
  const bool has_multiple_masters = cluster->num_masters() > 1;
  const auto master_proxy = has_multiple_masters
      ? cluster->GetLeaderMasterProxy<master::MasterClusterProxy>()
      : cluster->GetMasterProxy<master::MasterClusterProxy>();
  return CreateTabletServerMap(master_proxy, &cluster->proxy_cache());
}
458
459
// Lists the tablet servers known to the master behind 'proxy' and returns a map from permanent
// UUID to a TServerDetails holding the server's instance id, registration and RPC proxies
// (created through 'proxy_cache').
//
// Returns the RPC/controller error if the call fails, or RemoteError if the response carries an
// error. Aborts (CHECK) if two servers report the same UUID.
Result<TabletServerMap> CreateTabletServerMap(
    const master::MasterClusterProxy& proxy, rpc::ProxyCache* proxy_cache) {
  master::ListTabletServersRequestPB req;
  master::ListTabletServersResponsePB resp;
  rpc::RpcController controller;

  RETURN_NOT_OK(proxy.ListTabletServers(req, &resp, &controller));
  RETURN_NOT_OK(controller.status());
  if (resp.has_error()) {
    return STATUS(RemoteError, "Response had an error", resp.error().ShortDebugString());
  }

  TabletServerMap result;
  for (const ListTabletServersResponsePB::Entry& entry : resp.servers()) {
    HostPort host_port = HostPortFromPB(DesiredHostPort(
        entry.registration().common(), CloudInfoPB()));

    auto peer = std::make_unique<TServerDetails>();
    peer->instance_id.CopyFrom(entry.instance_id());
    peer->registration->CopyFrom(entry.registration());

    CreateTsClientProxies(host_port,
                          proxy_cache,
                          &peer->tserver_proxy,
                          &peer->tserver_admin_proxy,
                          &peer->consensus_proxy,
                          &peer->generic_proxy);

    // Copy the key instead of referencing the string inside *peer: when emplace() rejects a
    // duplicate, the moved-in peer may already have been destroyed by the time the CHECK
    // message below is formatted.
    const std::string key = peer->instance_id.permanent_uuid();
    CHECK(result.emplace(key, std::move(peer)).second) << "duplicate key: " << key;
  }
  return result;
}
492
493
// Fetches the consensus state of kind 'type' for 'tablet_id' from 'replica', using 'timeout' as
// the RPC timeout.
//
// On success stores the state in '*consensus_state' and, when 'leader_lease_status' is non-null,
// the lease status reported by the server (or NO_MAJORITY_REPLICATED_LEASE if none was reported).
// Returns the RPC error, or the status carried by the response error, on failure.
Status GetConsensusState(const TServerDetails* replica,
                         const string& tablet_id,
                         consensus::ConsensusConfigType type,
                         const MonoDelta& timeout,
                         ConsensusStatePB* consensus_state,
                         LeaderLeaseStatus* leader_lease_status) {
  DCHECK_ONLY_NOTNULL(replica);

  GetConsensusStateRequestPB request;
  request.set_dest_uuid(replica->uuid());
  request.set_tablet_id(tablet_id);
  request.set_type(type);

  RpcController rpc;
  rpc.set_timeout(timeout);

  GetConsensusStateResponsePB response;
  RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(request, &response, &rpc));
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }

  *consensus_state = response.cstate();
  if (leader_lease_status != nullptr) {
    if (response.has_leader_lease_status()) {
      *leader_lease_status = response.leader_lease_status();
    } else {
      // Could be anything but HAS_LEASE.
      *leader_lease_status = LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
    }
  }
  return Status::OK();
}
521
522
// Waits until the committed config reported by 'replica' for 'tablet_id' contains exactly
// 'config_size' members of type VOTER. Thin wrapper over WaitUntilCommittedConfigMemberTypeIs.
Status WaitUntilCommittedConfigNumVotersIs(size_t config_size,
                                           const TServerDetails* replica,
                                           const std::string& tablet_id,
                                           const MonoDelta& timeout) {
  constexpr auto kVoterType = consensus::PeerMemberType::VOTER;
  return WaitUntilCommittedConfigMemberTypeIs(
      config_size, replica, tablet_id, timeout, kVoterType);
}
529
530
// Polls the committed consensus config of 'tablet_id' on 'replica' until it contains exactly
// 'config_size' members of 'member_type'.
//
// Sleeps between attempts with exponential backoff (1ms doubling up to 128ms) and returns
// TimedOut, including the last consensus state and status seen, once more than 'timeout' has
// passed since the start.
Status WaitUntilCommittedConfigMemberTypeIs(size_t config_size,
                                            const TServerDetails* replica,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout,
                                            consensus::PeerMemberType member_type) {
  DCHECK_ONLY_NOTNULL(replica);

  MonoTime start_time = MonoTime::Now();
  MonoTime deadline = start_time + timeout;

  const int kMaxBackoffExp = 7;
  int backoff_exp = 0;
  Status last_status;
  ConsensusStatePB last_cstate;
  for (;;) {
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
    last_status = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
                                    remaining_timeout, &last_cstate);
    if (!last_status.ok()) {
      LOG(INFO) << "Got " << last_status.ToString() << " from " << replica->ToString();
    } else if (CountMemberType(last_cstate.config(), member_type) == config_size) {
      return Status::OK();
    } else {
      LOG(INFO) << "Got " << yb::ToString(last_cstate) << " from " << replica->ToString();
    }

    if (MonoTime::Now().GetDeltaSince(start_time).MoreThan(timeout)) {
      break;
    }
    // Sleep 2^backoff_exp milliseconds, then grow the exponent up to its cap.
    SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp));
    backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
  }
  return STATUS(TimedOut, Substitute("Number of replicas of type $0 does not equal $1 after "
                                     "waiting for $2. Last consensus state: $3. Last status: $4",
                                     PeerMemberType_Name(member_type), config_size,
                                     timeout.ToString(), last_cstate.ShortDebugString(),
                                     last_status.ToString()));
}
568
569
template<class Context>
570
Status WaitUntilCommittedOpIdIndex(TServerDetails* replica,
571
                                   const string& tablet_id,
572
                                   const MonoDelta& timeout,
573
                                   CommittedEntryType type,
574
16
                                   Context context) {
575
16
  MonoTime start = MonoTime::Now();
576
16
  MonoTime deadline = start;
577
16
  deadline.AddDelta(timeout);
578
579
16
  bool config = type == CommittedEntryType::CONFIG;
580
16
  Status s;
581
16
  OpId op_id;
582
16
  ConsensusStatePB cstate;
583
16
  while (true) {
584
16
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
16
    int64_t op_index = -1;
587
16
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
16
    } else {
594
16
      auto last_op_id_result = GetLastOpIdForReplica(
595
16
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
16
      if (last_op_id_result.ok()) {
597
16
        op_id = *last_op_id_result;
598
16
        op_index = op_id.index;
599
0
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
16
    }
603
604
16
    if (s.ok() && context.Check(op_index)) {
605
16
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
16
      } else {
609
16
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
16
                  << replica->instance_id.permanent_uuid();
611
16
      }
612
16
      return Status::OK();
613
16
    }
614
0
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
0
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
0
    if (!config) {
628
0
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
0
                << context.Desired();
630
0
    }
631
0
    SleepFor(MonoDelta::FromMilliseconds(100));
632
0
  }
633
16
}
_ZN2yb5itest27WaitUntilCommittedOpIdIndexINS0_36WaitUntilCommittedOpIdIndexIsContextEEENS_6StatusEPNS0_14TServerDetailsERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEERKNS_9MonoDeltaENS0_18CommittedEntryTypeET_
Line
Count
Source
574
15
                                   Context context) {
575
15
  MonoTime start = MonoTime::Now();
576
15
  MonoTime deadline = start;
577
15
  deadline.AddDelta(timeout);
578
579
15
  bool config = type == CommittedEntryType::CONFIG;
580
15
  Status s;
581
15
  OpId op_id;
582
15
  ConsensusStatePB cstate;
583
15
  while (true) {
584
15
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
15
    int64_t op_index = -1;
587
15
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
15
    } else {
594
15
      auto last_op_id_result = GetLastOpIdForReplica(
595
15
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
15
      if (last_op_id_result.ok()) {
597
15
        op_id = *last_op_id_result;
598
15
        op_index = op_id.index;
599
0
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
15
    }
603
604
15
    if (s.ok() && context.Check(op_index)) {
605
15
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
15
      } else {
609
15
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
15
                  << replica->instance_id.permanent_uuid();
611
15
      }
612
15
      return Status::OK();
613
15
    }
614
0
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
0
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
0
    if (!config) {
628
0
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
0
                << context.Desired();
630
0
    }
631
0
    SleepFor(MonoDelta::FromMilliseconds(100));
632
0
  }
633
15
}
_ZN2yb5itest27WaitUntilCommittedOpIdIndexINS0_47WaitUntilCommittedOpIdIndexIsGreaterThanContextEEENS_6StatusEPNS0_14TServerDetailsERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEERKNS_9MonoDeltaENS0_18CommittedEntryTypeET_
Line
Count
Source
574
1
                                   Context context) {
575
1
  MonoTime start = MonoTime::Now();
576
1
  MonoTime deadline = start;
577
1
  deadline.AddDelta(timeout);
578
579
1
  bool config = type == CommittedEntryType::CONFIG;
580
1
  Status s;
581
1
  OpId op_id;
582
1
  ConsensusStatePB cstate;
583
1
  while (true) {
584
1
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
585
586
1
    int64_t op_index = -1;
587
1
    if (config) {
588
0
      s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED,
589
0
          remaining_timeout, &cstate);
590
0
      if (s.ok()) {
591
0
        op_index = cstate.config().opid_index();
592
0
      }
593
1
    } else {
594
1
      auto last_op_id_result = GetLastOpIdForReplica(
595
1
          tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout);
596
1
      if (last_op_id_result.ok()) {
597
1
        op_id = *last_op_id_result;
598
1
        op_index = op_id.index;
599
0
      } else {
600
0
        s = last_op_id_result.status();
601
0
      }
602
1
    }
603
604
1
    if (s.ok() && context.Check(op_index)) {
605
1
      if (config) {
606
0
        LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: "
607
0
                  << replica->instance_id.permanent_uuid();
608
1
      } else {
609
1
        LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: "
610
1
                  << replica->instance_id.permanent_uuid();
611
1
      }
612
1
      return Status::OK();
613
1
    }
614
0
    auto passed = MonoTime::Now().GetDeltaSince(start);
615
0
    if (passed.MoreThan(timeout)) {
616
0
      auto name = config ? "config" : "consensus";
617
0
      auto last_value = config ? cstate.ShortDebugString() : AsString(op_id);
618
0
      return STATUS(TimedOut,
619
0
                    Substitute("Committed $0 opid_index does not equal $1 "
620
0
                               "after waiting for $2. Last value: $3, Last status: $4",
621
0
                               name,
622
0
                               context.Desired(),
623
0
                               passed.ToString(),
624
0
                               last_value,
625
0
                               s.ToString()));
626
0
    }
627
0
    if (!config) {
628
0
      LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at "
629
0
                << context.Desired();
630
0
    }
631
0
    SleepFor(MonoDelta::FromMilliseconds(100));
632
0
  }
633
1
}
634
635
class WaitUntilCommittedOpIdIndexContext {
636
 public:
637
  explicit WaitUntilCommittedOpIdIndexContext(std::string desired)
638
16
      : desired_(std::move(desired)) {
639
16
  }
640
641
0
  const string& Desired() const {
642
0
    return desired_;
643
0
  }
644
 private:
645
  string desired_;
646
};
647
648
class WaitUntilCommittedOpIdIndexIsContext : public WaitUntilCommittedOpIdIndexContext {
649
 public:
650
  explicit WaitUntilCommittedOpIdIndexIsContext(int64_t value)
651
      : WaitUntilCommittedOpIdIndexContext(Substitute("equal $0", value)),
652
15
        value_(value) {
653
15
  }
654
655
15
  bool Check(int64_t current) {
656
15
    return value_ == current;
657
15
  }
658
 private:
659
  int64_t value_;
660
};
661
662
// Polls the master until it reports exactly `num_replicas` replicas for `tablet_id` and, when
// `wait_for_leader` is WAIT_FOR_LEADER, until one of those replicas has the LEADER role.
//
// On every poll `*tablet_locations` receives the master's latest answer and `*has_leader` is
// recomputed (it is only set to true when the replica count matches).
//
// Returns OK once the condition holds, the error from GetTabletLocations if a lookup fails, or
// TimedOut once `timeout` has elapsed without the condition being met.
Status WaitForReplicasReportedToMaster(
    ExternalMiniCluster* cluster,
    int num_replicas, const string& tablet_id,
    const MonoDelta& timeout,
    WaitForLeader wait_for_leader,
    bool* has_leader,
    master::TabletLocationsPB* tablet_locations) {
  MonoTime deadline(MonoTime::Now() + timeout);
  while (true) {
    // Note: each lookup is given the full `timeout`, not the time remaining until `deadline`.
    RETURN_NOT_OK(GetTabletLocations(cluster, tablet_id, timeout, tablet_locations));
    *has_leader = false;
    if (tablet_locations->replicas_size() == num_replicas) {
      for (const master::TabletLocationsPB_ReplicaPB& replica :
                    tablet_locations->replicas()) {
        if (replica.role() == PeerRole::LEADER) {
          *has_leader = true;
        }
      }
      if (wait_for_leader == DONT_WAIT_FOR_LEADER ||
          (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) {
        // Replica count matches and the leader requirement (if any) is satisfied.
        return Status::OK();
      }
    }
    if (deadline < MonoTime::Now()) {
      // Substitute placeholders are zero-based: $0 = tablet id, $1 = replica count,
      // $2 = has_leader.
      return STATUS(TimedOut, Substitute("Timed out after waiting "
          "for tablet $0 expected to report master with $1 replicas, has_leader: $2",
          tablet_id, num_replicas, *has_leader));
    }
    SleepFor(MonoDelta::FromMilliseconds(20));
  }
}
706
707
// Waits until the committed index of the given entry `type` on `replica` is exactly `opid_index`.
// Delegates the polling to WaitUntilCommittedOpIdIndex with an equality predicate.
Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
                                     TServerDetails* replica,
                                     const string& tablet_id,
                                     const MonoDelta& timeout,
                                     CommittedEntryType type) {
  return WaitUntilCommittedOpIdIndex(
      replica, tablet_id, timeout, type, WaitUntilCommittedOpIdIndexIsContext(opid_index));
}
719
720
class WaitUntilCommittedOpIdIndexIsGreaterThanContext : public WaitUntilCommittedOpIdIndexContext {
721
 public:
722
  explicit WaitUntilCommittedOpIdIndexIsGreaterThanContext(int64_t* value)
723
      : WaitUntilCommittedOpIdIndexContext(Substitute("greater than $0", *value)),
724
1
        original_value_(*value), value_(value) {
725
726
1
  }
727
728
1
  bool Check(int64_t current) {
729
1
    if (current > *value_) {
730
1
      CHECK_EQ(*value_, original_value_);
731
1
      *value_ = current;
732
1
      return true;
733
1
    }
734
0
    return false;
735
0
  }
736
 private:
737
  int64_t original_value_;
738
  int64_t* const value_;
739
};
740
741
// Waits until the committed index of the given entry `type` on `replica` is strictly greater than
// `*index`. When the predicate is satisfied, `*index` is updated to the index that was observed.
Status WaitUntilCommittedOpIdIndexIsGreaterThan(int64_t* index,
                                                TServerDetails* replica,
                                                const TabletId& tablet_id,
                                                const MonoDelta& timeout,
                                                CommittedEntryType type) {
  return WaitUntilCommittedOpIdIndex(
      replica, tablet_id, timeout, type, WaitUntilCommittedOpIdIndexIsGreaterThanContext(index));
}
753
754
// Waits until the committed index on `replica` is at least `*index`, implemented as
// "strictly greater than *index - 1". `*index` is always overwritten on return: with the observed
// index when the wait succeeded, otherwise with the original value minus one.
Status WaitUntilCommittedOpIdIndexIsAtLeast(int64_t* index,
                                            TServerDetails* replica,
                                            const TabletId& tablet_id,
                                            const MonoDelta& timeout,
                                            CommittedEntryType type) {
  int64_t exclusive_lower_bound = *index - 1;
  Status wait_status = WaitUntilCommittedOpIdIndexIsGreaterThan(
      &exclusive_lower_bound, replica, tablet_id, timeout, type);
  *index = exclusive_lower_bound;
  return wait_status;
}
769
770
// Fetches the active consensus state from `replica` and reports whether it considers itself the
// leader of `tablet_id`.
//
// Returns:
//   OK           - the replica reports itself as leader and, unless `lease_check_mode` is
//                  DONT_NEED_LEASE, its leader lease status is HAS_LEASE.
//   NotFound     - the consensus state could not be fetched.
//   IllegalState - the replica answered but does not satisfy the leader condition.
Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica,
                                        const string& tablet_id,
                                        const MonoDelta& timeout,
                                        LeaderLeaseCheckMode lease_check_mode) {
  ConsensusStatePB cstate;
  LeaderLeaseStatus leader_lease_status;
  Status fetch_status = GetConsensusState(
      replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, timeout, &cstate, &leader_lease_status);
  if (PREDICT_FALSE(!fetch_status.ok())) {
    VLOG(1) << "Error getting consensus state from replica: "
            << replica->instance_id.permanent_uuid();
    return STATUS(NotFound, "Error connecting to replica", fetch_status.ToString());
  }

  const string& replica_uuid = replica->instance_id.permanent_uuid();
  const bool reports_itself_leader =
      cstate.has_leader_uuid() && cstate.leader_uuid() == replica_uuid;
  // The lease status is only inspected once we know the replica claims leadership.
  if (reports_itself_leader) {
    if (lease_check_mode == LeaderLeaseCheckMode::DONT_NEED_LEASE ||
        leader_lease_status == consensus::LeaderLeaseStatus::HAS_LEASE) {
      return Status::OK();
    }
  }

  VLOG(1) << "Replica not leader of config: " << replica->instance_id.permanent_uuid();
  return STATUS_FORMAT(IllegalState,
      "Replica found but not leader; lease check mode: $0", lease_check_mode);
}
793
794
// Repeatedly asks `replica` whether it is the leader of `tablet_id` until it says yes or
// `timeout` has elapsed. Between attempts it sleeps 1, 2, 4, ... ms, capped at 2^7 = 128 ms.
// Returns OK on success, otherwise TimedOut carrying the status of the last attempt.
Status WaitUntilLeader(const TServerDetails* replica,
                       const string& tablet_id,
                       const MonoDelta& timeout,
                       const LeaderLeaseCheckMode lease_check_mode) {
  const int kMaxBackoffExp = 7;
  MonoTime start = MonoTime::Now();
  MonoTime deadline = start;
  deadline.AddDelta(timeout);

  Status s;
  for (int backoff_exp = 0;; backoff_exp = min(backoff_exp + 1, kMaxBackoffExp)) {
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
    s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id, remaining_timeout,
                                         lease_check_mode);
    if (s.ok()) {
      return Status::OK();
    }

    const bool out_of_time = MonoTime::Now().GetDeltaSince(start).MoreThan(timeout);
    if (out_of_time) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
  }

  return STATUS(TimedOut, Substitute("Replica $0 is not leader after waiting for $1: $2",
                                     replica->ToString(), timeout.ToString(), s.ToString()));
}
822
823
// Overload taking a TabletServerMap: converts it with TServerDetailsVector and forwards to the
// overload that does the actual search.
Status FindTabletLeader(const TabletServerMap& tablet_servers,
                        const string& tablet_id,
                        const MonoDelta& timeout,
                        TServerDetails** leader) {
  const auto candidates = TServerDetailsVector(tablet_servers);
  return FindTabletLeader(candidates, tablet_id, timeout, leader);
}
829
830
// Overload taking a TabletServerMapUnowned: converts it with TServerDetailsVector and forwards to
// the overload that does the actual search.
Status FindTabletLeader(const TabletServerMapUnowned& tablet_servers,
                        const string& tablet_id,
                        const MonoDelta& timeout,
                        TServerDetails** leader) {
  const auto candidates = TServerDetailsVector(tablet_servers);
  return FindTabletLeader(candidates, tablet_id, timeout, leader);
}
836
837
// Cycles through `tservers`, asking each one whether it is the leader of `tablet_id`, until one
// answers yes or `timeout` has elapsed. After each full pass over the list it sleeps 10 ms.
//
// On success `*leader` is set to the leading server and OK is returned. Returns InvalidArgument
// when `tservers` is empty (there is nothing to query), and TimedOut, carrying the status of the
// last attempt, when no leader was found in time. `*leader` is not modified on failure.
Status FindTabletLeader(const vector<TServerDetails*>& tservers,
                        const string& tablet_id,
                        const MonoDelta& timeout,
                        TServerDetails** leader) {
  // Guard against indexing into (and taking a modulus by the size of) an empty vector.
  if (tservers.empty()) {
    return STATUS(InvalidArgument,
                  Substitute("No tablet servers given to search for leader of tablet $0",
                             tablet_id));
  }

  MonoTime start = MonoTime::Now();
  MonoTime deadline = start;
  deadline.AddDelta(timeout);
  Status s;
  size_t i = 0;
  while (true) {
    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now());
    s = GetReplicaStatusAndCheckIfLeader(tservers[i], tablet_id, remaining_timeout);
    if (s.ok()) {
      *leader = tservers[i];
      return Status::OK();
    }

    if (deadline.ComesBefore(MonoTime::Now())) break;
    i = (i + 1) % tservers.size();
    if (i == 0) {
      // Completed a full pass without finding a leader; back off briefly before the next one.
      SleepFor(MonoDelta::FromMilliseconds(10));
    }
  }
  return STATUS(TimedOut, Substitute("Unable to find leader of tablet $0 after $1. "
                                     "Status message: $2", tablet_id,
                                     MonoTime::Now().GetDeltaSince(start).ToString(),
                                     s.ToString()));
}
865
866
// Locates the leader of `tablet_id` among `tablet_servers` and appends every other server in the
// map to `*followers`. Returns the FindTabletLeader error unchanged if no leader is found.
Status FindTabletFollowers(const TabletServerMapUnowned& tablet_servers,
                           const string& tablet_id,
                           const MonoDelta& timeout,
                           vector<TServerDetails*>* followers) {
  TServerDetails* leader = nullptr;
  RETURN_NOT_OK(FindTabletLeader(tablet_servers, tablet_id, timeout, &leader));
  for (const auto& entry : tablet_servers) {
    TServerDetails* candidate = entry.second;
    if (candidate->uuid() == leader->uuid()) {
      continue;  // Skip the leader itself.
    }
    followers->push_back(candidate);
  }
  return Status::OK();
}
880
881
// Sends a RunLeaderElection RPC for `tablet_id` to `replica`.
// Returns the RPC status on transport failure, or the server-side error (prefixed with its
// TabletServerErrorPB code name) when the response carries one.
Status StartElection(const TServerDetails* replica,
                     const string& tablet_id,
                     const MonoDelta& timeout,
                     consensus::TEST_SuppressVoteRequest suppress_vote_request) {
  RunLeaderElectionRequestPB req;
  req.set_dest_uuid(replica->uuid());
  req.set_tablet_id(tablet_id);
  req.set_suppress_vote_request(suppress_vote_request);

  RpcController rpc;
  rpc.set_timeout(timeout);

  RunLeaderElectionResponsePB resp;
  RETURN_NOT_OK(replica->consensus_proxy->RunLeaderElection(req, &resp, &rpc));
  if (!resp.has_error()) {
    return Status::OK();
  }
  return StatusFromPB(resp.error().status())
    .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code())));
}
899
900
// Sends a RequestConsensusVote RPC to `replica` on behalf of `candidate_uuid` for the given term.
// The optional flags are only set on the request when a value is supplied.
//
// Returns OK when the vote was granted; otherwise the tablet-server error, then the consensus
// error, whichever is present first, or IllegalState when the response carries neither.
Status RequestVote(const TServerDetails* replica,
                   const std::string& tablet_id,
                   const std::string& candidate_uuid,
                   int64_t candidate_term,
                   const OpIdPB& last_logged_opid,
                   boost::optional<bool> ignore_live_leader,
                   boost::optional<bool> is_pre_election,
                   const MonoDelta& timeout) {
  RSTATUS_DCHECK(
      last_logged_opid.IsInitialized(), Uninitialized, "Last logged op id is uninitialized");

  consensus::VoteRequestPB req;
  req.set_dest_uuid(replica->uuid());
  req.set_tablet_id(tablet_id);
  req.set_candidate_uuid(candidate_uuid);
  req.set_candidate_term(candidate_term);
  *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid;
  if (ignore_live_leader) {
    req.set_ignore_live_leader(*ignore_live_leader);
  }
  if (is_pre_election) {
    req.set_preelection(*is_pre_election);
  }

  RpcController rpc;
  rpc.set_timeout(timeout);
  consensus::VoteResponsePB resp;
  RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc));

  const bool granted = resp.has_vote_granted() && resp.vote_granted();
  if (granted) {
    return Status::OK();
  }
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (resp.has_consensus_error()) {
    return StatusFromPB(resp.consensus_error().status());
  }
  return STATUS(IllegalState, "Unknown error (vote not granted)");
}
930
931
// Asks `replica` to step down as leader of `tablet_id`, optionally naming `new_leader` as the
// preferred successor and optionally disabling the graceful transition.
//
// If the step-down response carries an error, it is copied into `*error` (when non-null) and
// returned with its code name prepended. Otherwise the function waits, via WaitFor, until the
// consensus state fetched from `replica` names a leader other than `replica` itself.
Status LeaderStepDown(
    const TServerDetails* replica,
    const string& tablet_id,
    const TServerDetails* new_leader,
    const MonoDelta& timeout,
    const bool disable_graceful_transition,
    TabletServerErrorPB* error) {
  LeaderStepDownRequestPB req;
  req.set_dest_uuid(replica->uuid());
  req.set_tablet_id(tablet_id);
  if (disable_graceful_transition) {
    req.set_disable_graceful_transition(disable_graceful_transition);
  }
  if (new_leader) {
    req.set_new_leader_uuid(new_leader->uuid());
  }

  RpcController rpc;
  rpc.set_timeout(timeout);
  LeaderStepDownResponsePB resp;
  RETURN_NOT_OK(replica->consensus_proxy->LeaderStepDown(req, &resp, &rpc));
  if (resp.has_error()) {
    if (error != nullptr) {
      *error = resp.error();
    }
    return StatusFromPB(resp.error().status())
      .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code())));
  }

  // The same controller is reused for every poll, so it is reset before each call.
  auto leadership_moved = [&]() -> Result<bool> {
    rpc.Reset();
    GetConsensusStateRequestPB state_req;
    state_req.set_dest_uuid(replica->uuid());
    state_req.set_tablet_id(tablet_id);
    GetConsensusStateResponsePB state_resp;
    RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(state_req, &state_resp, &rpc));
    return state_resp.cstate().leader_uuid() != replica->uuid();
  };
  return WaitFor(leadership_moved, timeout, "Leader change");
}
968
969
// Writes a single test row (key, int_val, string_val) to `tablet_id` through `replica`'s tserver
// proxy. Returns the RPC status on transport failure or the error embedded in the response.
Status WriteSimpleTestRow(const TServerDetails* replica,
                          const std::string& tablet_id,
                          int32_t key,
                          int32_t int_val,
                          const string& string_val,
                          const MonoDelta& timeout) {
  WriteRequestPB req;
  req.set_tablet_id(tablet_id);
  AddTestRowInsert(key, int_val, string_val, &req);

  RpcController rpc;
  rpc.set_timeout(timeout);

  WriteResponsePB resp;
  RETURN_NOT_OK(replica->tserver_proxy->Write(req, &resp, &rpc));
  return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
}
990
991
namespace {
992
  Status SendAddRemoveServerRequest(const TServerDetails* leader,
993
                                    const ChangeConfigRequestPB& req,
994
                                    ChangeConfigResponsePB* resp,
995
                                    RpcController* rpc,
996
                                    const MonoDelta& timeout,
997
                                    TabletServerErrorPB::Code* error_code,
998
37
                                    bool retry) {
999
37
    Status status = Status::OK();
1000
37
    MonoTime start = MonoTime::Now();
1001
37
    do {
1002
37
      RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, resp, rpc));
1003
35
      if (!resp->has_error()) {
1004
28
        break;
1005
28
      }
1006
7
      if (error_code) *error_code = resp->error().code();
1007
7
      status = StatusFromPB(resp->error().status());
1008
7
      if (!retry) {
1009
7
        break;
1010
7
      }
1011
0
      if (resp->error().code() != TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) {
1012
0
        break;
1013
0
      }
1014
0
      rpc->Reset();
1015
0
    } while (MonoTime::Now().GetDeltaSince(start).LessThan(timeout));
1016
35
    return status;
1017
37
  }
1018
} // namespace
1019
1020
// Builds an ADD_SERVER ChangeConfig request asking `leader` to add `replica_to_add` to the Raft
// config of `tablet_id` with the given `member_type`, then sends it through
// SendAddRemoveServerRequest. `cas_config_opid_index` is only set on the request when provided.
Status AddServer(const TServerDetails* leader,
                 const std::string& tablet_id,
                 const TServerDetails* replica_to_add,
                 consensus::PeerMemberType member_type,
                 const boost::optional<int64_t>& cas_config_opid_index,
                 const MonoDelta& timeout,
                 TabletServerErrorPB::Code* error_code,
                 bool retry) {
  ChangeConfigRequestPB req;
  req.set_dest_uuid(leader->uuid());
  req.set_tablet_id(tablet_id);
  req.set_type(consensus::ADD_SERVER);

  // Describe the peer being added, including its registration.
  RaftPeerPB* new_peer = req.mutable_server();
  new_peer->set_permanent_uuid(replica_to_add->uuid());
  new_peer->set_member_type(member_type);
  CopyRegistration(replica_to_add->registration->common(), new_peer);

  if (cas_config_opid_index) {
    req.set_cas_config_opid_index(*cas_config_opid_index);
  }

  RpcController rpc;
  rpc.set_timeout(timeout);
  ChangeConfigResponsePB resp;
  return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry);
}
1046
1047
// Builds a REMOVE_SERVER ChangeConfig request asking `leader` to drop `replica_to_remove` from
// the Raft config of `tablet_id`, then sends it through SendAddRemoveServerRequest.
// `cas_config_opid_index` is only set on the request when provided.
Status RemoveServer(const TServerDetails* leader,
                    const std::string& tablet_id,
                    const TServerDetails* replica_to_remove,
                    const boost::optional<int64_t>& cas_config_opid_index,
                    const MonoDelta& timeout,
                    TabletServerErrorPB::Code* error_code,
                    bool retry) {
  ChangeConfigRequestPB req;
  req.set_dest_uuid(leader->uuid());
  req.set_tablet_id(tablet_id);
  req.set_type(consensus::REMOVE_SERVER);
  if (cas_config_opid_index) {
    req.set_cas_config_opid_index(*cas_config_opid_index);
  }
  // Only the uuid of the peer being removed is filled in.
  RaftPeerPB* departing_peer = req.mutable_server();
  departing_peer->set_permanent_uuid(replica_to_remove->uuid());

  RpcController rpc;
  rpc.set_timeout(timeout);
  ChangeConfigResponsePB resp;
  return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry);
}
1070
1071
// Issues a ListTablets RPC to `ts` and replaces the contents of `*tablets` with the returned
// status-and-schema entries. `*tablets` is left untouched when the RPC or the response fails.
Status ListTablets(const TServerDetails* ts,
                   const MonoDelta& timeout,
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
  RpcController rpc;
  rpc.set_timeout(timeout);

  tserver::ListTabletsRequestPB req;
  tserver::ListTabletsResponsePB resp;
  RETURN_NOT_OK(ts->tserver_proxy->ListTablets(req, &resp, &rpc));
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }

  const auto& entries = resp.status_and_schema();
  tablets->assign(entries.begin(), entries.end());
  return Status::OK();
}
1087
1088
// Lists the tablets on `ts` and stores the ids of those whose state is tablet::RUNNING into
// `*tablet_ids`, discarding whatever it held before. On a ListTablets failure the error is
// returned and `*tablet_ids` is not modified.
Status ListRunningTabletIds(const TServerDetails* ts,
                            const MonoDelta& timeout,
                            vector<string>* tablet_ids) {
  vector<ListTabletsResponsePB::StatusAndSchemaPB> all_tablets;
  RETURN_NOT_OK(ListTablets(ts, timeout, &all_tablets));

  tablet_ids->clear();
  for (const auto& entry : all_tablets) {
    const auto& tablet_status = entry.tablet_status();
    if (tablet_status.state() != tablet::RUNNING) {
      continue;
    }
    tablet_ids->push_back(tablet_status.tablet_id());
  }
  return Status::OK();
}
1101
1102
// Asks the master for the locations of the single tablet `tablet_id` and copies them into
// `*tablet_locations`.
//
// Returns the RPC status, the response-level error, or the (single) per-tablet error when one is
// present. CHECK-fails if the master reports more than one per-tablet error or, on success, does
// not return exactly one location entry.
Status GetTabletLocations(ExternalMiniCluster* cluster,
                          const string& tablet_id,
                          const MonoDelta& timeout,
                          master::TabletLocationsPB* tablet_locations) {
  master::GetTabletLocationsRequestPB req;
  *req.add_tablet_ids() = tablet_id;

  rpc::RpcController rpc;
  rpc.set_timeout(timeout);

  master::GetTabletLocationsResponsePB resp;
  RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTabletLocations(
      req, &resp, &rpc));

  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (resp.errors_size() > 0) {
    CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString();
    return StatusFromPB(resp.errors(0).status());
  }

  CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString();
  *tablet_locations = resp.tablet_locations(0);
  return Status::OK();
}
1124
1125
// Asks the master for the tablet locations of `table_name`, writing the raw response into
// `*table_locations`. The request asks for up to INT32_MAX locations and forwards
// `require_tablets_running` unchanged. Returns the RPC status or the error in the response.
Status GetTableLocations(ExternalMiniCluster* cluster,
                         const YBTableName& table_name,
                         const MonoDelta& timeout,
                         const RequireTabletsRunning require_tablets_running,
                         master::GetTableLocationsResponsePB* table_locations) {
  master::GetTableLocationsRequestPB req;
  table_name.SetIntoTableIdentifierPB(req.mutable_table());
  req.set_require_tablets_running(require_tablets_running);
  req.set_max_returned_locations(std::numeric_limits<int32_t>::max());

  rpc::RpcController rpc;
  rpc.set_timeout(timeout);
  RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTableLocations(
      req, table_locations, &rpc));

  return table_locations->has_error() ? StatusFromPB(table_locations->error().status())
                                      : Status::OK();
}
1143
1144
// Polls the master every 10 ms until the number of replicas of `tablet_id` whose role is LEADER
// or FOLLOWER equals `num_voters`, or until `timeout` has elapsed.
//
// Returns OK on a match. After the deadline it returns the last lookup error if the last lookup
// failed, otherwise IllegalState reporting the voter count seen by the last successful lookup.
Status WaitForNumVotersInConfigOnMaster(
    ExternalMiniCluster* cluster,
    const std::string& tablet_id,
    int num_voters,
    const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);

  Status s;
  int num_voters_found = 0;
  for (;;) {
    TabletLocationsPB tablet_locations;
    MonoDelta time_remaining = deadline.GetDeltaSince(MonoTime::Now());
    s = GetTabletLocations(cluster, tablet_id, time_remaining, &tablet_locations);
    if (s.ok()) {
      // Recount from scratch on every successful lookup.
      num_voters_found = 0;
      for (const TabletLocationsPB::ReplicaPB& r : tablet_locations.replicas()) {
        const bool is_voter = r.role() == PeerRole::LEADER || r.role() == PeerRole::FOLLOWER;
        if (is_voter) {
          num_voters_found++;
        }
      }
      if (num_voters_found == num_voters) {
        break;
      }
    }
    if (deadline.ComesBefore(MonoTime::Now())) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  RETURN_NOT_OK(s);
  if (num_voters_found == num_voters) {
    return Status::OK();
  }
  return STATUS(IllegalState,
      Substitute("Did not find exactly $0 voters, found $1 voters",
                 num_voters, num_voters_found));
}
1175
1176
// Polls `ts` every 10 ms until it lists exactly `count` tablets or `timeout` has elapsed. Each
// ListTablets call uses a fixed 10 second RPC timeout, independent of `timeout`.
//
// `*tablets` holds the result of the last successful listing. Returns OK on a match; after the
// deadline, the last listing error if the last call failed, otherwise IllegalState.
Status WaitForNumTabletsOnTS(TServerDetails* ts,
                             size_t count,
                             const MonoDelta& timeout,
                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);

  Status s;
  for (;;) {
    s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets);
    const bool count_matches = s.ok() && tablets->size() == count;
    if (count_matches || deadline.ComesBefore(MonoTime::Now())) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  RETURN_NOT_OK(s);
  if (tablets->size() == count) {
    return Status::OK();
  }
  return STATUS(IllegalState,
      Substitute("Did not find exactly $0 tablets, found $1 tablets",
                 count, tablets->size()));
}
1197
1198
// Polls `ts` every 10 ms until its tablet listing shows `tablet_id` in `state`, or `timeout` has
// elapsed. Each listing RPC uses `list_tablets_timeout`.
//
// Returns OK as soon as the tablet is seen in the desired state; otherwise TimedOut, reporting
// the last state observed for the tablet and the last status (a listing error, or NotFound when
// the tablet was absent from the last listing).
Status WaitUntilTabletInState(TServerDetails* ts,
                              const std::string& tablet_id,
                              tablet::RaftGroupStatePB state,
                              const MonoDelta& timeout,
                              const MonoDelta& list_tablets_timeout) {
  MonoTime start = MonoTime::Now();
  MonoTime deadline = start;
  deadline.AddDelta(timeout);

  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
  tablet::RaftGroupStatePB last_state = tablet::UNKNOWN;
  Status s;
  for (;;) {
    s = ListTablets(ts, list_tablets_timeout, &tablets);
    if (s.ok()) {
      bool seen = false;
      for (const auto& entry : tablets) {
        if (entry.tablet_status().tablet_id() != tablet_id) {
          continue;
        }
        seen = true;
        last_state = entry.tablet_status().state();
        if (last_state == state) {
          return Status::OK();
        }
      }
      if (!seen) {
        s = STATUS(NotFound, "Tablet " + tablet_id + " not found");
      }
    }
    if (deadline.ComesBefore(MonoTime::Now())) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  return STATUS(TimedOut, Substitute("T $0 P $1: Tablet not in $2 state after $3: "
                                     "Tablet state: $4, Status message: $5",
                                     tablet_id, ts->uuid(),
                                     tablet::RaftGroupStatePB_Name(state),
                                     MonoTime::Now().GetDeltaSince(start).ToString(),
                                     tablet::RaftGroupStatePB_Name(last_state), s.ToString()));
}
1238
1239
// Wait until the specified tablet is in RUNNING state.
1240
Status WaitUntilTabletRunning(TServerDetails* ts,
1241
                              const std::string& tablet_id,
1242
177
                              const MonoDelta& timeout) {
1243
177
  return WaitUntilTabletInState(ts, tablet_id, tablet::RUNNING, timeout);
1244
177
}
1245
1246
// Sends a DeleteTablet RPC for `tablet_id` to `ts` through its admin proxy, using `delete_type`.
// `cas_config_opid_index_less_or_equal` is only set on the request when provided.
//
// When the response carries an error, its code is stored in `*error_code` (if non-null) and its
// status is returned. A transport failure is returned without touching `*error_code`.
Status DeleteTablet(const TServerDetails* ts,
                    const std::string& tablet_id,
                    const tablet::TabletDataState delete_type,
                    const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
                    const MonoDelta& timeout,
                    tserver::TabletServerErrorPB::Code* error_code) {
  DeleteTabletRequestPB req;
  req.set_dest_uuid(ts->uuid());
  req.set_tablet_id(tablet_id);
  req.set_delete_type(delete_type);
  if (cas_config_opid_index_less_or_equal) {
    req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal);
  }

  RpcController rpc;
  rpc.set_timeout(timeout);
  DeleteTabletResponsePB resp;
  RETURN_NOT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc));

  if (!resp.has_error()) {
    return Status::OK();
  }
  if (error_code) {
    *error_code = resp.error().code();
  }
  return StatusFromPB(resp.error().status());
}
1273
1274
// Sends a StartRemoteBootstrap RPC to `ts` for `tablet_id`, naming `bootstrap_source_uuid` at
// `bootstrap_source_addr` as the bootstrap peer and passing `caller_term` along.
// Returns the RPC status on transport failure or the error embedded in the response.
Status StartRemoteBootstrap(const TServerDetails* ts,
                            const string& tablet_id,
                            const string& bootstrap_source_uuid,
                            const HostPort& bootstrap_source_addr,
                            int64_t caller_term,
                            const MonoDelta& timeout) {
  consensus::StartRemoteBootstrapRequestPB req;
  req.set_dest_uuid(ts->uuid());
  req.set_tablet_id(tablet_id);
  req.set_bootstrap_peer_uuid(bootstrap_source_uuid);
  HostPortToPB(bootstrap_source_addr, req.mutable_source_private_addr()->Add());
  req.set_caller_term(caller_term);

  RpcController rpc;
  rpc.set_timeout(timeout);
  consensus::StartRemoteBootstrapResponsePB resp;
  RETURN_NOT_OK(ts->consensus_proxy->StartRemoteBootstrap(req, &resp, &rpc));

  return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
}
1297
1298
// Fetches the last op id of kind `opid_type` for `tablet_id` from the peer `dest_uuid` through
// `consensus_proxy`, storing it in `*opid`.
//
// A failed RPC is reported as InvalidArgument with the original status text embedded in the
// message; an error in the response is returned as-is. `*opid` is only written on success.
Status GetLastOpIdForMasterReplica(
    const shared_ptr<consensus::ConsensusServiceProxy>& consensus_proxy,
    const string& tablet_id,
    const string& dest_uuid,
    const consensus::OpIdType opid_type,
    const MonoDelta& timeout,
    OpIdPB* opid) {
  GetLastOpIdRequestPB opid_req;
  opid_req.set_dest_uuid(dest_uuid);
  opid_req.set_tablet_id(tablet_id);
  opid_req.set_opid_type(opid_type);

  RpcController controller;
  controller.Reset();
  controller.set_timeout(timeout);

  GetLastOpIdResponsePB opid_resp;
  Status rpc_status = consensus_proxy->GetLastOpId(opid_req, &opid_resp, &controller);
  if (!rpc_status.ok()) {
    return STATUS(InvalidArgument, Substitute(
        "Failed to fetch opid type $0 from master uuid $1 with error : $2",
        opid_type, dest_uuid, rpc_status.ToString()));
  }
  if (opid_resp.has_error()) {
    return StatusFromPB(opid_resp.error().status());
  }

  *opid = opid_resp.opid();
  return Status::OK();
}
1329
1330
// Single-replica convenience wrapper over GetLastOpIdForEachReplica: queries just `replica` and
// returns the first (only) op id of the result, or the error from the underlying call.
Result<OpId> GetLastOpIdForReplica(
    const TabletId& tablet_id,
    TServerDetails* replica,
    consensus::OpIdType opid_type,
    const MonoDelta& timeout) {
  auto op_ids = VERIFY_RESULT(
      GetLastOpIdForEachReplica(tablet_id, {replica}, opid_type, timeout));
  return op_ids[0];
}
1337
1338
} // namespace itest
1339
} // namespace yb