/Users/deen/code/yugabyte-db/src/yb/integration-tests/cluster_itest_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/integration-tests/cluster_itest_util.h" |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <algorithm> |
38 | | #include <limits> |
39 | | #include <memory> |
40 | | #include <mutex> |
41 | | #include <string> |
42 | | #include <utility> |
43 | | #include <vector> |
44 | | |
45 | | #include <boost/optional.hpp> |
46 | | #include <glog/stl_logging.h> |
47 | | #include <gtest/gtest.h> |
48 | | |
49 | | #include "yb/client/schema.h" |
50 | | #include "yb/client/yb_table_name.h" |
51 | | |
52 | | #include "yb/common/wire_protocol-test-util.h" |
53 | | #include "yb/common/wire_protocol.h" |
54 | | #include "yb/common/wire_protocol.pb.h" |
55 | | |
56 | | #include "yb/consensus/consensus.proxy.h" |
57 | | #include "yb/consensus/consensus_meta.h" |
58 | | #include "yb/consensus/consensus_types.pb.h" |
59 | | #include "yb/consensus/opid_util.h" |
60 | | #include "yb/consensus/quorum_util.h" |
61 | | |
62 | | #include "yb/gutil/strings/substitute.h" |
63 | | |
64 | | #include "yb/integration-tests/external_mini_cluster.h" |
65 | | |
66 | | #include "yb/master/master_client.proxy.h" |
67 | | #include "yb/master/master_cluster.proxy.h" |
68 | | |
69 | | #include "yb/rpc/rpc_fwd.h" |
70 | | |
71 | | #include "yb/server/server_base.proxy.h" |
72 | | |
73 | | #include "yb/tserver/tablet_server_test_util.h" |
74 | | #include "yb/tserver/tserver_admin.proxy.h" |
75 | | #include "yb/tserver/tserver_service.pb.h" |
76 | | #include "yb/tserver/tserver_service.proxy.h" |
77 | | |
78 | | #include "yb/util/enums.h" |
79 | | #include "yb/util/format.h" |
80 | | #include "yb/util/logging.h" |
81 | | #include "yb/util/monotime.h" |
82 | | #include "yb/util/net/net_fwd.h" |
83 | | #include "yb/util/net/net_util.h" |
84 | | #include "yb/util/result.h" |
85 | | #include "yb/util/status.h" |
86 | | #include "yb/util/status_format.h" |
87 | | #include "yb/util/status_log.h" |
88 | | #include "yb/util/strongly_typed_bool.h" |
89 | | #include "yb/util/test_util.h" |
90 | | |
91 | | namespace yb { |
92 | | namespace itest { |
93 | | |
94 | | using client::YBClient; |
95 | | using client::YBSchema; |
96 | | using client::YBSchemaBuilder; |
97 | | using client::YBTable; |
98 | | using client::YBTableName; |
99 | | using consensus::CONSENSUS_CONFIG_ACTIVE; |
100 | | using consensus::CONSENSUS_CONFIG_COMMITTED; |
101 | | using consensus::ChangeConfigRequestPB; |
102 | | using consensus::ChangeConfigResponsePB; |
103 | | using consensus::ConsensusConfigType; |
104 | | using consensus::ConsensusStatePB; |
105 | | using consensus::CountVoters; |
106 | | using consensus::GetConsensusStateRequestPB; |
107 | | using consensus::GetConsensusStateResponsePB; |
108 | | using consensus::GetLastOpIdRequestPB; |
109 | | using consensus::GetLastOpIdResponsePB; |
110 | | using consensus::LeaderStepDownRequestPB; |
111 | | using consensus::LeaderStepDownResponsePB; |
112 | | using consensus::RaftPeerPB; |
113 | | using consensus::RunLeaderElectionResponsePB; |
114 | | using consensus::RunLeaderElectionRequestPB; |
115 | | using consensus::kInvalidOpIdIndex; |
116 | | using consensus::LeaderLeaseCheckMode; |
117 | | using consensus::LeaderLeaseStatus; |
118 | | using master::ListTabletServersResponsePB; |
119 | | using master::TabletLocationsPB; |
120 | | using rpc::Messenger; |
121 | | using rpc::RpcController; |
122 | | using std::min; |
123 | | using std::shared_ptr; |
124 | | using std::string; |
125 | | using std::unordered_map; |
126 | | using std::vector; |
127 | | using strings::Substitute; |
128 | | using tserver::CreateTsClientProxies; |
129 | | using tserver::ListTabletsResponsePB; |
130 | | using tserver::DeleteTabletRequestPB; |
131 | | using tserver::DeleteTabletResponsePB; |
132 | | using tserver::TabletServerAdminServiceProxy; |
133 | | using tserver::TabletServerErrorPB; |
134 | | using tserver::TabletServerServiceProxy; |
135 | | using tserver::WriteRequestPB; |
136 | | using tserver::WriteResponsePB; |
137 | | |
138 | 689 | TServerDetails::TServerDetails() : registration(new master::TSRegistrationPB) { |
139 | 689 | } |
140 | | |
141 | 457 | TServerDetails::~TServerDetails() = default; |
142 | | |
143 | 5.47k | const string& TServerDetails::uuid() const { |
144 | 5.47k | return instance_id.permanent_uuid(); |
145 | 5.47k | } |
146 | | |
147 | 799 | std::string TServerDetails::ToString() const { |
148 | 799 | return Format("TabletServer: $0, Rpc address: $1", instance_id.permanent_uuid(), |
149 | 799 | DesiredHostPort(registration->common(), CloudInfoPB())); |
150 | 799 | } |
151 | | |
152 | 1 | client::YBSchema SimpleIntKeyYBSchema() { |
153 | 1 | YBSchema s; |
154 | 1 | YBSchemaBuilder b; |
155 | 1 | b.AddColumn("key")->Type(INT32)->NotNull()->PrimaryKey(); |
156 | 1 | CHECK_OK(b.Build(&s)); |
157 | 1 | return s; |
158 | 1 | } |
159 | | |
160 | | Result<std::vector<OpId>> GetLastOpIdForEachReplica( |
161 | | const TabletId& tablet_id, |
162 | | const std::vector<TServerDetails*>& replicas, |
163 | | consensus::OpIdType opid_type, |
164 | | const MonoDelta& timeout, |
165 | 188 | consensus::OperationType op_type) { |
166 | 188 | struct Getter { |
167 | 188 | const TabletId& tablet_id; |
168 | 188 | consensus::OpIdType opid_type; |
169 | 188 | consensus::OperationType op_type; |
170 | | |
171 | 405 | Result<OpId> operator()(TServerDetails* ts, rpc::RpcController* controller) const { |
172 | 405 | GetLastOpIdRequestPB opid_req; |
173 | 405 | GetLastOpIdResponsePB opid_resp; |
174 | 405 | opid_req.set_tablet_id(tablet_id); |
175 | 405 | opid_req.set_dest_uuid(ts->uuid()); |
176 | 405 | opid_req.set_opid_type(opid_type); |
177 | 405 | if (op_type != consensus::OperationType::UNKNOWN_OP) { |
178 | 3 | opid_req.set_op_type(op_type); |
179 | 3 | } |
180 | 405 | RETURN_NOT_OK(ts->consensus_proxy->GetLastOpId(opid_req, &opid_resp, controller)); |
181 | 379 | return OpId::FromPB(opid_resp.opid()); |
182 | 405 | } |
183 | 188 | }; |
184 | | |
185 | 188 | return GetForEachReplica( |
186 | 188 | replicas, timeout, |
187 | 188 | Getter{.tablet_id = tablet_id, .opid_type = opid_type, .op_type = op_type}); |
188 | 188 | } |
189 | | |
190 | 169 | vector<TServerDetails*> TServerDetailsVector(const TabletServerMap& tablet_servers) { |
191 | 169 | vector<TServerDetails*> result; |
192 | 169 | result.reserve(tablet_servers.size()); |
193 | 519 | for (auto& pair : tablet_servers) { |
194 | 519 | result.push_back(pair.second.get()); |
195 | 519 | } |
196 | 169 | return result; |
197 | 169 | } |
198 | | |
199 | 53 | vector<TServerDetails*> TServerDetailsVector(const TabletServerMapUnowned& tablet_servers) { |
200 | 53 | vector<TServerDetails*> result; |
201 | 53 | result.reserve(tablet_servers.size()); |
202 | 118 | for (auto& pair : tablet_servers) { |
203 | 118 | result.push_back(pair.second); |
204 | 118 | } |
205 | 53 | return result; |
206 | 53 | } |
207 | | |
208 | | TabletServerMapUnowned CreateTabletServerMapUnowned(const TabletServerMap& tablet_servers, |
209 | 13 | const std::set<std::string>& exclude) { |
210 | 13 | TabletServerMapUnowned result; |
211 | 39 | for (auto& pair : tablet_servers) { |
212 | 39 | if (exclude.find(pair.first) != exclude.end()) { |
213 | 0 | continue; |
214 | 0 | } |
215 | 39 | result.emplace(pair.first, pair.second.get()); |
216 | 39 | } |
217 | 13 | return result; |
218 | 13 | } |
219 | | |
220 | | Status WaitForOpFromCurrentTerm(TServerDetails* replica, |
221 | | const string& tablet_id, |
222 | | consensus::OpIdType opid_type, |
223 | | const MonoDelta& timeout, |
224 | 7 | OpId* opid) { |
225 | 7 | const MonoTime kStart = MonoTime::Now(); |
226 | 7 | const MonoTime kDeadline = kStart + timeout; |
227 | | |
228 | 7 | Status s; |
229 | 31 | while (MonoTime::Now() < kDeadline) { |
230 | 31 | ConsensusStatePB cstate; |
231 | 31 | s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, kDeadline - MonoTime::Now(), |
232 | 31 | &cstate); |
233 | 31 | if (s.ok()) { |
234 | 7 | Result<OpId> tmp_opid = |
235 | 7 | GetLastOpIdForReplica(tablet_id, replica, opid_type, kDeadline - MonoTime::Now()); |
236 | 7 | if (tmp_opid) { |
237 | 7 | if (tmp_opid->term == cstate.current_term()) { |
238 | 7 | if (opid) { |
239 | 1 | *opid = *tmp_opid; |
240 | 1 | } |
241 | 7 | return Status::OK(); |
242 | 7 | } |
243 | 0 | s = STATUS(IllegalState, Substitute("Terms don't match. Current term: $0. Latest OpId: $1", |
244 | 0 | cstate.current_term(), tmp_opid->ToString())); |
245 | 0 | } |
246 | 7 | } |
247 | 24 | SleepFor(MonoDelta::FromMilliseconds(10)); |
248 | 24 | } |
249 | | |
250 | 0 | return STATUS(TimedOut, Substitute("Timed out after $0 waiting for op from current term: $1", |
251 | 7 | (MonoTime::Now() - kStart).ToString(), |
252 | 7 | s.ToString())); |
253 | 7 | } |
254 | | |
255 | | Status WaitForServersToAgree(const MonoDelta& timeout, |
256 | | const TabletServerMap& tablet_servers, |
257 | | const string& tablet_id, |
258 | | int64_t minimum_index, |
259 | | int64_t* actual_index, |
260 | 28 | MustBeCommitted must_be_committed) { |
261 | 28 | return WaitForServersToAgree(timeout, |
262 | 28 | TServerDetailsVector(tablet_servers), |
263 | 28 | tablet_id, |
264 | 28 | minimum_index, |
265 | 28 | actual_index, |
266 | 28 | must_be_committed); |
267 | 28 | } |
268 | | |
269 | | Status WaitForServersToAgree(const MonoDelta& timeout, |
270 | | const TabletServerMapUnowned& tablet_servers, |
271 | | const TabletId& tablet_id, |
272 | | int64_t minimum_index, |
273 | | int64_t* actual_index, |
274 | 41 | MustBeCommitted must_be_committed) { |
275 | 41 | return WaitForServersToAgree( |
276 | 41 | timeout, TServerDetailsVector(tablet_servers), tablet_id, minimum_index, actual_index, |
277 | 41 | must_be_committed); |
278 | 41 | } |
279 | | |
280 | | Status WaitForServersToAgree(const MonoDelta& timeout, |
281 | | const vector<TServerDetails*>& servers, |
282 | | const string& tablet_id, |
283 | | int64_t minimum_index, |
284 | | int64_t* actual_index, |
285 | 69 | MustBeCommitted must_be_committed) { |
286 | 69 | auto deadline = CoarseMonoClock::Now() + timeout; |
287 | 69 | if (actual_index != nullptr) { |
288 | 2 | *actual_index = 0; |
289 | 2 | } |
290 | | |
291 | 69 | vector<consensus::OpIdType> opid_types{consensus::OpIdType::RECEIVED_OPID}; |
292 | 69 | if (must_be_committed) { |
293 | | // In this mode we require that last received and committed op ids from all servers converge |
294 | | // on the same value. |
295 | 1 | opid_types.push_back(consensus::OpIdType::COMMITTED_OPID); |
296 | 1 | } |
297 | | |
298 | 69 | Status last_non_ok_status; |
299 | 69 | vector<OpId> received_ids; |
300 | 69 | vector<OpId> committed_ids; |
301 | | |
302 | 142 | for (int attempt = 1; CoarseMonoClock::Now() < deadline; attempt++73 ) { |
303 | 140 | vector<OpId> ids; |
304 | | |
305 | 140 | Status s; |
306 | 143 | for (auto opid_type : opid_types) { |
307 | 143 | auto ids_of_this_type = GetLastOpIdForEachReplica(tablet_id, servers, opid_type, timeout); |
308 | 143 | if (!ids_of_this_type.ok()) { |
309 | 26 | s = ids_of_this_type.status(); |
310 | 26 | break; |
311 | 26 | } |
312 | 117 | if (opid_type == consensus::OpIdType::RECEIVED_OPID) { |
313 | 115 | received_ids = *ids_of_this_type; |
314 | 115 | } else { |
315 | 2 | committed_ids = *ids_of_this_type; |
316 | 2 | } |
317 | 117 | std::copy(ids_of_this_type->begin(), ids_of_this_type->end(), std::back_inserter(ids)); |
318 | 117 | } |
319 | | |
320 | 140 | if (s.ok()) { |
321 | 114 | int64_t cur_index = kInvalidOpIdIndex; |
322 | 114 | bool any_behind = false; |
323 | 114 | bool any_disagree = false; |
324 | 234 | for (const OpId& id : ids) { |
325 | 234 | if (cur_index == kInvalidOpIdIndex) { |
326 | 114 | cur_index = id.index; |
327 | 114 | } |
328 | 234 | if (id.index != cur_index) { |
329 | 22 | any_disagree = true; |
330 | 22 | break; |
331 | 22 | } |
332 | 212 | if (id.index < minimum_index) { |
333 | 25 | any_behind = true; |
334 | 25 | break; |
335 | 25 | } |
336 | 212 | } |
337 | 114 | if (!any_behind && !any_disagree89 ) { |
338 | 67 | LOG(INFO) << "All servers converged on OpIds: " << ids; |
339 | 67 | if (actual_index != nullptr) { |
340 | 2 | *actual_index = cur_index; |
341 | 2 | } |
342 | 67 | return Status::OK(); |
343 | 67 | } |
344 | 114 | } else { |
345 | 26 | LOG(WARNING) << "Got error getting last opid for each replica: " << s.ToString(); |
346 | 26 | last_non_ok_status = s; |
347 | 26 | } |
348 | | |
349 | 73 | LOG(INFO) << "Not converged past " << minimum_index << " yet: " << ids; |
350 | 73 | SleepFor(MonoDelta::FromMilliseconds(min(attempt * 100, 1000))); |
351 | 73 | } |
352 | 2 | return STATUS_FORMAT( |
353 | 69 | TimedOut, |
354 | 69 | "All replicas of tablet $0 could not converge on an index of at least $1 after $2. " |
355 | 69 | "must_be_committed=$3. Latest received ids: $3, committed ids: $4", |
356 | 69 | tablet_id, minimum_index, timeout, must_be_committed, received_ids, committed_ids); |
357 | 69 | } |
358 | | |
359 | | // Wait until all specified replicas have logged the given index. |
360 | | Status WaitUntilAllReplicasHaveOp(const int64_t log_index, |
361 | | const string& tablet_id, |
362 | | const vector<TServerDetails*>& replicas, |
363 | | const MonoDelta& timeout, |
364 | 1 | int64_t* actual_minimum_index) { |
365 | 1 | MonoTime start = MonoTime::Now(); |
366 | 1 | MonoDelta passed = MonoDelta::FromMilliseconds(0); |
367 | 3 | while (true) { |
368 | 3 | auto op_ids = GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID, timeout); |
369 | 3 | if (op_ids.ok()) { |
370 | 3 | if (actual_minimum_index != nullptr) { |
371 | 3 | *actual_minimum_index = std::numeric_limits<int64_t>::max(); |
372 | 3 | } |
373 | | |
374 | 3 | bool any_behind = false; |
375 | 9 | for (const OpId& op_id : *op_ids) { |
376 | 9 | if (actual_minimum_index != nullptr) { |
377 | 9 | *actual_minimum_index = std::min(*actual_minimum_index, op_id.index); |
378 | 9 | } |
379 | | |
380 | 9 | if (op_id.index < log_index) { |
381 | 2 | any_behind = true; |
382 | 2 | break; |
383 | 2 | } |
384 | 9 | } |
385 | 3 | if (!any_behind) return Status::OK()1 ; |
386 | 3 | } else { |
387 | 0 | LOG(WARNING) << "Got error getting last opid for each replica: " << op_ids.ToString(); |
388 | 0 | } |
389 | 2 | passed = MonoTime::Now().GetDeltaSince(start); |
390 | 2 | if (passed.MoreThan(timeout)) { |
391 | 0 | break; |
392 | 0 | } |
393 | 2 | SleepFor(MonoDelta::FromMilliseconds(50)); |
394 | 2 | } |
395 | 0 | string replicas_str; |
396 | 0 | for (const TServerDetails* replica : replicas) { |
397 | 0 | if (!replicas_str.empty()) replicas_str += ", "; |
398 | 0 | replicas_str += "{ " + replica->ToString() + " }"; |
399 | 0 | } |
400 | 0 | return STATUS(TimedOut, Substitute("Index $0 not available on all replicas after $1. " |
401 | 1 | "Replicas: [ $2 ]", |
402 | 1 | log_index, passed.ToString())); |
403 | 1 | } |
404 | | |
405 | | Status WaitUntilNumberOfAliveTServersEqual(int n_tservers, |
406 | | const master::MasterClusterProxy& master_proxy, |
407 | 1 | const MonoDelta& timeout) { |
408 | | |
409 | 1 | master::ListTabletServersRequestPB req; |
410 | 1 | master::ListTabletServersResponsePB resp; |
411 | 1 | rpc::RpcController controller; |
412 | 1 | controller.set_timeout(timeout); |
413 | | |
414 | | // The field primary_only means only tservers that are alive (tservers that have sent at least on |
415 | | // heartbeat in the last FLAG_tserver_unresponsive_timeout_ms milliseconds.) |
416 | 1 | req.set_primary_only(true); |
417 | | |
418 | 1 | MonoTime start = MonoTime::Now(); |
419 | 1 | MonoDelta passed = MonoDelta::FromMilliseconds(0); |
420 | 87 | while (true) { |
421 | 87 | Status s = master_proxy.ListTabletServers(req, &resp, &controller); |
422 | | |
423 | 87 | if (s.ok() && |
424 | 87 | controller.status().ok() && |
425 | 87 | !resp.has_error()) { |
426 | 87 | if (resp.servers_size() == n_tservers) { |
427 | 1 | passed = MonoTime::Now().GetDeltaSince(start); |
428 | 1 | return Status::OK(); |
429 | 1 | } |
430 | 87 | } else { |
431 | 0 | string error; |
432 | 0 | if (!s.ok()) { |
433 | 0 | error = s.ToString(); |
434 | 0 | } else if (!controller.status().ok()) { |
435 | 0 | error = controller.status().ToString(); |
436 | 0 | } else { |
437 | 0 | error = resp.error().ShortDebugString(); |
438 | 0 | } |
439 | 0 | LOG(WARNING) << "Got error getting list of tablet servers: " << error; |
440 | 0 | } |
441 | 86 | passed = MonoTime::Now().GetDeltaSince(start); |
442 | 86 | if (passed.MoreThan(timeout)) { |
443 | 0 | break; |
444 | 0 | } |
445 | 86 | SleepFor(MonoDelta::FromMilliseconds(50)); |
446 | 86 | controller.Reset(); |
447 | 86 | } |
448 | 0 | return STATUS(TimedOut, Substitute("Number of alive tservers not equal to $0 after $1 ms. ", |
449 | 1 | n_tservers, timeout.ToMilliseconds())); |
450 | 1 | } |
451 | | |
452 | 223 | Result<TabletServerMap> CreateTabletServerMap(ExternalMiniCluster* cluster) { |
453 | 223 | auto proxy = cluster->num_masters() > 1 |
454 | 223 | ? cluster->GetLeaderMasterProxy<master::MasterClusterProxy>()50 |
455 | 223 | : cluster->GetMasterProxy<master::MasterClusterProxy>()173 ; |
456 | 223 | return CreateTabletServerMap(proxy, &cluster->proxy_cache()); |
457 | 223 | } |
458 | | |
459 | | Result<TabletServerMap> CreateTabletServerMap( |
460 | 223 | const master::MasterClusterProxy& proxy, rpc::ProxyCache* proxy_cache) { |
461 | 223 | master::ListTabletServersRequestPB req; |
462 | 223 | master::ListTabletServersResponsePB resp; |
463 | 223 | rpc::RpcController controller; |
464 | | |
465 | 223 | RETURN_NOT_OK(proxy.ListTabletServers(req, &resp, &controller)); |
466 | 223 | RETURN_NOT_OK(controller.status()); |
467 | 223 | if (resp.has_error()) { |
468 | 0 | return STATUS(RemoteError, "Response had an error", resp.error().ShortDebugString()); |
469 | 0 | } |
470 | | |
471 | 223 | TabletServerMap result; |
472 | 689 | for (const ListTabletServersResponsePB::Entry& entry : resp.servers()) { |
473 | 689 | HostPort host_port = HostPortFromPB(DesiredHostPort( |
474 | 689 | entry.registration().common(), CloudInfoPB())); |
475 | | |
476 | 689 | std::unique_ptr<TServerDetails> peer(new TServerDetails()); |
477 | 689 | peer->instance_id.CopyFrom(entry.instance_id()); |
478 | 689 | peer->registration->CopyFrom(entry.registration()); |
479 | | |
480 | 689 | CreateTsClientProxies(host_port, |
481 | 689 | proxy_cache, |
482 | 689 | &peer->tserver_proxy, |
483 | 689 | &peer->tserver_admin_proxy, |
484 | 689 | &peer->consensus_proxy, |
485 | 689 | &peer->generic_proxy); |
486 | | |
487 | 689 | const auto& key = peer->instance_id.permanent_uuid(); |
488 | 689 | CHECK(result.emplace(key, std::move(peer)).second) << "duplicate key: " << key0 ; |
489 | 689 | } |
490 | 223 | return result; |
491 | 223 | } |
492 | | |
493 | | Status GetConsensusState(const TServerDetails* replica, |
494 | | const string& tablet_id, |
495 | | consensus::ConsensusConfigType type, |
496 | | const MonoDelta& timeout, |
497 | | ConsensusStatePB* consensus_state, |
498 | 3.80k | LeaderLeaseStatus* leader_lease_status) { |
499 | 3.80k | DCHECK_ONLY_NOTNULL(replica); |
500 | | |
501 | 3.80k | GetConsensusStateRequestPB req; |
502 | 3.80k | GetConsensusStateResponsePB resp; |
503 | 3.80k | RpcController controller; |
504 | 3.80k | controller.set_timeout(timeout); |
505 | 3.80k | req.set_dest_uuid(replica->uuid()); |
506 | 3.80k | req.set_tablet_id(tablet_id); |
507 | 3.80k | req.set_type(type); |
508 | | |
509 | 3.80k | RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(req, &resp, &controller)); |
510 | 3.66k | if (resp.has_error()) { |
511 | 181 | return StatusFromPB(resp.error().status()); |
512 | 181 | } |
513 | 3.48k | *consensus_state = resp.cstate(); |
514 | 3.48k | if (leader_lease_status) { |
515 | 2.72k | *leader_lease_status = resp.has_leader_lease_status() ? |
516 | 2.72k | resp.leader_lease_status() : |
517 | 2.72k | LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE0 ; // Could be anything but HAS_LEASE. |
518 | 2.72k | } |
519 | 3.48k | return Status::OK(); |
520 | 3.66k | } |
521 | | |
522 | | Status WaitUntilCommittedConfigNumVotersIs(size_t config_size, |
523 | | const TServerDetails* replica, |
524 | | const std::string& tablet_id, |
525 | 122 | const MonoDelta& timeout) { |
526 | 122 | return WaitUntilCommittedConfigMemberTypeIs(config_size, replica, tablet_id, timeout, |
527 | 122 | consensus::PeerMemberType::VOTER); |
528 | 122 | } |
529 | | |
530 | | Status WaitUntilCommittedConfigMemberTypeIs(size_t config_size, |
531 | | const TServerDetails* replica, |
532 | | const std::string& tablet_id, |
533 | | const MonoDelta& timeout, |
534 | 128 | consensus::PeerMemberType member_type) { |
535 | 128 | DCHECK_ONLY_NOTNULL(replica); |
536 | | |
537 | 128 | MonoTime start = MonoTime::Now(); |
538 | 128 | MonoTime deadline = start + timeout; |
539 | | |
540 | 128 | int backoff_exp = 0; |
541 | 128 | const int kMaxBackoffExp = 7; |
542 | 128 | Status s; |
543 | 128 | ConsensusStatePB cstate; |
544 | 919 | while (true) { |
545 | 919 | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); |
546 | 919 | s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED, |
547 | 919 | remaining_timeout, &cstate); |
548 | 919 | if (s.ok()) { |
549 | 744 | if (CountMemberType(cstate.config(), member_type) == config_size) { |
550 | 128 | return Status::OK(); |
551 | 128 | } |
552 | 616 | LOG(INFO) << "Got " << yb::ToString(cstate) << " from " << replica->ToString(); |
553 | 616 | } else { |
554 | 175 | LOG(INFO) << "Got " << s.ToString() << " from " << replica->ToString(); |
555 | 175 | } |
556 | | |
557 | 791 | if (MonoTime::Now().GetDeltaSince(start).MoreThan(timeout)) { |
558 | 0 | break; |
559 | 0 | } |
560 | 791 | SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp)); |
561 | 791 | backoff_exp = min(backoff_exp + 1, kMaxBackoffExp); |
562 | 791 | } |
563 | 0 | return STATUS(TimedOut, Substitute("Number of replicas of type $0 does not equal $1 after " |
564 | 128 | "waiting for $2. Last consensus state: $3. Last status: $4", |
565 | 128 | PeerMemberType_Name(member_type), config_size, |
566 | 128 | timeout.ToString(), cstate.ShortDebugString(), s.ToString())); |
567 | 128 | } |
568 | | |
569 | | template<class Context> |
570 | | Status WaitUntilCommittedOpIdIndex(TServerDetails* replica, |
571 | | const string& tablet_id, |
572 | | const MonoDelta& timeout, |
573 | | CommittedEntryType type, |
574 | 17 | Context context) { |
575 | 17 | MonoTime start = MonoTime::Now(); |
576 | 17 | MonoTime deadline = start; |
577 | 17 | deadline.AddDelta(timeout); |
578 | | |
579 | 17 | bool config = type == CommittedEntryType::CONFIG; |
580 | 17 | Status s; |
581 | 17 | OpId op_id; |
582 | 17 | ConsensusStatePB cstate; |
583 | 18 | while (true) { |
584 | 18 | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); |
585 | | |
586 | 18 | int64_t op_index = -1; |
587 | 18 | if (config) { |
588 | 0 | s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED, |
589 | 0 | remaining_timeout, &cstate); |
590 | 0 | if (s.ok()) { |
591 | 0 | op_index = cstate.config().opid_index(); |
592 | 0 | } |
593 | 18 | } else { |
594 | 18 | auto last_op_id_result = GetLastOpIdForReplica( |
595 | 18 | tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout); |
596 | 18 | if (last_op_id_result.ok()) { |
597 | 18 | op_id = *last_op_id_result; |
598 | 18 | op_index = op_id.index; |
599 | 18 | } else { |
600 | 0 | s = last_op_id_result.status(); |
601 | 0 | } |
602 | 18 | } |
603 | | |
604 | 18 | if (s.ok() && context.Check(op_index)) { |
605 | 17 | if (config) { |
606 | 0 | LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: " |
607 | 0 | << replica->instance_id.permanent_uuid(); |
608 | 17 | } else { |
609 | 17 | LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: " |
610 | 17 | << replica->instance_id.permanent_uuid(); |
611 | 17 | } |
612 | 17 | return Status::OK(); |
613 | 17 | } |
614 | 1 | auto passed = MonoTime::Now().GetDeltaSince(start); |
615 | 1 | if (passed.MoreThan(timeout)) { |
616 | 0 | auto name = config ? "config" : "consensus"; |
617 | 0 | auto last_value = config ? cstate.ShortDebugString() : AsString(op_id); |
618 | 0 | return STATUS(TimedOut, |
619 | 0 | Substitute("Committed $0 opid_index does not equal $1 " |
620 | 0 | "after waiting for $2. Last value: $3, Last status: $4", |
621 | 0 | name, |
622 | 0 | context.Desired(), |
623 | 0 | passed.ToString(), |
624 | 0 | last_value, |
625 | 0 | s.ToString())); |
626 | 0 | } |
627 | 1 | if (!config) { |
628 | 1 | LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at " |
629 | 1 | << context.Desired(); |
630 | 1 | } |
631 | 1 | SleepFor(MonoDelta::FromMilliseconds(100)); |
632 | 1 | } |
633 | 17 | } yb::Status yb::itest::WaitUntilCommittedOpIdIndex<yb::itest::WaitUntilCommittedOpIdIndexIsContext>(yb::itest::TServerDetails*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::MonoDelta const&, yb::itest::CommittedEntryType, yb::itest::WaitUntilCommittedOpIdIndexIsContext) Line | Count | Source | 574 | 16 | Context context) { | 575 | 16 | MonoTime start = MonoTime::Now(); | 576 | 16 | MonoTime deadline = start; | 577 | 16 | deadline.AddDelta(timeout); | 578 | | | 579 | 16 | bool config = type == CommittedEntryType::CONFIG; | 580 | 16 | Status s; | 581 | 16 | OpId op_id; | 582 | 16 | ConsensusStatePB cstate; | 583 | 17 | while (true) { | 584 | 17 | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); | 585 | | | 586 | 17 | int64_t op_index = -1; | 587 | 17 | if (config) { | 588 | 0 | s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED, | 589 | 0 | remaining_timeout, &cstate); | 590 | 0 | if (s.ok()) { | 591 | 0 | op_index = cstate.config().opid_index(); | 592 | 0 | } | 593 | 17 | } else { | 594 | 17 | auto last_op_id_result = GetLastOpIdForReplica( | 595 | 17 | tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout); | 596 | 17 | if (last_op_id_result.ok()) { | 597 | 17 | op_id = *last_op_id_result; | 598 | 17 | op_index = op_id.index; | 599 | 17 | } else { | 600 | 0 | s = last_op_id_result.status(); | 601 | 0 | } | 602 | 17 | } | 603 | | | 604 | 17 | if (s.ok() && context.Check(op_index)) { | 605 | 16 | if (config) { | 606 | 0 | LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: " | 607 | 0 | << replica->instance_id.permanent_uuid(); | 608 | 16 | } else { | 609 | 16 | LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: " | 610 | 16 | << replica->instance_id.permanent_uuid(); | 611 | 16 | } | 612 | 16 | return Status::OK(); | 613 | 16 | } | 614 | 1 | auto passed = MonoTime::Now().GetDeltaSince(start); | 
615 | 1 | if (passed.MoreThan(timeout)) { | 616 | 0 | auto name = config ? "config" : "consensus"; | 617 | 0 | auto last_value = config ? cstate.ShortDebugString() : AsString(op_id); | 618 | 0 | return STATUS(TimedOut, | 619 | 0 | Substitute("Committed $0 opid_index does not equal $1 " | 620 | 0 | "after waiting for $2. Last value: $3, Last status: $4", | 621 | 0 | name, | 622 | 0 | context.Desired(), | 623 | 0 | passed.ToString(), | 624 | 0 | last_value, | 625 | 0 | s.ToString())); | 626 | 0 | } | 627 | 1 | if (!config) { | 628 | 1 | LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at " | 629 | 1 | << context.Desired(); | 630 | 1 | } | 631 | 1 | SleepFor(MonoDelta::FromMilliseconds(100)); | 632 | 1 | } | 633 | 16 | } |
yb::Status yb::itest::WaitUntilCommittedOpIdIndex<yb::itest::WaitUntilCommittedOpIdIndexIsGreaterThanContext>(yb::itest::TServerDetails*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::MonoDelta const&, yb::itest::CommittedEntryType, yb::itest::WaitUntilCommittedOpIdIndexIsGreaterThanContext) Line | Count | Source | 574 | 1 | Context context) { | 575 | 1 | MonoTime start = MonoTime::Now(); | 576 | 1 | MonoTime deadline = start; | 577 | 1 | deadline.AddDelta(timeout); | 578 | | | 579 | 1 | bool config = type == CommittedEntryType::CONFIG; | 580 | 1 | Status s; | 581 | 1 | OpId op_id; | 582 | 1 | ConsensusStatePB cstate; | 583 | 1 | while (true) { | 584 | 1 | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); | 585 | | | 586 | 1 | int64_t op_index = -1; | 587 | 1 | if (config) { | 588 | 0 | s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_COMMITTED, | 589 | 0 | remaining_timeout, &cstate); | 590 | 0 | if (s.ok()) { | 591 | 0 | op_index = cstate.config().opid_index(); | 592 | 0 | } | 593 | 1 | } else { | 594 | 1 | auto last_op_id_result = GetLastOpIdForReplica( | 595 | 1 | tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout); | 596 | 1 | if (last_op_id_result.ok()) { | 597 | 1 | op_id = *last_op_id_result; | 598 | 1 | op_index = op_id.index; | 599 | 1 | } else { | 600 | 0 | s = last_op_id_result.status(); | 601 | 0 | } | 602 | 1 | } | 603 | | | 604 | 1 | if (s.ok() && context.Check(op_index)) { | 605 | 1 | if (config) { | 606 | 0 | LOG(INFO) << "Committed config state is: " << cstate.ShortDebugString() << " for replica: " | 607 | 0 | << replica->instance_id.permanent_uuid(); | 608 | 1 | } else { | 609 | 1 | LOG(INFO) << "Committed op_id index is: " << op_id << " for replica: " | 610 | 1 | << replica->instance_id.permanent_uuid(); | 611 | 1 | } | 612 | 1 | return Status::OK(); | 613 | 1 | } | 614 | 0 | auto passed = MonoTime::Now().GetDeltaSince(start); | 615 | 0 | if 
(passed.MoreThan(timeout)) { | 616 | 0 | auto name = config ? "config" : "consensus"; | 617 | 0 | auto last_value = config ? cstate.ShortDebugString() : AsString(op_id); | 618 | 0 | return STATUS(TimedOut, | 619 | 0 | Substitute("Committed $0 opid_index does not equal $1 " | 620 | 0 | "after waiting for $2. Last value: $3, Last status: $4", | 621 | 0 | name, | 622 | 0 | context.Desired(), | 623 | 0 | passed.ToString(), | 624 | 0 | last_value, | 625 | 0 | s.ToString())); | 626 | 0 | } | 627 | 0 | if (!config) { | 628 | 0 | LOG(INFO) << "Committed index is at: " << op_id.index << " and not yet at " | 629 | 0 | << context.Desired(); | 630 | 0 | } | 631 | 0 | SleepFor(MonoDelta::FromMilliseconds(100)); | 632 | 0 | } | 633 | 1 | } |
|
634 | | |
// Common base of the predicate objects passed to WaitUntilCommittedOpIdIndex().
// It only stores the human-readable description of the awaited condition, which
// callers read back through Desired() when building log and timeout messages.
class WaitUntilCommittedOpIdIndexContext {
 public:
  explicit WaitUntilCommittedOpIdIndexContext(std::string desired)
      : desired_(std::move(desired)) {}

