/Users/deen/code/yugabyte-db/src/yb/client/forward_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/forward_rpc.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | |
18 | | #include "yb/common/wire_protocol.h" |
19 | | |
20 | | #include "yb/tserver/tserver_service.proxy.h" |
21 | | |
22 | | #include "yb/util/cast.h" |
23 | | #include "yb/util/result.h" |
24 | | #include "yb/util/status_log.h" |
25 | | #include "yb/util/trace.h" |
26 | | |
27 | | using namespace std::placeholders; |
28 | | |
29 | | DECLARE_bool(rpc_dump_all_traces); |
30 | | DECLARE_bool(collect_end_to_end_traces); |
31 | | |
32 | | namespace yb { |
33 | | |
34 | | using std::shared_ptr; |
35 | | using std::string; |
36 | | using rpc::Rpc; |
37 | | using tserver::WriteRequestPB; |
38 | | using tserver::WriteResponsePB; |
39 | | using tserver::ReadRequestPB; |
40 | | using tserver::ReadResponsePB; |
41 | | using tserver::TabletServerErrorPB; |
42 | | |
43 | | namespace client { |
44 | | namespace internal { |
45 | | |
46 | 0 | static CoarseTimePoint ComputeDeadline() { |
47 | | // TODO(Sudheer) : Make sure we pass the deadline from the PGGate layer and use that here. |
48 | 0 | MonoDelta timeout = MonoDelta::FromSeconds(60); |
49 | 0 | return CoarseMonoClock::now() + timeout; |
50 | 0 | } |
51 | | |
52 | | template <class Req, class Resp> |
53 | | ForwardRpc<Req, Resp>::ForwardRpc(const Req *req, Resp *res, |
54 | | rpc::RpcContext&& context, |
55 | | YBConsistencyLevel consistency_level, |
56 | | YBClient *client) |
57 | | : Rpc(ComputeDeadline(), client->messenger(), &client->proxy_cache()), |
58 | | req_(req), |
59 | | res_(res), |
60 | | context_(std::move(context)), |
61 | | trace_(new Trace), |
62 | | start_(MonoTime::Now()), |
63 | | tablet_invoker_(false /* local_tserver_only */, |
64 | | consistency_level, |
65 | | client, |
66 | | this, |
67 | | this, |
68 | | nullptr /* tablet */, |
69 | | nullptr /* table */, |
70 | | mutable_retrier(), |
71 | 0 | trace_.get()) { |
72 | 0 | } Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEEC2EPKS4_PS5_ONS_3rpc10RpcContextENS_18YBConsistencyLevelEPNS0_8YBClientE Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEEC2EPKS4_PS5_ONS_3rpc10RpcContextENS_18YBConsistencyLevelEPNS0_8YBClientE |
73 | | |
74 | | template <class Req, class Resp> |
75 | 0 | ForwardRpc<Req, Resp>::~ForwardRpc() { |
76 | 0 | if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) { |
77 | 0 | LOG(INFO) << ToString() << " took " |
78 | 0 | << MonoTime::Now().GetDeltaSince(start_).ToMicroseconds() |
79 | 0 | << "us. Trace:"; |
80 | 0 | trace_->Dump(&LOG(INFO), true); |
81 | 0 | } |
82 | 0 | } Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEED2Ev Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEED2Ev |
83 | | |
84 | | template <class Req, class Resp> |
85 | 0 | string ForwardRpc<Req, Resp>::ToString() const { |
86 | 0 | return Format("$0(tablet: $1, num_attempts: $2)", |
87 | 0 | read_only() ? "Read" : "Write", |
88 | 0 | req_->tablet_id(), |
89 | 0 | num_attempts()); |
90 | 0 | } Unexecuted instantiation: _ZNK2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE8ToStringEv Unexecuted instantiation: _ZNK2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE8ToStringEv |
91 | | |
92 | | template <class Req, class Resp> |
93 | 0 | void ForwardRpc<Req, Resp>::SendRpc() { |
94 | 0 | TRACE_TO(trace_, "SendRpc() called."); |
95 | 0 | retained_self_ = shared_from_this(); |
96 | 0 | tablet_invoker_.Execute(req_->tablet_id(), num_attempts() > 1); |
97 | 0 | } Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE7SendRpcEv Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE7SendRpcEv |
98 | | |
99 | | template <class Req, class Resp> |
100 | 0 | void ForwardRpc<Req, Resp>::Finished(const Status& status) { |
101 | 0 | Status new_status = status; |
102 | 0 | if (tablet_invoker_.Done(&new_status)) { |
103 | 0 | if (new_status.ok()) { |
104 | 0 | PopulateResponse(); |
105 | 0 | } |
106 | 0 | context_.RespondSuccess(); |
107 | 0 | retained_self_.reset(); |
108 | 0 | } |
109 | 0 | } Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE8FinishedERKNS_6StatusE Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE8FinishedERKNS_6StatusE |
110 | | |
111 | | template <class Req, class Resp> |
112 | 0 | void ForwardRpc<Req, Resp>::Failed(const Status& status) { |
113 | 0 | TabletServerErrorPB *err = res_->mutable_error(); |
114 | 0 | StatusToPB(status, err->mutable_status()); |
115 | 0 | } Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE6FailedERKNS_6StatusE Unexecuted instantiation: _ZN2yb6client8internal10ForwardRpcINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE6FailedERKNS_6StatusE |
116 | | |
117 | | ForwardWriteRpc::ForwardWriteRpc(const WriteRequestPB *req, |
118 | | WriteResponsePB *res, |
119 | | rpc::RpcContext&& context, |
120 | | YBClient *client) : |
121 | 0 | ForwardRpc(req, res, std::move(context), YBConsistencyLevel::STRONG, client) { |
122 | | |
123 | | // Ensure that only PGSQL operations are forwarded. |
124 | 0 | DCHECK(!req->redis_write_batch_size() && !req->ql_write_batch_size()); |
125 | 0 | } |
126 | | |
127 | 0 | ForwardWriteRpc::~ForwardWriteRpc() { |
128 | 0 | } |
129 | | |
130 | 0 | void ForwardWriteRpc::SendRpcToTserver(int attempt_num) { |
131 | 0 | auto trace = trace_; |
132 | 0 | TRACE_TO(trace, "SendRpcToTserver"); |
133 | 0 | ADOPT_TRACE(trace.get()); |
134 | |
|
135 | 0 | tablet_invoker_.proxy()->WriteAsync( |
136 | 0 | *req_, res_, PrepareController(), |
137 | 0 | std::bind(&ForwardWriteRpc::Finished, this, Status::OK())); |
138 | 0 | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
139 | 0 | } |
140 | | |
141 | 0 | void ForwardWriteRpc::PopulateResponse() { |
142 | 0 | for (const auto& r : res_->pgsql_response_batch()) { |
143 | 0 | if (r.has_rows_data_sidecar()) { |
144 | 0 | Slice s = CHECK_RESULT(retrier().controller().GetSidecar(r.rows_data_sidecar())); |
145 | 0 | context_.AddRpcSidecar(s); |
146 | 0 | } |
147 | 0 | } |
148 | 0 | } |
149 | | |
150 | | ForwardReadRpc::ForwardReadRpc(const ReadRequestPB *req, |
151 | | ReadResponsePB *res, |
152 | | rpc::RpcContext&& context, |
153 | | YBClient *client) : |
154 | 0 | ForwardRpc(req, res, std::move(context), req->consistency_level(), client) { |
155 | | |
156 | | // Ensure that only PGSQL operations are forwarded. |
157 | 0 | DCHECK(!req->redis_batch_size() && !req->ql_batch_size()); |
158 | 0 | } |
159 | | |
160 | | |
161 | 0 | ForwardReadRpc::~ForwardReadRpc() { |
162 | 0 | } |
163 | | |
164 | 0 | void ForwardReadRpc::SendRpcToTserver(int attempt_num) { |
165 | 0 | auto trace = trace_; |
166 | 0 | TRACE_TO(trace, "SendRpcToTserver"); |
167 | 0 | ADOPT_TRACE(trace.get()); |
168 | |
|
169 | 0 | tablet_invoker_.proxy()->ReadAsync( |
170 | 0 | *req_, res_, PrepareController(), |
171 | 0 | std::bind(&ForwardReadRpc::Finished, this, Status::OK())); |
172 | 0 | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
173 | 0 | } |
174 | | |
175 | 0 | void ForwardReadRpc::PopulateResponse() { |
176 | 0 | for (const auto& r : res_->pgsql_batch()) { |
177 | 0 | if (r.has_rows_data_sidecar()) { |
178 | 0 | Slice s = CHECK_RESULT(retrier().controller().GetSidecar(r.rows_data_sidecar())); |
179 | 0 | context_.AddRpcSidecar(s); |
180 | 0 | } |
181 | 0 | } |
182 | 0 | } |
183 | | |
184 | | } // namespace internal |
185 | | } // namespace client |
186 | | } // namespace yb |