YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/master_rpc.cc
Line |  Count | Source
   1 |        | // Licensed to the Apache Software Foundation (ASF) under one
   2 |        | // or more contributor license agreements.  See the NOTICE file
   3 |        | // distributed with this work for additional information
   4 |        | // regarding copyright ownership.  The ASF licenses this file
   5 |        | // to you under the Apache License, Version 2.0 (the
   6 |        | // "License"); you may not use this file except in compliance
   7 |        | // with the License.  You may obtain a copy of the License at
   8 |        | //
   9 |        | //   http://www.apache.org/licenses/LICENSE-2.0
  10 |        | //
  11 |        | // Unless required by applicable law or agreed to in writing,
  12 |        | // software distributed under the License is distributed on an
  13 |        | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14 |        | // KIND, either express or implied.  See the License for the
  15 |        | // specific language governing permissions and limitations
  16 |        | // under the License.
  17 |        | //
  18 |        | // The following only applies to changes made to this file as part of YugaByte development.
  19 |        | //
  20 |        | // Portions Copyright (c) YugaByte, Inc.
  21 |        | //
  22 |        | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  23 |        | // in compliance with the License.  You may obtain a copy of the License at
  24 |        | //
  25 |        | // http://www.apache.org/licenses/LICENSE-2.0
  26 |        | //
  27 |        | // Unless required by applicable law or agreed to in writing, software distributed under the License
  28 |        | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  29 |        | // or implied.  See the License for the specific language governing permissions and limitations
  30 |        | // under the License.
  31 |        | //
  32 |        | // This module is internal to the client and not a public API.
  33 |        |
  34 |        | #include "yb/master/master_rpc.h"
  35 |        |
  36 |        | #include <mutex>
  37 |        |
  38 |        | #include "yb/common/wire_protocol.h"
  39 |        | #include "yb/common/wire_protocol.pb.h"
  40 |        |
  41 |        | #include "yb/master/master_cluster.proxy.h"
  42 |        |
  43 |        | #include "yb/util/async_util.h"
  44 |        | #include "yb/util/flag_tags.h"
  45 |        | #include "yb/util/net/net_util.h"
  46 |        |
  47 |        | using std::shared_ptr;
  48 |        | using std::string;
  49 |        | using std::vector;
  50 |        |
  51 |        | using yb::consensus::RaftPeerPB;
  52 |        | using yb::rpc::Messenger;
  53 |        | using yb::rpc::Rpc;
  54 |        |
  55 |        | using namespace std::placeholders;
  56 |        |
  57 |        | DEFINE_int32(master_leader_rpc_timeout_ms, 500,
  58 |        |              "Number of milliseconds that the tserver will keep querying for master leader before "
  59 |        |              "selecting a follower.");
  60 |        | TAG_FLAG(master_leader_rpc_timeout_ms, advanced);
  61 |        | TAG_FLAG(master_leader_rpc_timeout_ms, hidden);
  62 |        |
  63 |        | namespace yb {
  64 |        | namespace master {
  65 |        |
  66 |        | namespace {
  67 |        |
  68 |        | ////////////////////////////////////////////////////////////
  69 |        | // GetMasterRegistrationRpc
  70 |        | ////////////////////////////////////////////////////////////
  71 |        |
  72 |        | // An RPC for getting a Master server's registration.
  73 |        | class GetMasterRegistrationRpc: public rpc::Rpc {
  74 |        |  public:
  75 |        |
  76 |        |   // Create a wrapper object for a retriable GetMasterRegistration RPC
  77 |        |   // to 'addr'. The result is stored in 'out', which must be a valid
  78 |        |   // pointer for the lifetime of this object.
  79 |        |   //
  80 |        |   // Invokes 'user_cb' upon failure or success of the RPC call.
  81 |        |   GetMasterRegistrationRpc(StatusFunctor user_cb,
  82 |        |                            const HostPort& addr,
  83 |        |                            CoarseTimePoint deadline,
  84 |        |                            rpc::Messenger* messenger,
  85 |        |                            rpc::ProxyCache* proxy_cache,
  86 |        |                            ServerEntryPB* out)
  87 |        |       : Rpc(deadline, messenger, proxy_cache),
  88 |        |         user_cb_(std::move(user_cb)),
  89 |        |         addr_(addr),
  90 |  4.55M |         out_(DCHECK_NOTNULL(out)) {}
  91 |        |
  92 |        |   void SendRpc() override;
  93 |        |
  94 |        |   std::string ToString() const override;
  95 |        |
  96 |        |  private:
  97 |        |   void Finished(const Status& status) override;
  98 |        |
  99 |        |   StatusFunctor user_cb_;
 100 |        |   HostPort addr_;
 101 |        |
 102 |        |   ServerEntryPB* out_;
 103 |        |
 104 |        |   GetMasterRegistrationResponsePB resp_;
 105 |        | };
 106 |        |
 107 |  4.54M | void GetMasterRegistrationRpc::SendRpc() {
 108 |  4.54M |   MasterClusterProxy proxy(&retrier().proxy_cache(), addr_);
 109 |  4.54M |   GetMasterRegistrationRequestPB req;
 110 |  4.54M |   proxy.GetMasterRegistrationAsync(
 111 |  4.54M |       req, &resp_, PrepareController(),
 112 |  4.54M |       std::bind(&GetMasterRegistrationRpc::Finished, this, Status::OK()));
 113 |  4.54M | }
 114 |        |
 115 |    658 | string GetMasterRegistrationRpc::ToString() const {
 116 |    658 |   return Format("GetMasterRegistrationRpc(address: $0, num_attempts: $1, retries: $2)",
 117 |    658 |                 addr_, num_attempts(), retrier());
 118 |    658 | }
 119 |        |
 120 |  4.54M | void GetMasterRegistrationRpc::Finished(const Status& status) {
 121 |  4.54M |   Status new_status = status;
 122 |  4.54M |   if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
 123 |  1.02k |     return;
 124 |  1.02k |   }
 125 |  4.54M |   if (new_status.ok() && resp_.has_error()) {
 126 |   396k |     if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
 127 |        |       // If CatalogManager is not initialized, treat the node as a
 128 |        |       // FOLLOWER for the time being, as currently this RPC is only
 129 |        |       // used for the purposes of finding the leader master.
 130 |   395k |       resp_.set_role(PeerRole::FOLLOWER);
 131 |   395k |       new_status = Status::OK();
 132 |    709 |     } else {
 133 |    709 |       out_->mutable_error()->CopyFrom(resp_.error().status());
 134 |    709 |       new_status = StatusFromPB(resp_.error().status());
 135 |    709 |     }
 136 |   396k |   }
 137 |  4.54M |   if (new_status.ok()) {
 138 |  4.24M |     out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
 139 |  4.24M |     out_->mutable_registration()->CopyFrom(resp_.registration());
 140 |  4.24M |     out_->set_role(resp_.role());
 141 |  4.24M |   }
 142 |  4.54M |   auto callback = std::move(user_cb_);
 143 |  4.54M |   callback(new_status);
 144 |  4.54M | }
 145 |        |
 146 |        | } // namespace
 147 |        |
 148 |        | ////////////////////////////////////////////////////////////
 149 |        | // GetLeaderMasterRpc
 150 |        | ////////////////////////////////////////////////////////////
 151 |        |
 152 |        | GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
 153 |        |                                        const server::MasterAddresses& addrs,
 154 |        |                                        CoarseTimePoint deadline,
 155 |        |                                        Messenger* messenger,
 156 |        |                                        rpc::ProxyCache* proxy_cache,
 157 |        |                                        rpc::Rpcs* rpcs,
 158 |        |                                        bool should_timeout_to_follower,
 159 |        |                                        bool wait_for_leader_election)
 160 |        |     : Rpc(deadline, messenger, proxy_cache),
 161 |        |       user_cb_(std::move(user_cb)),
 162 |        |       rpcs_(*rpcs),
 163 |        |       should_timeout_to_follower_(should_timeout_to_follower),
 164 |   431k |       wait_for_leader_election_(wait_for_leader_election) {
 165 |   431k |   DCHECK(deadline != CoarseTimePoint::max());
 166 |        |
 167 |  1.25M |   for (const auto& list : addrs) {
 168 |  1.25M |     addrs_.insert(addrs_.end(), list.begin(), list.end());
 169 |  1.25M |   }
 170 |        |   // Using resize instead of reserve to explicitly initialize the values.
 171 |   431k |   responses_.resize(addrs_.size());
 172 |   431k | }
 173 |        |
 174 |   430k | GetLeaderMasterRpc::~GetLeaderMasterRpc() {
 175 |   430k | }
 176 |        |
 177 |  3.41M | string GetLeaderMasterRpc::ToString() const {
 178 |  3.41M |   return Format("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)", addrs_, num_attempts());
 179 |  3.41M | }
 180 |        |
 181 |  1.32M | void GetLeaderMasterRpc::SendRpc() {
 182 |  1.32M |   auto self = shared_from_this();
 183 |        |
 184 |  1.32M |   size_t size = addrs_.size();
 185 |  1.32M |   std::vector<rpc::Rpcs::Handle> handles;
 186 |  1.32M |   handles.reserve(size);
 187 |  1.32M |   {
 188 |  1.32M |     std::lock_guard<simple_spinlock> l(lock_);
 189 |  1.32M |     pending_responses_ = size;
 190 |  5.87M |     for (size_t i = 0; i < size; i++) {
 191 |  4.54M |       auto handle = rpcs_.Prepare();
 192 |  4.54M |       if (handle == rpcs_.InvalidHandle()) {
 193 |      0 |         GetMasterRegistrationRpcCbForNode(i, STATUS(Aborted, "Stopping"), self, handle);
 194 |      0 |         continue;
 195 |      0 |       }
 196 |  4.54M |       *handle = std::make_shared<GetMasterRegistrationRpc>(
 197 |  4.54M |           std::bind(
 198 |  4.54M |               &GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode, this, i, _1, self, handle),
 199 |  4.54M |           addrs_[i],
 200 |  4.54M |           retrier().deadline(),
 201 |  4.54M |           retrier().messenger(),
 202 |  4.54M |           &retrier().proxy_cache(),
 203 |  4.54M |           &responses_[i]);
 204 |  4.54M |       handles.push_back(handle);
 205 |  4.54M |     }
 206 |  1.32M |   }
 207 |        |
 208 |  4.54M |   for (const auto& handle : handles) {
 209 |  4.54M |     (**handle).SendRpc();
 210 |  4.54M |   }
 211 |  1.32M | }
 212 |        |
 213 |   909k | void GetLeaderMasterRpc::Finished(const Status& status) {
 214 |        |   // If we've received replies from all of the nodes without finding
 215 |        |   // the leader, or if there were network errors talking to all of the
 216 |        |   // nodes, the error is retriable and we can perform a delayed retry.
 217 |   909k |   num_iters_++;
 218 |   909k |   if (status.IsNetworkError() || (wait_for_leader_election_ && status.IsNotFound())) {
 219 |  18.4E |     VLOG(4) << "About to retry operation due to error: " << status.ToString();
 220 |        |     // TODO (KUDU-573): Allow cancelling delayed tasks on reactor so
 221 |        |     // that we can safely use DelayedRetry here.
 222 |   901k |     auto retry_status = mutable_retrier()->DelayedRetry(this, status);
 223 |   901k |     if (!retry_status.ok()) {
 224 |      0 |       LOG(WARNING) << "Failed to schedule retry: " << retry_status;
 225 |   901k |     } else {
 226 |   901k |       return;
 227 |   901k |     }
 228 |  7.78k |   }
 229 |  1.20k |   VLOG(4) << "Completed GetLeaderMasterRpc, calling callback with status "
 230 |  1.20k |           << status.ToString();
 231 |  7.78k |   {
 232 |  7.78k |     std::lock_guard<simple_spinlock> l(lock_);
 233 |        |     // 'completed_' prevents 'user_cb_' from being invoked twice.
 234 |  7.78k |     if (completed_) {
 235 |      0 |       return;
 236 |      0 |     }
 237 |  7.78k |     completed_ = true;
 238 |  7.78k |   }
 239 |  7.78k |   auto callback = std::move(user_cb_);
 240 |  7.78k |   user_cb_.Reset();
 241 |  7.78k |   callback.Run(status, leader_master_);
 242 |  7.78k | }
 243 |        |
 244 |        | void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(
 245 |        |     size_t idx, const Status& status, const std::shared_ptr<rpc::RpcCommand>& self,
 246 |  4.53M |     rpc::Rpcs::Handle handle) {
 247 |  4.53M |   rpcs_.Unregister(handle);
 248 |        |
 249 |        |   // TODO: handle the situation where one Master is partitioned from
 250 |        |   // the rest of the Master consensus configuration, all are reachable by the client,
 251 |        |   // and the partitioned node "thinks" it's the leader.
 252 |        |   //
 253 |        |   // The proper way to do so is to add term/index to the responses
 254 |        |   // from the Master, wait for majority of the Masters to respond, and
 255 |        |   // pick the one with the highest term/index as the leader.
 256 |  4.53M |   Status new_status = status;
 257 |  4.53M |   {
 258 |  4.53M |     std::lock_guard<simple_spinlock> lock(lock_);
 259 |  4.53M |     --pending_responses_;
 260 |  4.53M |     if (completed_) {
 261 |        |       // If 'user_cb_' has been invoked (see Finished above), we can
 262 |        |       // stop.
 263 |   422k |       return;
 264 |   422k |     }
 265 |  4.11M |     auto& resp = responses_[idx];
 266 |  4.11M |     if (new_status.ok()) {
 267 |  3.83M |       if (resp.role() != PeerRole::LEADER) {
 268 |        |         // Use a STATUS(NotFound, "") to indicate that the node is not
 269 |        |         // the leader: this way, we can handle the case where we've
 270 |        |         // received a reply from all of the nodes in the cluster (no
 271 |        |         // network or other errors encountered), but haven't found a
 272 |        |         // leader (which means that Finished() above can perform a
 273 |        |         // delayed retry).
 274 |        |
 275 |        |         // If we have exceeded FLAGS_master_leader_rpc_timeout_ms, set this follower to be master
 276 |        |         // leader. This prevents an infinite retry loop when none of the master addresses passed in
 277 |        |         // are leaders.
 278 |  3.41M |         if (should_timeout_to_follower_ &&
 279 |  29.2k |             MonoTime::Now() - start_time_ >
 280 |    567 |             MonoDelta::FromMilliseconds(FLAGS_master_leader_rpc_timeout_ms)) {
 281 |    567 |           LOG(WARNING) << "More than " << FLAGS_master_leader_rpc_timeout_ms << " ms has passed, "
 282 |    567 |               "choosing to heartbeat to follower master " << resp.instance_id().permanent_uuid()
 283 |    567 |               << " after " << num_iters_ << " iterations of all masters.";
 284 |    567 |           leader_master_ = addrs_[idx];
 285 |  3.41M |         } else {
 286 |  3.41M |           new_status = STATUS(NotFound, "no leader found: " + ToString());
 287 |  3.41M |         }
 288 |   424k |       } else {
 289 |        |         // We've found a leader.
 290 |   424k |         leader_master_ = addrs_[idx];
 291 |   424k |       }
 292 |  3.83M |     }
 293 |  4.11M |     if (!new_status.ok()) {
 294 |  3.70M |       if (pending_responses_ > 0) {
 295 |        |         // Don't call Finished() on error unless we're the last
 296 |        |         // outstanding response: calling Finished() will trigger
 297 |        |         // a delayed re-try, which we don't need to do unless we've
 298 |        |         // been unable to find a leader so far.
 299 |  2.79M |         return;
 300 |  2.79M |       }
 301 |   412k |     } else {
 302 |   412k |       completed_ = true;
 303 |   412k |     }
 304 |  4.11M |   }
 305 |        |
 306 |        |   // Called if the leader has been determined, or if we've received
 307 |        |   // all of the responses.
 308 |  1.31M |   if (new_status.ok()) {
 309 |   424k |     auto callback = std::move(user_cb_);
 310 |   424k |     user_cb_.Reset();
 311 |   424k |     callback.Run(new_status, leader_master_);
 312 |   890k |   } else {
 313 |   890k |     Finished(new_status);
 314 |   890k |   }
 315 |  1.31M | }
 316 |        |
 317 |        | } // namespace master
 318 |        | } // namespace yb
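
For orientation, the file listed above implements a fan-out leader lookup: GetLeaderMasterRpc sends one GetMasterRegistration RPC to every known master address, the first reply that reports LEADER wins, follower replies are treated as "not found", and the whole round is retried with a delay until the deadline. The standalone sketch below is an editor's illustration of that control flow only, not YugabyteDB code; FindLeader, Role, and get_role are hypothetical stand-ins.

// Minimal sketch of the fan-out-and-retry leader lookup pattern (illustrative, not master_rpc.cc).
#include <chrono>
#include <functional>
#include <future>
#include <optional>
#include <string>
#include <thread>
#include <vector>

enum class Role { kLeader, kFollower };

std::optional<std::string> FindLeader(
    const std::vector<std::string>& addrs,
    const std::function<Role(const std::string&)>& get_role,  // stand-in for GetMasterRegistrationRpc
    std::chrono::steady_clock::time_point deadline) {
  while (std::chrono::steady_clock::now() < deadline) {
    // Fan out one registration query per master, as GetLeaderMasterRpc::SendRpc() does.
    std::vector<std::future<Role>> replies;
    replies.reserve(addrs.size());
    for (const auto& addr : addrs) {
      replies.push_back(std::async(std::launch::async, get_role, addr));
    }
    // The first address reporting LEADER wins; followers count as "no leader found".
    for (size_t i = 0; i < replies.size(); ++i) {
      if (replies[i].get() == Role::kLeader) {
        return addrs[i];
      }
    }
    // No leader in this round: back off and retry, as Finished() does via DelayedRetry().
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  return std::nullopt;  // Deadline reached without discovering a leader.
}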