  // Description of the awaited condition, exactly as given to the constructor.
  const std::string& Desired() const { return desired_; }

 private:
  std::string desired_;
};
647 | | |
648 | | class WaitUntilCommittedOpIdIndexIsContext : public WaitUntilCommittedOpIdIndexContext { |
649 | | public: |
650 | | explicit WaitUntilCommittedOpIdIndexIsContext(int64_t value) |
651 | | : WaitUntilCommittedOpIdIndexContext(Substitute("equal $0", value)), |
652 | 16 | value_(value) { |
653 | 16 | } |
654 | | |
655 | 17 | bool Check(int64_t current) { |
656 | 17 | return value_ == current; |
657 | 17 | } |
658 | | private: |
659 | | int64_t value_; |
660 | | }; |
661 | | |
662 | | Status WaitForReplicasReportedToMaster( |
663 | | ExternalMiniCluster* cluster, |
664 | | int num_replicas, const string& tablet_id, |
665 | | const MonoDelta& timeout, |
666 | | WaitForLeader wait_for_leader, |
667 | | bool* has_leader, |
668 | 12 | master::TabletLocationsPB* tablet_locations) { |
669 | 12 | MonoTime deadline(MonoTime::Now() + timeout); |
670 | 12 | while (true) { |
671 | 12 | RETURN_NOT_OK(GetTabletLocations(cluster, tablet_id, timeout, tablet_locations)); |
672 | 12 | *has_leader = false; |
673 | 12 | if (tablet_locations->replicas_size() == num_replicas) { |
674 | 12 | for (const master::TabletLocationsPB_ReplicaPB& replica : |
675 | 36 | tablet_locations->replicas()) { |
676 | 36 | if (replica.role() == PeerRole::LEADER) { |
677 | 12 | *has_leader = true; |
678 | 12 | } |
679 | 36 | } |
680 | 12 | if (wait_for_leader == DONT_WAIT_FOR_LEADER || |
681 | 12 | (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) { |
682 | 12 | break; |
683 | 12 | } |
684 | 12 | } |
685 | 0 | if (deadline < MonoTime::Now()) { |
686 | 0 | return STATUS(TimedOut, Substitute("Timed out after waiting " |
687 | 0 | "for tablet $1 expected to report master with $2 replicas, has_leader: $3", |
688 | 0 | tablet_id, num_replicas, *has_leader)); |
689 | 0 | } |
690 | 0 | SleepFor(MonoDelta::FromMilliseconds(20)); |
691 | 0 | } |
692 | 12 | if (num_replicas != tablet_locations->replicas_size()) { |
693 | 0 | return STATUS(NotFound, Substitute("Number of replicas for tablet $0 " |
694 | 0 | "reported to master $1:$2", |
695 | 0 | tablet_id, tablet_locations->replicas_size(), |
696 | 0 | yb::ToString(*tablet_locations))); |
697 | 0 | } |
698 | 12 | if (wait_for_leader == WAIT_FOR_LEADER && !(*has_leader)) { |
699 | 0 | return STATUS(NotFound, Substitute("Leader for tablet $0 not found on master, " |
700 | 0 | "number of replicas $1:$2", |
701 | 0 | tablet_id, tablet_locations->replicas_size(), |
702 | 0 | yb::ToString(*tablet_locations))); |
703 | 0 | } |
704 | 12 | return Status::OK(); |
705 | 12 | } |
706 | | |
707 | | Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index, |
708 | | TServerDetails* replica, |
709 | | const string& tablet_id, |
710 | | const MonoDelta& timeout, |
711 | 16 | CommittedEntryType type) { |
712 | 16 | return WaitUntilCommittedOpIdIndex( |
713 | 16 | replica, |
714 | 16 | tablet_id, |
715 | 16 | timeout, |
716 | 16 | type, |
717 | 16 | WaitUntilCommittedOpIdIndexIsContext(opid_index)); |
718 | 16 | } |
719 | | |
720 | | class WaitUntilCommittedOpIdIndexIsGreaterThanContext : public WaitUntilCommittedOpIdIndexContext { |
721 | | public: |
722 | | explicit WaitUntilCommittedOpIdIndexIsGreaterThanContext(int64_t* value) |
723 | | : WaitUntilCommittedOpIdIndexContext(Substitute("greater than $0", *value)), |
724 | 1 | original_value_(*value), value_(value) { |
725 | | |
726 | 1 | } |
727 | | |
728 | 1 | bool Check(int64_t current) { |
729 | 1 | if (current > *value_) { |
730 | 1 | CHECK_EQ(*value_, original_value_); |
731 | 1 | *value_ = current; |
732 | 1 | return true; |
733 | 1 | } |
734 | 0 | return false; |
735 | 1 | } |
736 | | private: |
737 | | int64_t original_value_; |
738 | | int64_t* const value_; |
739 | | }; |
740 | | |
741 | | Status WaitUntilCommittedOpIdIndexIsGreaterThan(int64_t* index, |
742 | | TServerDetails* replica, |
743 | | const TabletId& tablet_id, |
744 | | const MonoDelta& timeout, |
745 | 1 | CommittedEntryType type) { |
746 | 1 | return WaitUntilCommittedOpIdIndex( |
747 | 1 | replica, |
748 | 1 | tablet_id, |
749 | 1 | timeout, |
750 | 1 | type, |
751 | 1 | WaitUntilCommittedOpIdIndexIsGreaterThanContext(index)); |
752 | 1 | } |
753 | | |
754 | | Status WaitUntilCommittedOpIdIndexIsAtLeast(int64_t* index, |
755 | | TServerDetails* replica, |
756 | | const TabletId& tablet_id, |
757 | | const MonoDelta& timeout, |
758 | 1 | CommittedEntryType type) { |
759 | 1 | int64_t tmp_index = *index - 1; |
760 | 1 | Status s = WaitUntilCommittedOpIdIndexIsGreaterThan( |
761 | 1 | &tmp_index, |
762 | 1 | replica, |
763 | 1 | tablet_id, |
764 | 1 | timeout, |
765 | 1 | type); |
766 | 1 | *index = tmp_index; |
767 | 1 | return s; |
768 | 1 | } |
769 | | |
770 | | Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica, |
771 | | const string& tablet_id, |
772 | | const MonoDelta& timeout, |
773 | 2.84k | LeaderLeaseCheckMode lease_check_mode) { |
774 | 2.84k | ConsensusStatePB cstate; |
775 | 2.84k | LeaderLeaseStatus leader_lease_status; |
776 | 2.84k | Status s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, |
777 | 2.84k | timeout, &cstate, &leader_lease_status); |
778 | 2.84k | if (PREDICT_FALSE(!s.ok())) { |
779 | 118 | VLOG(1) << "Error getting consensus state from replica: " |
780 | 0 | << replica->instance_id.permanent_uuid(); |
781 | 118 | return STATUS(NotFound, "Error connecting to replica", s.ToString()); |
782 | 118 | } |
783 | 2.72k | const string& replica_uuid = replica->instance_id.permanent_uuid(); |
784 | 2.72k | if (cstate.has_leader_uuid() && cstate.leader_uuid() == replica_uuid && |
785 | 2.72k | (338 lease_check_mode == LeaderLeaseCheckMode::DONT_NEED_LEASE338 || |
786 | 338 | leader_lease_status == consensus::LeaderLeaseStatus::HAS_LEASE337 )) { |
787 | 173 | return Status::OK(); |
788 | 173 | } |
789 | 2.55k | VLOG(1) << "Replica not leader of config: " << replica->instance_id.permanent_uuid()0 ; |
790 | 2.55k | return STATUS_FORMAT(IllegalState, |
791 | 2.72k | "Replica found but not leader; lease check mode: $0", lease_check_mode); |
792 | 2.72k | } |
793 | | |
794 | | Status WaitUntilLeader(const TServerDetails* replica, |
795 | | const string& tablet_id, |
796 | | const MonoDelta& timeout, |
797 | 25 | const LeaderLeaseCheckMode lease_check_mode) { |
798 | 25 | MonoTime start = MonoTime::Now(); |
799 | 25 | MonoTime deadline = start; |
800 | 25 | deadline.AddDelta(timeout); |
801 | | |
802 | 25 | int backoff_exp = 0; |
803 | 25 | const int kMaxBackoffExp = 7; |
804 | 25 | Status s; |
805 | 280 | while (true) { |
806 | 280 | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); |
807 | 280 | s = GetReplicaStatusAndCheckIfLeader(replica, tablet_id, remaining_timeout, |
808 | 280 | lease_check_mode); |
809 | 280 | if (s.ok()) { |
810 | 17 | return Status::OK(); |
811 | 17 | } |
812 | | |
813 | 263 | if (MonoTime::Now().GetDeltaSince(start).MoreThan(timeout)) { |
814 | 8 | break; |
815 | 8 | } |
816 | 255 | SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp)); |
817 | 255 | backoff_exp = min(backoff_exp + 1, kMaxBackoffExp); |
818 | 255 | } |
819 | 8 | return STATUS(TimedOut, Substitute("Replica $0 is not leader after waiting for $1: $2", |
820 | 25 | replica->ToString(), timeout.ToString(), s.ToString())); |
821 | 25 | } |
822 | | |
823 | | Status FindTabletLeader(const TabletServerMap& tablet_servers, |
824 | | const string& tablet_id, |
825 | | const MonoDelta& timeout, |
826 | 112 | TServerDetails** leader) { |
827 | 112 | return FindTabletLeader(TServerDetailsVector(tablet_servers), tablet_id, timeout, leader); |
828 | 112 | } |
829 | | |
830 | | Status FindTabletLeader(const TabletServerMapUnowned& tablet_servers, |
831 | | const string& tablet_id, |
832 | | const MonoDelta& timeout, |
833 | 12 | TServerDetails** leader) { |
834 | 12 | return FindTabletLeader(TServerDetailsVector(tablet_servers), tablet_id, timeout, leader); |
835 | 12 | } |
836 | | |
837 | | Status FindTabletLeader(const vector<TServerDetails*>& tservers, |
838 | | const string& tablet_id, |
839 | | const MonoDelta& timeout, |
840 | 124 | TServerDetails** leader) { |
841 | 124 | MonoTime start = MonoTime::Now(); |
842 | 124 | MonoTime deadline = start; |
843 | 124 | deadline.AddDelta(timeout); |
844 | 124 | Status s; |
845 | 124 | int i = 0; |
846 | 2.30k | while (true) { |
847 | 2.30k | MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now()); |
848 | 2.30k | s = GetReplicaStatusAndCheckIfLeader(tservers[i], tablet_id, remaining_timeout); |
849 | 2.30k | if (s.ok()) { |
850 | 123 | *leader = tservers[i]; |
851 | 123 | return Status::OK(); |
852 | 123 | } |
853 | | |
854 | 2.18k | if (deadline.ComesBefore(MonoTime::Now())) break1 ; |
855 | 2.18k | i = (i + 1) % tservers.size(); |
856 | 2.18k | if (i == 0) { |
857 | 685 | SleepFor(MonoDelta::FromMilliseconds(10)); |
858 | 685 | } |
859 | 2.18k | } |
860 | 1 | return STATUS(TimedOut, Substitute("Unable to find leader of tablet $0 after $1. " |
861 | 124 | "Status message: $2", tablet_id, |
862 | 124 | MonoTime::Now().GetDeltaSince(start).ToString(), |
863 | 124 | s.ToString())); |
864 | 124 | } |
865 | | |
866 | | Status FindTabletFollowers(const TabletServerMapUnowned& tablet_servers, |
867 | | const string& tablet_id, |
868 | | const MonoDelta& timeout, |
869 | 6 | vector<TServerDetails*>* followers) { |
870 | 6 | TServerDetails* leader; |
871 | 6 | RETURN_NOT_OK(FindTabletLeader(tablet_servers, tablet_id, timeout, &leader)); |
872 | 18 | for (const auto& entry : tablet_servers)6 { |
873 | 18 | TServerDetails* ts = entry.second; |
874 | 18 | if (ts->uuid() != leader->uuid()) { |
875 | 12 | followers->push_back(ts); |
876 | 12 | } |
877 | 18 | } |
878 | 6 | return Status::OK(); |
879 | 6 | } |
880 | | |
881 | | Status StartElection(const TServerDetails* replica, |
882 | | const string& tablet_id, |
883 | | const MonoDelta& timeout, |
884 | 100 | consensus::TEST_SuppressVoteRequest suppress_vote_request) { |
885 | 100 | RunLeaderElectionRequestPB req; |
886 | 100 | req.set_dest_uuid(replica->uuid()); |
887 | 100 | req.set_tablet_id(tablet_id); |
888 | 100 | req.set_suppress_vote_request(suppress_vote_request); |
889 | 100 | RunLeaderElectionResponsePB resp; |
890 | 100 | RpcController rpc; |
891 | 100 | rpc.set_timeout(timeout); |
892 | 100 | RETURN_NOT_OK(replica->consensus_proxy->RunLeaderElection(req, &resp, &rpc)); |
893 | 100 | if (resp.has_error()) { |
894 | 0 | return StatusFromPB(resp.error().status()) |
895 | 0 | .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code()))); |
896 | 0 | } |
897 | 100 | return Status::OK(); |
898 | 100 | } |
899 | | |
900 | | Status RequestVote(const TServerDetails* replica, |
901 | | const std::string& tablet_id, |
902 | | const std::string& candidate_uuid, |
903 | | int64_t candidate_term, |
904 | | const OpIdPB& last_logged_opid, |
905 | | boost::optional<bool> ignore_live_leader, |
906 | | boost::optional<bool> is_pre_election, |
907 | 0 | const MonoDelta& timeout) { |
908 | 0 | RSTATUS_DCHECK( |
909 | 0 | last_logged_opid.IsInitialized(), Uninitialized, "Last logged op id is uninitialized"); |
910 | 0 | consensus::VoteRequestPB req; |
911 | 0 | req.set_dest_uuid(replica->uuid()); |
912 | 0 | req.set_tablet_id(tablet_id); |
913 | 0 | req.set_candidate_uuid(candidate_uuid); |
914 | 0 | req.set_candidate_term(candidate_term); |
915 | 0 | *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid; |
916 | 0 | if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader); |
917 | 0 | if (is_pre_election) req.set_preelection(*is_pre_election); |
918 | 0 | consensus::VoteResponsePB resp; |
919 | 0 | RpcController rpc; |
920 | 0 | rpc.set_timeout(timeout); |
921 | 0 | RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc)); |
922 | 0 | if (resp.has_vote_granted() && resp.vote_granted()) |
923 | 0 | return Status::OK(); |
924 | 0 | if (resp.has_error()) |
925 | 0 | return StatusFromPB(resp.error().status()); |
926 | 0 | if (resp.has_consensus_error()) |
927 | 0 | return StatusFromPB(resp.consensus_error().status()); |
928 | 0 | return STATUS(IllegalState, "Unknown error (vote not granted)"); |
929 | 0 | } |
930 | | |
931 | | Status LeaderStepDown( |
932 | | const TServerDetails* replica, |
933 | | const string& tablet_id, |
934 | | const TServerDetails* new_leader, |
935 | | const MonoDelta& timeout, |
936 | | const bool disable_graceful_transition, |
937 | 13 | TabletServerErrorPB* error) { |
938 | 13 | LeaderStepDownRequestPB req; |
939 | 13 | req.set_dest_uuid(replica->uuid()); |
940 | 13 | req.set_tablet_id(tablet_id); |
941 | 13 | if (disable_graceful_transition) { |
942 | 1 | req.set_disable_graceful_transition(disable_graceful_transition); |
943 | 1 | } |
944 | 13 | if (new_leader) { |
945 | 0 | req.set_new_leader_uuid(new_leader->uuid()); |
946 | 0 | } |
947 | 13 | LeaderStepDownResponsePB resp; |
948 | 13 | RpcController rpc; |
949 | 13 | rpc.set_timeout(timeout); |
950 | 13 | RETURN_NOT_OK(replica->consensus_proxy->LeaderStepDown(req, &resp, &rpc)); |
951 | 12 | if (resp.has_error()) { |
952 | 1 | if (error != nullptr) { |
953 | 1 | *error = resp.error(); |
954 | 1 | } |
955 | 1 | return StatusFromPB(resp.error().status()) |
956 | 1 | .CloneAndPrepend(Substitute("Code $0", TabletServerErrorPB::Code_Name(resp.error().code()))); |
957 | 1 | } |
958 | 377 | return WaitFor([&]() -> Result<bool> 11 { |
959 | 377 | rpc.Reset(); |
960 | 377 | GetConsensusStateRequestPB state_req; |
961 | 377 | state_req.set_dest_uuid(replica->uuid()); |
962 | 377 | state_req.set_tablet_id(tablet_id); |
963 | 377 | GetConsensusStateResponsePB state_resp; |
964 | 377 | RETURN_NOT_OK(replica->consensus_proxy->GetConsensusState(state_req, &state_resp, &rpc)); |
965 | 377 | return state_resp.cstate().leader_uuid() != replica->uuid(); |
966 | 377 | }, timeout, "Leader change"); |
967 | 12 | } |
968 | | |
969 | | Status WriteSimpleTestRow(const TServerDetails* replica, |
970 | | const std::string& tablet_id, |
971 | | int32_t key, |
972 | | int32_t int_val, |
973 | | const string& string_val, |
974 | 13.2k | const MonoDelta& timeout) { |
975 | 13.2k | WriteRequestPB req; |
976 | 13.2k | WriteResponsePB resp; |
977 | 13.2k | RpcController rpc; |
978 | 13.2k | rpc.set_timeout(timeout); |
979 | | |
980 | 13.2k | req.set_tablet_id(tablet_id); |
981 | | |
982 | 13.2k | AddTestRowInsert(key, int_val, string_val, &req); |
983 | | |
984 | 13.2k | RETURN_NOT_OK(replica->tserver_proxy->Write(req, &resp, &rpc)); |
985 | 13.2k | if (resp.has_error()) { |
986 | 8.51k | return StatusFromPB(resp.error().status()); |
987 | 8.51k | } |
988 | 4.77k | return Status::OK(); |
989 | 13.2k | } |
990 | | |
991 | | namespace { |
992 | | Status SendAddRemoveServerRequest(const TServerDetails* leader, |
993 | | const ChangeConfigRequestPB& req, |
994 | | ChangeConfigResponsePB* resp, |
995 | | RpcController* rpc, |
996 | | const MonoDelta& timeout, |
997 | | TabletServerErrorPB::Code* error_code, |
998 | 40 | bool retry) { |
999 | 40 | Status status = Status::OK(); |
1000 | 40 | MonoTime start = MonoTime::Now(); |
1001 | 40 | do { |
1002 | 40 | RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, resp, rpc)); |
1003 | 35 | if (!resp->has_error()) { |
1004 | 28 | break; |
1005 | 28 | } |
1006 | 7 | if (error_code) *error_code = resp->error().code()2 ; |
1007 | 7 | status = StatusFromPB(resp->error().status()); |
1008 | 7 | if (!retry) { |
1009 | 7 | break; |
1010 | 7 | } |
1011 | 0 | if (resp->error().code() != TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) { |
1012 | 0 | break; |
1013 | 0 | } |
1014 | 0 | rpc->Reset(); |
1015 | 0 | } while (MonoTime::Now().GetDeltaSince(start).LessThan(timeout)); |
1016 | 35 | return status; |
1017 | 40 | } |
1018 | | } // namespace |
1019 | | |
1020 | | Status AddServer(const TServerDetails* leader, |
1021 | | const std::string& tablet_id, |
1022 | | const TServerDetails* replica_to_add, |
1023 | | consensus::PeerMemberType member_type, |
1024 | | const boost::optional<int64_t>& cas_config_opid_index, |
1025 | | const MonoDelta& timeout, |
1026 | | TabletServerErrorPB::Code* error_code, |
1027 | 15 | bool retry) { |
1028 | 15 | ChangeConfigRequestPB req; |
1029 | 15 | ChangeConfigResponsePB resp; |
1030 | 15 | RpcController rpc; |
1031 | 15 | rpc.set_timeout(timeout); |
1032 | | |
1033 | 15 | req.set_dest_uuid(leader->uuid()); |
1034 | 15 | req.set_tablet_id(tablet_id); |
1035 | 15 | req.set_type(consensus::ADD_SERVER); |
1036 | 15 | RaftPeerPB* peer = req.mutable_server(); |
1037 | 15 | peer->set_permanent_uuid(replica_to_add->uuid()); |
1038 | 15 | peer->set_member_type(member_type); |
1039 | 15 | CopyRegistration(replica_to_add->registration->common(), peer); |
1040 | 15 | if (cas_config_opid_index) { |
1041 | 2 | req.set_cas_config_opid_index(*cas_config_opid_index); |
1042 | 2 | } |
1043 | | |
1044 | 15 | return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry); |
1045 | 15 | } |
1046 | | |
1047 | | Status RemoveServer(const TServerDetails* leader, |
1048 | | const std::string& tablet_id, |
1049 | | const TServerDetails* replica_to_remove, |
1050 | | const boost::optional<int64_t>& cas_config_opid_index, |
1051 | | const MonoDelta& timeout, |
1052 | | TabletServerErrorPB::Code* error_code, |
1053 | 25 | bool retry) { |
1054 | 25 | ChangeConfigRequestPB req; |
1055 | 25 | ChangeConfigResponsePB resp; |
1056 | 25 | RpcController rpc; |
1057 | 25 | rpc.set_timeout(timeout); |
1058 | | |
1059 | 25 | req.set_dest_uuid(leader->uuid()); |
1060 | 25 | req.set_tablet_id(tablet_id); |
1061 | 25 | req.set_type(consensus::REMOVE_SERVER); |
1062 | 25 | if (cas_config_opid_index) { |
1063 | 6 | req.set_cas_config_opid_index(*cas_config_opid_index); |
1064 | 6 | } |
1065 | 25 | RaftPeerPB* peer = req.mutable_server(); |
1066 | 25 | peer->set_permanent_uuid(replica_to_remove->uuid()); |
1067 | | |
1068 | 25 | return SendAddRemoveServerRequest(leader, req, &resp, &rpc, timeout, error_code, retry); |
1069 | 25 | } |
1070 | | |
1071 | | Status ListTablets(const TServerDetails* ts, |
1072 | | const MonoDelta& timeout, |
1073 | 188 | vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) { |
1074 | 188 | tserver::ListTabletsRequestPB req; |
1075 | 188 | tserver::ListTabletsResponsePB resp; |
1076 | 188 | RpcController rpc; |
1077 | 188 | rpc.set_timeout(timeout); |
1078 | | |
1079 | 188 | RETURN_NOT_OK(ts->tserver_proxy->ListTablets(req, &resp, &rpc)); |
1080 | 188 | if (resp.has_error()) { |
1081 | 0 | return StatusFromPB(resp.error().status()); |
1082 | 0 | } |
1083 | | |
1084 | 188 | tablets->assign(resp.status_and_schema().begin(), resp.status_and_schema().end()); |
1085 | 188 | return Status::OK(); |
1086 | 188 | } |
1087 | | |
1088 | | Status ListRunningTabletIds(const TServerDetails* ts, |
1089 | | const MonoDelta& timeout, |
1090 | 1 | vector<string>* tablet_ids) { |
1091 | 1 | vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
1092 | 1 | RETURN_NOT_OK(ListTablets(ts, timeout, &tablets)); |
1093 | 1 | tablet_ids->clear(); |
1094 | 1 | for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
1095 | 1 | if (t.tablet_status().state() == tablet::RUNNING) { |
1096 | 1 | tablet_ids->push_back(t.tablet_status().tablet_id()); |
1097 | 1 | } |
1098 | 1 | } |
1099 | 1 | return Status::OK(); |
1100 | 1 | } |
1101 | | |
1102 | | Status GetTabletLocations(ExternalMiniCluster* cluster, |
1103 | | const string& tablet_id, |
1104 | | const MonoDelta& timeout, |
1105 | 83 | master::TabletLocationsPB* tablet_locations) { |
1106 | 83 | master::GetTabletLocationsResponsePB resp; |
1107 | 83 | master::GetTabletLocationsRequestPB req; |
1108 | 83 | *req.add_tablet_ids() = tablet_id; |
1109 | 83 | rpc::RpcController rpc; |
1110 | 83 | rpc.set_timeout(timeout); |
1111 | 83 | RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTabletLocations( |
1112 | 83 | req, &resp, &rpc)); |
1113 | 83 | if (resp.has_error()) { |
1114 | 0 | return StatusFromPB(resp.error().status()); |
1115 | 0 | } |
1116 | 83 | if (resp.errors_size() > 0) { |
1117 | 0 | CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString(); |
1118 | 0 | return StatusFromPB(resp.errors(0).status()); |
1119 | 0 | } |
1120 | 83 | CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString()0 ; |
1121 | 83 | *tablet_locations = resp.tablet_locations(0); |
1122 | 83 | return Status::OK(); |
1123 | 83 | } |
1124 | | |
1125 | | Status GetTableLocations(ExternalMiniCluster* cluster, |
1126 | | const YBTableName& table_name, |
1127 | | const MonoDelta& timeout, |
1128 | | const RequireTabletsRunning require_tablets_running, |
1129 | 118 | master::GetTableLocationsResponsePB* table_locations) { |
1130 | 118 | master::GetTableLocationsRequestPB req; |
1131 | 118 | table_name.SetIntoTableIdentifierPB(req.mutable_table()); |
1132 | 118 | req.set_require_tablets_running(require_tablets_running); |
1133 | 118 | req.set_max_returned_locations(std::numeric_limits<int32_t>::max()); |
1134 | 118 | rpc::RpcController rpc; |
1135 | 118 | rpc.set_timeout(timeout); |
1136 | 118 | RETURN_NOT_OK(cluster->GetMasterProxy<master::MasterClientProxy>().GetTableLocations( |
1137 | 118 | req, table_locations, &rpc)); |
1138 | 118 | if (table_locations->has_error()) { |
1139 | 0 | return StatusFromPB(table_locations->error().status()); |
1140 | 0 | } |
1141 | 118 | return Status::OK(); |
1142 | 118 | } |
1143 | | |
1144 | | Status WaitForNumVotersInConfigOnMaster( |
1145 | | ExternalMiniCluster* cluster, |
1146 | | const std::string& tablet_id, |
1147 | | int num_voters, |
1148 | 0 | const MonoDelta& timeout) { |
1149 | 0 | Status s; |
1150 | 0 | MonoTime deadline = MonoTime::Now(); |
1151 | 0 | deadline.AddDelta(timeout); |
1152 | 0 | int num_voters_found = 0; |
1153 | 0 | while (true) { |
1154 | 0 | TabletLocationsPB tablet_locations; |
1155 | 0 | MonoDelta time_remaining = deadline.GetDeltaSince(MonoTime::Now()); |
1156 | 0 | s = GetTabletLocations(cluster, tablet_id, time_remaining, &tablet_locations); |
1157 | 0 | if (s.ok()) { |
1158 | 0 | num_voters_found = 0; |
1159 | 0 | for (const TabletLocationsPB::ReplicaPB& r : tablet_locations.replicas()) { |
1160 | 0 | if (r.role() == PeerRole::LEADER || r.role() == PeerRole::FOLLOWER) num_voters_found++; |
1161 | 0 | } |
1162 | 0 | if (num_voters_found == num_voters) break; |
1163 | 0 | } |
1164 | 0 | if (deadline.ComesBefore(MonoTime::Now())) break; |
1165 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1166 | 0 | } |
1167 | 0 | RETURN_NOT_OK(s); |
1168 | 0 | if (num_voters_found != num_voters) { |
1169 | 0 | return STATUS(IllegalState, |
1170 | 0 | Substitute("Did not find exactly $0 voters, found $1 voters", |
1171 | 0 | num_voters, num_voters_found)); |
1172 | 0 | } |
1173 | 0 | return Status::OK(); |
1174 | 0 | } |
1175 | | |
1176 | | Status WaitForNumTabletsOnTS(TServerDetails* ts, |
1177 | | size_t count, |
1178 | | const MonoDelta& timeout, |
1179 | 8 | vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) { |
1180 | 8 | Status s; |
1181 | 8 | MonoTime deadline = MonoTime::Now(); |
1182 | 8 | deadline.AddDelta(timeout); |
1183 | 8 | while (true) { |
1184 | 8 | s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets); |
1185 | 8 | if (s.ok() && tablets->size() == count) break; |
1186 | 0 | if (deadline.ComesBefore(MonoTime::Now())) break; |
1187 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1188 | 0 | } |
1189 | 8 | RETURN_NOT_OK(s); |
1190 | 8 | if (tablets->size() != count) { |
1191 | 0 | return STATUS(IllegalState, |
1192 | 0 | Substitute("Did not find exactly $0 tablets, found $1 tablets", |
1193 | 0 | count, tablets->size())); |
1194 | 0 | } |
1195 | 8 | return Status::OK(); |
1196 | 8 | } |
1197 | | |
1198 | | Status WaitUntilTabletInState(TServerDetails* ts, |
1199 | | const std::string& tablet_id, |
1200 | | tablet::RaftGroupStatePB state, |
1201 | | const MonoDelta& timeout, |
1202 | 179 | const MonoDelta& list_tablets_timeout) { |
1203 | 179 | MonoTime start = MonoTime::Now(); |
1204 | 179 | MonoTime deadline = start; |
1205 | 179 | deadline.AddDelta(timeout); |
1206 | 179 | vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets; |
1207 | 179 | Status s; |
1208 | 179 | tablet::RaftGroupStatePB last_state = tablet::UNKNOWN; |
1209 | 179 | while (true) { |
1210 | 179 | s = ListTablets(ts, list_tablets_timeout, &tablets); |
1211 | 179 | if (s.ok()) { |
1212 | 179 | bool seen = false; |
1213 | 1.58k | for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) { |
1214 | 1.58k | if (t.tablet_status().tablet_id() == tablet_id) { |
1215 | 179 | seen = true; |
1216 | 179 | last_state = t.tablet_status().state(); |
1217 | 179 | if (last_state == state) { |
1218 | 179 | return Status::OK(); |
1219 | 179 | } |
1220 | 179 | } |
1221 | 1.58k | } |
1222 | 0 | if (!seen) { |
1223 | 0 | s = STATUS(NotFound, "Tablet " + tablet_id + " not found"); |
1224 | 0 | } |
1225 | 0 | } |
1226 | 0 | if (deadline.ComesBefore(MonoTime::Now())) { |
1227 | 0 | break; |
1228 | 0 | } |
1229 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1230 | 0 | } |
1231 | 0 | return STATUS(TimedOut, Substitute("T $0 P $1: Tablet not in $2 state after $3: " |
1232 | 179 | "Tablet state: $4, Status message: $5", |
1233 | 179 | tablet_id, ts->uuid(), |
1234 | 179 | tablet::RaftGroupStatePB_Name(state), |
1235 | 179 | MonoTime::Now().GetDeltaSince(start).ToString(), |
1236 | 179 | tablet::RaftGroupStatePB_Name(last_state), s.ToString())); |
1237 | 179 | } |
1238 | | |
1239 | | // Wait until the specified tablet is in RUNNING state. |
1240 | | Status WaitUntilTabletRunning(TServerDetails* ts, |
1241 | | const std::string& tablet_id, |
1242 | 177 | const MonoDelta& timeout) { |
1243 | 177 | return WaitUntilTabletInState(ts, tablet_id, tablet::RUNNING, timeout); |
1244 | 177 | } |
1245 | | |
1246 | | Status DeleteTablet(const TServerDetails* ts, |
1247 | | const std::string& tablet_id, |
1248 | | const tablet::TabletDataState delete_type, |
1249 | | const boost::optional<int64_t>& cas_config_opid_index_less_or_equal, |
1250 | | const MonoDelta& timeout, |
1251 | 11 | tserver::TabletServerErrorPB::Code* error_code) { |
1252 | 11 | DeleteTabletRequestPB req; |
1253 | 11 | DeleteTabletResponsePB resp; |
1254 | 11 | RpcController rpc; |
1255 | 11 | rpc.set_timeout(timeout); |
1256 | | |
1257 | 11 | req.set_dest_uuid(ts->uuid()); |
1258 | 11 | req.set_tablet_id(tablet_id); |
1259 | 11 | req.set_delete_type(delete_type); |
1260 | 11 | if (cas_config_opid_index_less_or_equal) { |
1261 | 0 | req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal); |
1262 | 0 | } |
1263 | | |
1264 | 11 | RETURN_NOT_OK(ts->tserver_admin_proxy->DeleteTablet(req, &resp, &rpc)); |
1265 | 11 | if (resp.has_error()) { |
1266 | 0 | if (error_code) { |
1267 | 0 | *error_code = resp.error().code(); |
1268 | 0 | } |
1269 | 0 | return StatusFromPB(resp.error().status()); |
1270 | 0 | } |
1271 | 11 | return Status::OK(); |
1272 | 11 | } |
1273 | | |
1274 | | Status StartRemoteBootstrap(const TServerDetails* ts, |
1275 | | const string& tablet_id, |
1276 | | const string& bootstrap_source_uuid, |
1277 | | const HostPort& bootstrap_source_addr, |
1278 | | int64_t caller_term, |
1279 | 0 | const MonoDelta& timeout) { |
1280 | 0 | consensus::StartRemoteBootstrapRequestPB req; |
1281 | 0 | consensus::StartRemoteBootstrapResponsePB resp; |
1282 | 0 | RpcController rpc; |
1283 | 0 | rpc.set_timeout(timeout); |
1284 | |
|
1285 | 0 | req.set_dest_uuid(ts->uuid()); |
1286 | 0 | req.set_tablet_id(tablet_id); |
1287 | 0 | req.set_bootstrap_peer_uuid(bootstrap_source_uuid); |
1288 | 0 | HostPortToPB(bootstrap_source_addr, req.mutable_source_private_addr()->Add()); |
1289 | 0 | req.set_caller_term(caller_term); |
1290 | |
|
1291 | 0 | RETURN_NOT_OK(ts->consensus_proxy->StartRemoteBootstrap(req, &resp, &rpc)); |
1292 | 0 | if (resp.has_error()) { |
1293 | 0 | return StatusFromPB(resp.error().status()); |
1294 | 0 | } |
1295 | 0 | return Status::OK(); |
1296 | 0 | } |
1297 | | |
1298 | | Status GetLastOpIdForMasterReplica( |
1299 | | const shared_ptr<consensus::ConsensusServiceProxy>& consensus_proxy, |
1300 | | const string& tablet_id, |
1301 | | const string& dest_uuid, |
1302 | | const consensus::OpIdType opid_type, |
1303 | | const MonoDelta& timeout, |
1304 | 42 | OpIdPB* opid) { |
1305 | 42 | GetLastOpIdRequestPB opid_req; |
1306 | 42 | GetLastOpIdResponsePB opid_resp; |
1307 | 42 | RpcController controller; |
1308 | 42 | controller.Reset(); |
1309 | 42 | controller.set_timeout(timeout); |
1310 | | |
1311 | 42 | opid_req.set_dest_uuid(dest_uuid); |
1312 | 42 | opid_req.set_tablet_id(tablet_id); |
1313 | 42 | opid_req.set_opid_type(opid_type); |
1314 | | |
1315 | 42 | Status s = consensus_proxy->GetLastOpId(opid_req, &opid_resp, &controller); |
1316 | 42 | if (!s.ok()) { |
1317 | 0 | return STATUS(InvalidArgument, Substitute( |
1318 | 0 | "Failed to fetch opid type $0 from master uuid $1 with error : $2", |
1319 | 0 | opid_type, dest_uuid, s.ToString())); |
1320 | 0 | } |
1321 | 42 | if (opid_resp.has_error()) { |
1322 | 0 | return StatusFromPB(opid_resp.error().status()); |
1323 | 0 | } |
1324 | | |
1325 | 42 | *opid = opid_resp.opid(); |
1326 | | |
1327 | 42 | return Status::OK(); |
1328 | 42 | } |
1329 | | |
1330 | | Result<OpId> GetLastOpIdForReplica( |
1331 | | const TabletId& tablet_id, |
1332 | | TServerDetails* replica, |
1333 | | consensus::OpIdType opid_type, |
1334 | 39 | const MonoDelta& timeout) { |
1335 | 39 | return VERIFY_RESULT(GetLastOpIdForEachReplica(tablet_id, {replica}, opid_type, timeout))[0]; |
1336 | 39 | } |
1337 | | |
1338 | | } // namespace itest |
1339 | | } // namespace yb |