YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_CLIENT_ASYNC_RPC_H_
15
#define YB_CLIENT_ASYNC_RPC_H_
16
17
#include <boost/range/iterator_range_core.hpp>
18
#include <boost/version.hpp>
19
20
#include "yb/client/in_flight_op.h"
21
#include "yb/client/tablet_rpc.h"
22
23
#include "yb/common/common_types.pb.h"
24
#include "yb/common/read_hybrid_time.h"
25
26
#include "yb/rpc/rpc_fwd.h"
27
28
#include "yb/tserver/tserver.pb.h"
29
30
#include "yb/util/metrics_fwd.h"
31
32
namespace yb {
33
namespace client {
34
35
class YBTable;
36
class YBClient;
37
38
namespace internal {
39
40
class Batcher;
41
struct InFlightOp;
42
class RemoteTablet;
43
class RemoteTabletServer;
44
45
// Container for async rpc metrics
46
struct AsyncRpcMetrics {  // Public bundle of scoped_refptr metric handles (5 histograms, 2 counters).
47
  // Only declared here; presumably looks the metrics up on `metric_entity` — TODO confirm in the .cc.
  explicit AsyncRpcMetrics(const scoped_refptr<MetricEntity>& metric_entity);
48
49
  // Histograms. NOTE(review): units and recording sites are not visible in this header.
  scoped_refptr<Histogram> remote_write_rpc_time;
50
  scoped_refptr<Histogram> remote_read_rpc_time;
51
  scoped_refptr<Histogram> local_write_rpc_time;
52
  scoped_refptr<Histogram> local_read_rpc_time;
53
  scoped_refptr<Histogram> time_to_send;
54
  // Counters. Presumably incremented per consistent-prefix read outcome — TODO confirm.
  scoped_refptr<Counter> consistent_prefix_successful_reads;
55
  scoped_refptr<Counter> consistent_prefix_failed_reads;
56
};
57
58
// Iterator-pair range over elements of a std::vector<InFlightOp>; the range itself stores only
// the two iterators, so the underlying vector must outlive it.
using InFlightOps = boost::iterator_range<std::vector<InFlightOp>::iterator>;
59
60
// Argument bundle taken by const reference by the AsyncRpc, AsyncRpcBase, WriteRpc and ReadRpc
// constructors declared in this header. All members are public; defaults are shown inline.
struct AsyncRpcData {
61
  BatcherPtr batcher;
62
  RemoteTablet* tablet = nullptr;  // Raw pointer; ownership is not expressed here.
63
  bool allow_local_calls_in_curr_thread = false;
64
  bool need_consistent_read = false;
65
  InFlightOps ops;  // Iterator range over InFlightOp elements (see InFlightOps alias).
66
  bool need_metadata = false;
67
};
68
69
struct FlushExtraResult {  // Value type returned by AsyncRpc::MakeFlushExtraResult().
70
  // Latest hybrid time that was present on tserver during processing of this request.
71
  HybridTime propagated_hybrid_time;
72
73
  // When read time was not specified by client it will contain read time that servers used
74
  // to process this request.
75
  ReadHybridTime used_read_time;
76
};
77
78
// An Async RPC which is in-flight to a tablet. Initially, the RPC is sent
79
// to the leader replica, but it may be retried with another replica if the
80
// leader fails.
81
//
82
// Keeps a reference on the owning batcher while alive. It doesn't take a generic callback,
83
// but ProcessResponseFromTserver will update the state after getting the end response.
84
// This class deletes itself after Rpc returns and is processed.
85
class AsyncRpc : public rpc::Rpc, public TabletRpc {  // Abstract: see the pure virtuals below.
86
 public:
87
  AsyncRpc(const AsyncRpcData& data, YBConsistencyLevel consistency_level);
88
89
  virtual ~AsyncRpc();
90
91
  void SendRpc() override;
92
  std::string ToString() const override;  // Qualified: do not rely on a `using std::string`.
93
94
  std::shared_ptr<const YBTable> table() const;
95
7.44M
  // Dereferences tablet_invoker_.tablet() without a null check.
  // NOTE(review): assumes the invoker always holds a tablet — confirm.
  const RemoteTablet& tablet() const { return *tablet_invoker_.tablet(); }
96
124k
  const InFlightOps& ops() const { return ops_; }  // Read-only view of the batched ops.
97
98
 protected:
99
  void Finished(const Status& status) override;
100
101
  void SendRpcToTserver(int attempt_num) override;
102
103
  virtual void CallRemoteMethod() = 0;  // Overridden by WriteRpc and ReadRpc.
104
105
  // This is the last step where errors and responses are collected from the response and
106
  // stored in batcher. If there's a callback from the user, it is done in this step.
107
  virtual void ProcessResponseFromTserver(const Status& status) = 0;
108
109
  // See FlushExtraResult for details.
110
  virtual FlushExtraResult MakeFlushExtraResult() = 0;
111
112
  virtual void SwapResponses() = 0;  // Overridden by WriteRpc and ReadRpc.
113
114
  void Failed(const Status& status) override;
115
116
  // Is this a local call?
117
  bool IsLocalCall() const;
118
119
  // Creates the Local node tablet invoker or remote tablet invoker based on the GFLAG
120
  // 'FLAGS_ysql_forward_rpcs_to_local_tserver'.
121
  TabletInvoker *GetTabletInvoker(AsyncRpcData *data, YBConsistencyLevel yb_consistency_level);
122
123
  // Pointer back to the batcher. Processes the write response when it
124
  // completes, regardless of success or failure.
125
  BatcherPtr batcher_;
126
127
  // Operations which were batched into this RPC.
128
  // These operations are in kRequestSent state.
129
  InFlightOps ops_;
130
131
  TabletInvoker tablet_invoker_;  // Source of the tablet returned by tablet().
132
133
  CoarseTimePoint start_;  // Presumably the send start time used for latency metrics — TODO confirm.
134
  std::shared_ptr<AsyncRpcMetrics> async_rpc_metrics_;
135
  rpc::RpcCommandPtr retained_self_;  // Presumably keeps this RPC alive while in flight — TODO confirm.
136
};
137
138
// AsyncRpc layer templated on the request (Req) and response (Resp) message types; it stores one
// instance of each (req_, resp_). WriteRpc and ReadRpc in this header instantiate it with the
// tserver Write*/Read* protobuf types.
template <class Req, class Resp>
139
class AsyncRpcBase : public AsyncRpc {
140
 public:
141
  AsyncRpcBase(const AsyncRpcData& data, YBConsistencyLevel consistency_level);
142
  ~AsyncRpcBase();
143
144
3.80M
  const Resp& resp() const { return resp_; }  // Read-only access to the stored response.
145
  Resp& resp() { return resp_; }  // Mutable access to the stored response.
146
147
 protected:
148
  // Returns `true` if caller should continue processing response, `false` otherwise.
149
  bool CommonResponseCheck(const Status& status);
150
  void SendRpcToTserver(int attempt_num) override;
151
152
  virtual void NotifyBatcher(const Status& status) = 0;  // Overridden by WriteRpc and ReadRpc.
153
154
  void ProcessResponseFromTserver(const Status& status) override;
155
156
 protected: // TODO replace with private
157
6.02M
  // Returns a pointer to resp_.error() when resp_.has_error() is true, otherwise nullptr.
  // The pointer refers into resp_, so it is only valid while resp_ is unchanged.
  const tserver::TabletServerErrorPB* response_error() const override {
158
5.88M
    return resp_.has_error() ? &resp_.error() : nullptr;
159
6.02M
  }
_ZNK2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE14response_errorEv
Line
Count
Source
157
1.32M
  const tserver::TabletServerErrorPB* response_error() const override {
158
1.24M
    return resp_.has_error() ? &resp_.error() : nullptr;
159
1.32M
  }
_ZNK2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE14response_errorEv
Line
Count
Source
157
4.69M
  const tserver::TabletServerErrorPB* response_error() const override {
158
4.64M
    return resp_.has_error() ? &resp_.error() : nullptr;
159
4.69M
  }
160
161
  FlushExtraResult MakeFlushExtraResult() override;
162
163
  Req req_;  // Request message instance owned by this RPC object.
164
  Resp resp_;  // Response message instance; exposed via resp() and response_error().
165
};
166
167
// AsyncRpcBase instantiated with tserver::WriteRequestPB / tserver::WriteResponsePB.
class WriteRpc : public AsyncRpcBase<tserver::WriteRequestPB, tserver::WriteResponsePB> {
168
 public:
169
  // Relies on ops requests to be not on arena.
170
  explicit WriteRpc(const AsyncRpcData& data);
171
172
  virtual ~WriteRpc();
173
174
 private:
175
  void SwapResponses() override;  // Implements AsyncRpc::SwapResponses.
176
  void CallRemoteMethod() override;  // Implements AsyncRpc::CallRemoteMethod.
177
  void NotifyBatcher(const Status& status) override;  // Implements AsyncRpcBase::NotifyBatcher.
178
  // The overridden virtual is declared in a base class outside this header
  // (presumably TabletRpc — TODO confirm). ReadRpc does not override it.
  bool ShouldRetryExpiredRequest() override;
179
};
180
181
// AsyncRpcBase instantiated with tserver::ReadRequestPB / tserver::ReadResponsePB.
class ReadRpc : public AsyncRpcBase<tserver::ReadRequestPB, tserver::ReadResponsePB> {
182
 public:
183
  // Relies on ops requests to be not on arena.
184
  // Unlike WriteRpc, the caller supplies the consistency level explicitly.
  explicit ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level);
185
186
  virtual ~ReadRpc();
187
188
 private:
189
  void SwapResponses() override;  // Implements AsyncRpc::SwapResponses.
190
  void CallRemoteMethod() override;  // Implements AsyncRpc::CallRemoteMethod.
191
  void NotifyBatcher(const Status& status) override;  // Implements AsyncRpcBase::NotifyBatcher.
192
};
193
194
}  // namespace internal
195
}  // namespace client
196
}  // namespace yb
197
198
#endif  // YB_CLIENT_ASYNC_RPC_H_