YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/master_rpc.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
// This module is internal to the client and not a public API.

#include "yb/master/master_rpc.h"

#include <mutex>

#include "yb/common/wire_protocol.h"
#include "yb/common/wire_protocol.pb.h"

#include "yb/master/master_cluster.proxy.h"

#include "yb/util/async_util.h"
#include "yb/util/flag_tags.h"
#include "yb/util/net/net_util.h"

using std::shared_ptr;
using std::string;
using std::vector;

using yb::consensus::RaftPeerPB;
using yb::rpc::Messenger;
using yb::rpc::Rpc;

using namespace std::placeholders;

DEFINE_int32(master_leader_rpc_timeout_ms, 500,
             "Number of milliseconds that the tserver will keep querying for master leader before "
             "selecting a follower.");
TAG_FLAG(master_leader_rpc_timeout_ms, advanced);
TAG_FLAG(master_leader_rpc_timeout_ms, hidden);

namespace yb {
namespace master {

namespace {

////////////////////////////////////////////////////////////
// GetMasterRegistrationRpc
////////////////////////////////////////////////////////////

// An RPC for getting a Master server's registration.
class GetMasterRegistrationRpc: public rpc::Rpc {
 public:

  // Create a wrapper object for a retriable GetMasterRegistration RPC
  // to 'addr'. The result is stored in 'out', which must be a valid
  // pointer for the lifetime of this object.
  //
  // Invokes 'user_cb' upon failure or success of the RPC call.
  GetMasterRegistrationRpc(StatusFunctor user_cb,
                           const HostPort& addr,
                           CoarseTimePoint deadline,
                           rpc::Messenger* messenger,
                           rpc::ProxyCache* proxy_cache,
                           ServerEntryPB* out)
      : Rpc(deadline, messenger, proxy_cache),
        user_cb_(std::move(user_cb)),
        addr_(addr),
        out_(DCHECK_NOTNULL(out)) {}

  void SendRpc() override;

  std::string ToString() const override;

 private:
  void Finished(const Status& status) override;

  StatusFunctor user_cb_;
  HostPort addr_;

  ServerEntryPB* out_;

  GetMasterRegistrationResponsePB resp_;
};
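
The constructor comment pins down the contract: 'out' must outlive the RPC, and 'user_cb' fires exactly once, on success or failure. A minimal sketch of a call site under those rules (the address, 'messenger', and 'proxy_cache' are hypothetical placeholders; the real driver is GetLeaderMasterRpc::SendRpc below, which also registers each RPC with rpc::Rpcs):

  ServerEntryPB entry;  // must stay valid until the callback runs
  auto rpc = std::make_shared<GetMasterRegistrationRpc>(
      [](const Status& s) { LOG(INFO) << "registration fetch: " << s; },
      HostPort("master-1.example.com", 7100),            // hypothetical address
      CoarseMonoClock::Now() + std::chrono::seconds(5),  // deadline
      messenger, proxy_cache,                            // assumed in scope
      &entry);
  rpc->SendRpc();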

void GetMasterRegistrationRpc::SendRpc() {
  MasterClusterProxy proxy(&retrier().proxy_cache(), addr_);
  GetMasterRegistrationRequestPB req;
  proxy.GetMasterRegistrationAsync(
      req, &resp_, PrepareController(),
      std::bind(&GetMasterRegistrationRpc::Finished, this, Status::OK()));
}

string GetMasterRegistrationRpc::ToString() const {
  return Format("GetMasterRegistrationRpc(address: $0, num_attempts: $1, retries: $2)",
                addr_, num_attempts(), retrier());
}

void GetMasterRegistrationRpc::Finished(const Status& status) {
  Status new_status = status;
  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
    return;
  }
  if (new_status.ok() && resp_.has_error()) {
    if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
      // If CatalogManager is not initialized, treat the node as a
      // FOLLOWER for the time being, as currently this RPC is only
      // used for the purposes of finding the leader master.
      resp_.set_role(PeerRole::FOLLOWER);
      new_status = Status::OK();
    } else {
      out_->mutable_error()->CopyFrom(resp_.error().status());
      new_status = StatusFromPB(resp_.error().status());
    }
  }
  if (new_status.ok()) {
    out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
    out_->mutable_registration()->CopyFrom(resp_.registration());
    out_->set_role(resp_.role());
  }
  auto callback = std::move(user_cb_);
  callback(new_status);
}
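
One detail in Finished is easy to miss: 'user_cb_' is moved into a local before being invoked. A hedged reading of why, restated as a sketch (the hazard described is inferred from the shape of the code, not from separate documentation):

  // If we invoked user_cb_(new_status) directly, a callback that drops the
  // last reference to this RPC would leave us executing a member function of
  // a destroyed object. Detaching the callback first makes the call safe:
  auto callback = std::move(user_cb_);
  callback(new_status);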

} // namespace

////////////////////////////////////////////////////////////
// GetLeaderMasterRpc
////////////////////////////////////////////////////////////

GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb,
                                       const server::MasterAddresses& addrs,
                                       CoarseTimePoint deadline,
                                       Messenger* messenger,
                                       rpc::ProxyCache* proxy_cache,
                                       rpc::Rpcs* rpcs,
                                       bool should_timeout_to_follower,
                                       bool wait_for_leader_election)
    : Rpc(deadline, messenger, proxy_cache),
      user_cb_(std::move(user_cb)),
      rpcs_(*rpcs),
      should_timeout_to_follower_(should_timeout_to_follower),
      wait_for_leader_election_(wait_for_leader_election) {
  DCHECK(deadline != CoarseTimePoint::max());

  for (const auto& list : addrs) {
    addrs_.insert(addrs_.end(), list.begin(), list.end());
  }
  // Using resize instead of reserve to explicitly initialize the values.
  responses_.resize(addrs_.size());
}
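
The resize-versus-reserve comment above is load-bearing: SendRpc hands '&responses_[i]' to each per-master RPC, so the elements must actually exist, not merely have capacity. A small sketch of the distinction:

  std::vector<ServerEntryPB> v;
  v.reserve(3);  // capacity 3, size 0: &v[0] would point past the end
  v.resize(3);   // size 3: v[0]..v[2] are default-constructed, safe to take &v[i]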

GetLeaderMasterRpc::~GetLeaderMasterRpc() {
}

string GetLeaderMasterRpc::ToString() const {
  return Format("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)", addrs_, num_attempts());
}

void GetLeaderMasterRpc::SendRpc() {
  auto self = shared_from_this();

  size_t size = addrs_.size();
  std::vector<rpc::Rpcs::Handle> handles;
  handles.reserve(size);
  {
    std::lock_guard<simple_spinlock> l(lock_);
    pending_responses_ = size;
    for (size_t i = 0; i < size; i++) {
      auto handle = rpcs_.Prepare();
      if (handle == rpcs_.InvalidHandle()) {
        GetMasterRegistrationRpcCbForNode(i, STATUS(Aborted, "Stopping"), self, handle);
        continue;
      }
      *handle = std::make_shared<GetMasterRegistrationRpc>(
          std::bind(
              &GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode, this, i, _1, self, handle),
          addrs_[i],
          retrier().deadline(),
          retrier().messenger(),
          &retrier().proxy_cache(),
          &responses_[i]);
      handles.push_back(handle);
    }
  }

  for (const auto& handle : handles) {
    (**handle).SendRpc();
  }
}
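
Note the two-phase shape of SendRpc: all handles are prepared while 'lock_' is held, so 'pending_responses_' is exact before any reply can arrive, while the network sends happen after the lock is released, presumably because a send may complete inline and re-enter code that takes 'lock_'. A generic sketch of the idiom (the 'Task' type and 'mu'/'tasks' names are stand-ins, not part of this file):

  std::vector<Task> ready;                 // 'Task' is a hypothetical stand-in
  size_t pending = 0;
  {
    std::lock_guard<std::mutex> l(mu);     // 'mu' and 'tasks' assumed in scope
    pending = tasks.size();                // bookkeeping under the lock
    for (auto& t : tasks) ready.push_back(t);
  }
  for (auto& t : ready) t.Start();         // I/O outside the lock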

void GetLeaderMasterRpc::Finished(const Status& status) {
  // If we've received replies from all of the nodes without finding
  // the leader, or if there were network errors talking to all of the
  // nodes, the error is retriable and we can perform a delayed retry.
  num_iters_++;
  if (status.IsNetworkError() || (wait_for_leader_election_ && status.IsNotFound())) {
    VLOG(4) << "About to retry operation due to error: " << status.ToString();
    // TODO (KUDU-573): Allow cancelling delayed tasks on reactor so
    // that we can safely use DelayedRetry here.
    auto retry_status = mutable_retrier()->DelayedRetry(this, status);
    if (!retry_status.ok()) {
      LOG(WARNING) << "Failed to schedule retry: " << retry_status;
    } else {
      return;
    }
  }
  VLOG(4) << "Completed GetLeaderMasterRpc, calling callback with status "
          << status.ToString();
  {
    std::lock_guard<simple_spinlock> l(lock_);
    // 'completed_' prevents 'user_cb_' from being invoked twice.
    if (completed_) {
      return;
    }
    completed_ = true;
  }
  auto callback = std::move(user_cb_);
  user_cb_.Reset();
  callback.Run(status, leader_master_);
}
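
For readability, the retry predicate at the top of Finished can be restated as a standalone helper; this is purely a rephrasing of the condition above, not an existing function in the codebase:

  // Retry on any network error; retry on NotFound (produced per node when a
  // master is not the leader) only while we are willing to wait for an election.
  bool ShouldDelayRetry(const Status& s, bool wait_for_leader_election) {
    return s.IsNetworkError() || (wait_for_leader_election && s.IsNotFound());
  }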

void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(
    size_t idx, const Status& status, const std::shared_ptr<rpc::RpcCommand>& self,
    rpc::Rpcs::Handle handle) {
  rpcs_.Unregister(handle);

  // TODO: handle the situation where one Master is partitioned from
  // the rest of the Master consensus configuration, all are reachable by the client,
  // and the partitioned node "thinks" it's the leader.
  //
  // The proper way to do so is to add term/index to the responses
  // from the Master, wait for a majority of the Masters to respond, and
  // pick the one with the highest term/index as the leader.
  Status new_status = status;
  {
    std::lock_guard<simple_spinlock> lock(lock_);
    --pending_responses_;
    if (completed_) {
      // If 'user_cb_' has been invoked (see Finished above), we can
      // stop.
      return;
    }
    auto& resp = responses_[idx];
    if (new_status.ok()) {
      if (resp.role() != PeerRole::LEADER) {
        // Use a STATUS(NotFound, "") to indicate that the node is not
        // the leader: this way, we can handle the case where we've
        // received a reply from all of the nodes in the cluster (no
        // network or other errors encountered), but haven't found a
        // leader (which means that Finished() above can perform a
        // delayed retry).

        // If we have exceeded FLAGS_master_leader_rpc_timeout_ms, set this follower to be master
        // leader. This prevents an infinite retry loop when none of the master addresses passed in
        // are leaders.
        if (should_timeout_to_follower_ &&
            MonoTime::Now() - start_time_ >
            MonoDelta::FromMilliseconds(FLAGS_master_leader_rpc_timeout_ms)) {
          LOG(WARNING) << "More than " << FLAGS_master_leader_rpc_timeout_ms << " ms has passed, "
              "choosing to heartbeat to follower master " << resp.instance_id().permanent_uuid()
              << " after " << num_iters_ << " iterations of all masters.";
          leader_master_ = addrs_[idx];
        } else {
          new_status = STATUS(NotFound, "no leader found: " + ToString());
        }
      } else {
        // We've found a leader.
        leader_master_ = addrs_[idx];
      }
    }
    if (!new_status.ok()) {
      if (pending_responses_ > 0) {
        // Don't call Finished() on error unless we're the last
        // outstanding response: calling Finished() will trigger
        // a delayed re-try, which we don't need to do unless we've
        // been unable to find a leader so far.
        return;
      }
    } else {
      completed_ = true;
    }
  }

  // Called if the leader has been determined, or if we've received
  // all of the responses.
  if (new_status.ok()) {
    auto callback = std::move(user_cb_);
    user_cb_.Reset();
    callback.Run(new_status, leader_master_);
  } else {
    Finished(new_status);
  }
}
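
The TODO at the top of this function names the principled fix for a partitioned master that still believes it leads: carry term/index in the responses, wait for a majority, and pick the highest term. A hypothetical shape of that selection (a term() accessor does not exist on the current response proto; it is assumed here purely for illustration):

  // Purely illustrative: among self-reported leaders, the highest term wins.
  ssize_t best = -1;
  for (size_t i = 0; i < responses_.size(); ++i) {
    if (responses_[i].role() == PeerRole::LEADER &&
        (best < 0 || responses_[i].term() > responses_[best].term())) {
      best = i;
    }
  }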

} // namespace master
} // namespace yb