/Users/deen/code/yugabyte-db/src/yb/rpc/local_call.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/local_call.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | |
20 | | #include "yb/rpc/rpc_controller.h" |
21 | | #include "yb/rpc/rpc_header.messages.h" |
22 | | |
23 | | #include "yb/util/format.h" |
24 | | #include "yb/util/memory/memory.h" |
25 | | #include "yb/util/result.h" |
26 | | #include "yb/util/status_format.h" |
27 | | |
28 | | namespace yb { |
29 | | namespace rpc { |
30 | | |
31 | | using std::shared_ptr; |
32 | | |
33 | | LocalOutboundCall::LocalOutboundCall( |
34 | | const RemoteMethod* remote_method, |
35 | | const shared_ptr<OutboundCallMetrics>& outbound_call_metrics, |
36 | | AnyMessagePtr response_storage, RpcController* controller, |
37 | | std::shared_ptr<RpcMetrics> rpc_metrics, ResponseCallback callback) |
38 | | : OutboundCall(remote_method, outbound_call_metrics, /* method_metrics= */ nullptr, |
39 | | response_storage, controller, std::move(rpc_metrics), std::move(callback), |
40 | 10.8M | /* callback_thread_pool= */ nullptr) { |
41 | 10.8M | TRACE_TO(trace_, "LocalOutboundCall"); |
42 | 10.8M | } |
43 | | |
44 | | Status LocalOutboundCall::SetRequestParam( |
45 | 10.8M | AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) { |
46 | 10.8M | req_ = req; |
47 | 10.8M | return Status::OK(); |
48 | 10.8M | } |
49 | | |
50 | 0 | void LocalOutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
51 | 0 | LOG(FATAL) << "Local call should not require serialization"; |
52 | 0 | } |
53 | | |
54 | 10.8M | const std::shared_ptr<LocalYBInboundCall>& LocalOutboundCall::CreateLocalInboundCall() { |
55 | 10.8M | DCHECK(inbound_call_.get() == nullptr); |
56 | 10.8M | const MonoDelta timeout = controller()->timeout(); |
57 | 10.8M | const CoarseTimePoint deadline = |
58 | 18.4E | timeout.Initialized()10.8M ? start_ + timeout10.8M : CoarseTimePoint::max(); |
59 | 10.8M | auto outbound_call = std::static_pointer_cast<LocalOutboundCall>(shared_from(this)); |
60 | 10.8M | inbound_call_ = InboundCall::Create<LocalYBInboundCall>( |
61 | 10.8M | &rpc_metrics(), remote_method(), outbound_call, deadline); |
62 | 10.8M | return inbound_call_; |
63 | 10.8M | } |
64 | | |
65 | 7.12M | Result<const uint8_t*const*> LocalOutboundCall::GetSidecarPtr(size_t idx) const { |
66 | 7.12M | if (idx < 0 || idx >= inbound_call_->sidecars_.size()7.12M ) { |
67 | 0 | return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx); |
68 | 0 | } |
69 | 7.12M | return inbound_call_->sidecar_pointers_[idx].data(); |
70 | 7.12M | } |
71 | | |
72 | 3.40M | Result<SidecarHolder> LocalOutboundCall::GetSidecarHolder(size_t idx) const { |
73 | 3.40M | if (idx >= inbound_call_->sidecars_.size()) { |
74 | 0 | return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx); |
75 | 0 | } |
76 | 3.40M | return SidecarHolder(inbound_call_->sidecars_[idx], inbound_call_->sidecars_[idx].AsSlice()); |
77 | 3.40M | } |
78 | | |
79 | | LocalYBInboundCall::LocalYBInboundCall( |
80 | | RpcMetrics* rpc_metrics, |
81 | | const RemoteMethod& remote_method, |
82 | | std::weak_ptr<LocalOutboundCall> outbound_call, |
83 | | CoarseTimePoint deadline) |
84 | | : YBInboundCall(rpc_metrics, remote_method), outbound_call_(outbound_call), |
85 | 10.8M | deadline_(deadline) { |
86 | 10.8M | } |
87 | | |
88 | 7.41M | const Endpoint& LocalYBInboundCall::remote_address() const { |
89 | 7.41M | static const Endpoint endpoint; |
90 | 7.41M | return endpoint; |
91 | 7.41M | } |
92 | | |
93 | 0 | const Endpoint& LocalYBInboundCall::local_address() const { |
94 | 0 | static const Endpoint endpoint; |
95 | 0 | return endpoint; |
96 | 0 | } |
97 | | |
98 | 10.7M | void LocalYBInboundCall::Respond(AnyMessageConstPtr resp, bool is_success) { |
99 | 10.7M | auto call = outbound_call(); |
100 | 10.7M | if (!call) { |
101 | 0 | LOG(DFATAL) << "Outbound call is NULL during Respond, looks like double response. " |
102 | 0 | << "is_success: " << is_success; |
103 | 0 | return; |
104 | 0 | } |
105 | | |
106 | 10.8M | if (10.7M is_success10.7M ) { |
107 | 10.8M | call->SetFinished(); |
108 | 18.4E | } else { |
109 | 18.4E | std::unique_ptr<ErrorStatusPB> error; |
110 | 18.4E | if (resp.is_lightweight()) { |
111 | 0 | error = std::make_unique<ErrorStatusPB>(); |
112 | 0 | yb::down_cast<const LWErrorStatusPB&>(*resp.lightweight()).ToGoogleProtobuf(error.get()); |
113 | 18.4E | } else { |
114 | 18.4E | error = std::make_unique<ErrorStatusPB>( |
115 | 18.4E | yb::down_cast<const ErrorStatusPB&>(*resp.protobuf())); |
116 | 18.4E | } |
117 | 18.4E | auto status = STATUS(RemoteError, "Local call error", error->message()); |
118 | 18.4E | call->SetFailed(std::move(status), std::move(error)); |
119 | 18.4E | } |
120 | 10.7M | } |
121 | | |
122 | 0 | Status LocalYBInboundCall::ParseParam(RpcCallParams* params) { |
123 | 0 | LOG(FATAL) << "local call should not require parsing"; |
124 | 0 | } |
125 | | |
126 | 0 | Result<size_t> LocalYBInboundCall::ParseRequest(Slice param) { |
127 | 0 | return STATUS(InternalError, "ParseRequest called for local call"); |
128 | 0 | } |
129 | | |
130 | 10.8M | AnyMessageConstPtr LocalYBInboundCall::SerializableResponse() { |
131 | 10.8M | return outbound_call()->response(); |
132 | 10.8M | } |
133 | | |
134 | | } // namespace rpc |
135 | | } // namespace yb |