YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_RPC_RPC_H
33
#define YB_RPC_RPC_H
34
35
#include <atomic>
#include <condition_variable>
#include <future>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <utility>

#include <boost/container/stable_vector.hpp>
#include <boost/optional/optional.hpp>

#include "yb/rpc/rpc_controller.h"

#include "yb/util/enums.h"
#include "yb/util/monotime.h"
47
48
namespace yb {
49
50
class Trace;
51
52
namespace rpc {
53
54
class Messenger;
55
class Rpc;
56
57
// The command that could be retried by RpcRetrier.
58
class RpcCommand : public std::enable_shared_from_this<RpcCommand> {
59
 public:
60
  RpcCommand();
61
62
  // Asynchronously sends the RPC to the remote end.
63
  //
64
  // Subclasses should use Finished() below as the callback function.
65
  virtual void SendRpc() = 0;
66
67
  // Returns a string representation of the RPC.
68
  virtual std::string ToString() const = 0;
69
70
  // Callback for SendRpc(). If 'status' is not OK, something failed
71
  // before the RPC was sent.
72
  virtual void Finished(const Status& status) = 0;
73
74
  virtual void Abort() = 0;
75
76
  virtual CoarseTimePoint deadline() const = 0;
77
78
782k
  Trace* trace() { return trace_.get(); }
79
80
 protected:
81
  virtual ~RpcCommand();
82
83
  // The trace buffer.
84
  scoped_refptr<Trace> trace_;
85
};
86
87
// States of RpcRetrier. A retrier starts in kIdle, and RpcRetrier::finished() reports true once
// the state is kFinished.
YB_DEFINE_ENUM(RpcRetrierState, (kIdle)(kRunning)(kScheduling)(kWaiting)(kFinished));
88
// Backoff policy accepted by RpcRetrier::DelayedRetry(); kLinear is the default value there.
YB_DEFINE_ENUM(BackoffStrategy, (kLinear)(kExponential));
89
// Strongly typed flag accepted by RpcRetrier::HandleResponse(), where it defaults to
// RetryWhenBusy::kTrue.
YB_STRONGLY_TYPED_BOOL(RetryWhenBusy);
90
91
// Provides utilities for retrying failed RPCs.
92
//
93
// All RPCs should use HandleResponse() to retry certain generic errors.
94
class RpcRetrier {
95
 public:
96
  RpcRetrier(CoarseTimePoint deadline, Messenger* messenger, ProxyCache *proxy_cache);
97
98
  ~RpcRetrier();
99
100
  // Tries to handle a failed RPC.
101
  //
102
  // If it was handled (e.g. scheduled for retry in the future), returns
103
  // true. In this case, callers should ensure that 'rpc' remains alive.
104
  //
105
  // Otherwise, returns false and writes the controller status to
106
  // 'out_status'.
107
  // retry_when_busy should be set to false if user does not want to retry request when server
108
  // returned that he is busy.
109
  bool HandleResponse(RpcCommand* rpc, Status* out_status,
110
                      RetryWhenBusy retry_when_busy = RetryWhenBusy::kTrue);
111
112
  // Retries an RPC at some point in the near future. If 'why_status' is not OK,
113
  // records it as the most recent error causing the RPC to retry. This is
114
  // reported to the caller eventually if the RPC never succeeds.
115
  //
116
  // If the RPC's deadline expires, the callback will fire with a timeout
117
  // error when the RPC comes up for retrying. This is true even if the
118
  // deadline has already expired at the time that Retry() was called.
119
  //
120
  // Callers should ensure that 'rpc' remains alive.
121
  CHECKED_STATUS DelayedRetry(
122
      RpcCommand* rpc, const Status& why_status,
123
      BackoffStrategy strategy = BackoffStrategy::kLinear);
124
125
  CHECKED_STATUS DelayedRetry(
126
      RpcCommand* rpc, const Status& why_status, MonoDelta add_delay);
127
128
6.84M
  RpcController* mutable_controller() { return &controller_; }
129
7.52M
  const RpcController& controller() const { return controller_; }
130
131
  // Sets up deadline and returns controller.
132
  // Do not forget that setting deadline in RpcController is NOT thread safe.
133
  RpcController* PrepareController();
134
135
6.46M
  CoarseTimePoint deadline() const { return deadline_; }
136
137
4.54M
  rpc::Messenger* messenger() const {
138
4.54M
    return messenger_;
139
4.54M
  }
140
141
9.08M
  ProxyCache& proxy_cache() const {
142
9.08M
    return proxy_cache_;
143
9.08M
  }
144
145
23.0M
  int attempt_num() const { return attempt_num_; }
146
147
  void Abort();
148
149
  std::string ToString() const;
150
151
7.29M
  bool finished() const {
152
7.29M
    return state_.load(std::memory_order_acquire) == RpcRetrierState::kFinished;
153
7.29M
  }
154
155
0
  CoarseTimePoint start() const {
156
0
    return start_;
157
0
  }
158
159
 private:
160
  CHECKED_STATUS DoDelayedRetry(RpcCommand* rpc, const Status& why_status);
161
162
  // Called when an RPC comes up for retrying. Actually sends the RPC.
163
  void DoRetry(RpcCommand* rpc, const Status& status);
164
165
  // The next sent rpc will be the nth attempt (indexed from 1).
166
  int attempt_num_ = 1;
167
168
  // Delay used for the last DelayedRetry. Depends on argument history of DelayedRetry calls.
169
  MonoDelta retry_delay_ = MonoDelta::kZero;
170
171
  const CoarseTimePoint start_;
172
173
  // If the remote end is busy, the RPC will be retried (with a small
174
  // delay) until this deadline is reached.
175
  //
176
  // May be uninitialized.
177
  const CoarseTimePoint deadline_;
178
179
  // Messenger to use when sending the RPC.
180
  Messenger* messenger_ = nullptr;
181
182
  ProxyCache& proxy_cache_;
183
184
  // RPC controller to use when sending the RPC.
185
  RpcController controller_;
186
187
  // In case any retries have already happened, remembers the last error.
188
  // Errors from the server take precedence over timeout errors.
189
  Status last_error_;
190
191
  std::atomic<ScheduledTaskId> task_id_{kInvalidTaskId};
192
193
  std::atomic<RpcRetrierState> state_{RpcRetrierState::kIdle};
194
195
  DISALLOW_COPY_AND_ASSIGN(RpcRetrier);
196
};
197
198
// An in-flight remote procedure call to some server.
199
class Rpc : public RpcCommand {
200
 public:
201
  Rpc(CoarseTimePoint deadline, Messenger* messenger, ProxyCache* proxy_cache)
202
12.6M
      : retrier_(deadline, messenger, proxy_cache) {
203
12.6M
  }
204
205
12.6M
  virtual ~Rpc() {}
206
207
  // Returns the number of times this RPC has been sent. Will always be at
208
  // least one.
209
15.5M
  int num_attempts() const { return retrier().attempt_num(); }
210
247k
  CoarseTimePoint deadline() const override { return retrier_.deadline(); }
211
212
247k
  void Abort() override {
213
247k
    retrier_.Abort();
214
247k
  }
215
216
  void ScheduleRetry(const Status& status);
217
 protected:
218
42.2M
  const RpcRetrier& retrier() const { return retrier_; }
219
20.0M
  RpcRetrier* mutable_retrier() { return &retrier_; }
220
11.8M
  RpcController* PrepareController() {
221
11.8M
    return retrier_.PrepareController();
222
11.8M
  }
223
224
 private:
225
  friend class RpcRetrier;
226
227
  // Used to retry some failed RPCs.
228
  RpcRetrier retrier_;
229
230
  DISALLOW_COPY_AND_ASSIGN(Rpc);
231
};
232
233
// Strongly typed flag accepted by Rpcs::DoRequestAbortAll(): when true, Rpcs is additionally
// switched to the shutting down state.
YB_STRONGLY_TYPED_BOOL(RequestShutdown);
234
235
class Rpcs {
236
 public:
237
  explicit Rpcs(std::mutex* mutex = nullptr);
238
430k
  ~Rpcs() { Shutdown(); }
239
240
  typedef boost::container::stable_vector<rpc::RpcCommandPtr> Calls;
241
  typedef Calls::iterator Handle;
242
243
  void Shutdown();
244
  Handle Register(RpcCommandPtr call);
245
  void Register(RpcCommandPtr call, Handle* handle);
246
  bool RegisterAndStart(RpcCommandPtr call, Handle* handle);
247
  RpcCommandPtr Unregister(Handle* handle);
248
  void Abort(std::initializer_list<Handle*> list);
249
  // Request all active calls to abort.
250
  void RequestAbortAll();
251
  Rpcs::Handle Prepare();
252
253
5.04M
  RpcCommandPtr Unregister(Handle handle) {
254
5.04M
    return Unregister(&handle);
255
5.04M
  }
256
257
9.81M
  Handle InvalidHandle() { return calls_.end(); }
258
259
 private:
260
  // Requests all active calls to abort. Returns deadline for waiting on abort completion.
261
  // If shutdown is true - switches Rpcs to shutting down state.
262
  CoarseTimePoint DoRequestAbortAll(RequestShutdown shutdown);
263
264
  boost::optional<std::mutex> mutex_holder_;
265
  std::mutex* mutex_;
266
  std::condition_variable cond_;
267
  Calls calls_;
268
  bool shutdown_ = false;
269
};
270
271
template <class Value>
272
class RpcFutureCallback {
273
 public:
274
  RpcFutureCallback(Rpcs::Handle handle,
275
                    Rpcs* rpcs,
276
                    std::shared_ptr<std::promise<Result<Value>>> promise)
277
0
      : rpcs_(rpcs), handle_(handle), promise_(std::move(promise)) {}
278
279
0
  void operator()(const Status& status, Value value) const {
280
0
    rpcs_->Unregister(handle_);
281
0
    if (status.ok()) {
282
0
      promise_->set_value(std::move(value));
283
0
    } else {
284
0
      promise_->set_value(status);
285
0
    }
286
0
  }
287
 private:
288
  Rpcs* rpcs_;
289
  Rpcs::Handle handle_;
290
  std::shared_ptr<std::promise<Result<Value>>> promise_;
291
};
292
293
template <class Value, class Functor>
294
class WrappedRpcFuture {
295
 public:
296
0
  WrappedRpcFuture(const Functor& functor, Rpcs* rpcs) : functor_(functor), rpcs_(rpcs) {}
297
298
  template <class... Args>
299
0
  std::future<Result<Value>> operator()(Args&&... args) const {
300
0
    auto promise = std::make_shared<std::promise<Result<Value>>>();
301
0
    auto future = promise->get_future();
302
0
    auto handle = rpcs_->Prepare();
303
0
    if (handle == rpcs_->InvalidHandle()) {
304
0
      promise->set_value(STATUS(Aborted, "Rpcs aborted"));
305
0
      return future;
306
0
    }
307
0
    *handle = functor_(std::forward<Args>(args)...,
308
0
                       RpcFutureCallback<Value>(handle, rpcs_, promise));
309
0
    (**handle).SendRpc();
310
0
    return future;
311
0
  }
312
 private:
313
  Functor* functor_;
314
  Rpcs* rpcs_;
315
};
316
317
template <class Value, class Functor>
318
0
WrappedRpcFuture<Value, Functor> WrapRpcFuture(const Functor& functor, Rpcs* rpcs) {
319
0
  return WrappedRpcFuture<Value, Functor>(functor, rpcs);
320
0
}
321
322
} // namespace rpc
323
} // namespace yb
324
325
#endif // YB_RPC_RPC_H