YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/local_call.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/local_call.h"
17
18
#include "yb/gutil/casts.h"
19
20
#include "yb/rpc/rpc_controller.h"
21
#include "yb/rpc/rpc_header.messages.h"
22
23
#include "yb/util/format.h"
24
#include "yb/util/memory/memory.h"
25
#include "yb/util/result.h"
26
#include "yb/util/status_format.h"
27
28
namespace yb {
29
namespace rpc {
30
31
using std::shared_ptr;
32
33
// Builds an outbound call that is dispatched locally. All arguments are forwarded to the
// OutboundCall base; no per-method metrics and no callback thread pool are supplied (both are
// passed as nullptr).
LocalOutboundCall::LocalOutboundCall(
    const RemoteMethod* remote_method,
    const shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
    AnyMessagePtr response_storage, RpcController* controller,
    std::shared_ptr<RpcMetrics> rpc_metrics, ResponseCallback callback)
    : OutboundCall(
          remote_method,
          outbound_call_metrics,
          /* method_metrics= */ nullptr,
          response_storage,
          controller,
          std::move(rpc_metrics),
          std::move(callback),
          /* callback_thread_pool= */ nullptr) {
  // Record construction of the local call in the call's trace.
  TRACE_TO(trace_, "LocalOutboundCall");
}
43
44
// Remembers the request message for this call by storing it in req_.
// `mem_tracker` is accepted to satisfy the interface but is not used here.
// Always returns Status::OK().
Status LocalOutboundCall::SetRequestParam(
    AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) {
  // Only the message pointer wrapper is stored.
  req_ = req;

  return Status::OK();
}
49
50
0
// Serialization entry point; not expected to be reached for a local call.
// Reaching it is reported through LOG(FATAL); `output` is never written.
void LocalOutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  LOG(FATAL) << "Local call should not require serialization";
}
53
54
10.8M
const std::shared_ptr<LocalYBInboundCall>& LocalOutboundCall::CreateLocalInboundCall() {
55
10.8M
  DCHECK(inbound_call_.get() == nullptr);
56
10.8M
  const MonoDelta timeout = controller()->timeout();
57
10.8M
  const CoarseTimePoint deadline =
58
18.4E
      
timeout.Initialized()10.8M
?
start_ + timeout10.8M
: CoarseTimePoint::max();
59
10.8M
  auto outbound_call = std::static_pointer_cast<LocalOutboundCall>(shared_from(this));
60
10.8M
  inbound_call_ = InboundCall::Create<LocalYBInboundCall>(
61
10.8M
      &rpc_metrics(), remote_method(), outbound_call, deadline);
62
10.8M
  return inbound_call_;
63
10.8M
}
64
65
7.12M
// Returns inbound_call_->sidecar_pointers_[idx].data() for the sidecar at position `idx`.
//
// Returns an InvalidArgument status when `idx` is not smaller than the number of sidecars held
// by the inbound call.
//
// `idx` is unsigned, so only the upper bound needs checking; this matches the validation done
// by GetSidecarHolder().
Result<const uint8_t*const*> LocalOutboundCall::GetSidecarPtr(size_t idx) const {
  if (idx >= inbound_call_->sidecars_.size()) {
    return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
  }
  return inbound_call_->sidecar_pointers_[idx].data();
}
71
72
3.40M
// Builds a SidecarHolder for the sidecar at position `idx` of the inbound call: the sidecar
// buffer itself together with the slice obtained from its AsSlice().
//
// Returns an InvalidArgument status when `idx` is out of range.
Result<SidecarHolder> LocalOutboundCall::GetSidecarHolder(size_t idx) const {
  if (idx < inbound_call_->sidecars_.size()) {
    auto& sidecar = inbound_call_->sidecars_[idx];
    return SidecarHolder(sidecar, sidecar.AsSlice());
  }

  return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
}
78
79
// Constructs the inbound side of a local call.
//
// rpc_metrics and remote_method are forwarded to the YBInboundCall base.
// outbound_call is a weak reference to the originating outbound call, kept in outbound_call_.
// deadline is stored in deadline_.
LocalYBInboundCall::LocalYBInboundCall(
    RpcMetrics* rpc_metrics,
    const RemoteMethod& remote_method,
    std::weak_ptr<LocalOutboundCall> outbound_call,
    CoarseTimePoint deadline)
    : YBInboundCall(rpc_metrics, remote_method),
      // outbound_call is a by-value sink parameter: move it into the member instead of copying,
      // which avoids an extra weak reference count increment/decrement per call.
      outbound_call_(std::move(outbound_call)),
      deadline_(deadline) {
}
87
88
7.41M
// Returns a reference to a function-local, default-constructed Endpoint that is shared by all
// calls (presumably because a local call has no real network peer — confirm with callers).
const Endpoint& LocalYBInboundCall::remote_address() const {
  static const Endpoint default_endpoint;
  return default_endpoint;
}
92
93
0
// Returns a reference to a function-local, default-constructed Endpoint that is shared by all
// calls (presumably because a local call is not bound to a real socket — confirm with callers).
const Endpoint& LocalYBInboundCall::local_address() const {
  static const Endpoint default_endpoint;
  return default_endpoint;
}
97
98
10.7M
// Delivers the outcome of this inbound call to the originating outbound call.
//
// If the outbound call can no longer be obtained, the situation is logged via LOG(DFATAL) and
// nothing else happens. On success the outbound call is marked finished. On failure `resp` is
// interpreted as an error status message (lightweight or regular protobuf), converted to an
// ErrorStatusPB, and handed to the outbound call's SetFailed() together with a RemoteError
// status carrying the error message.
void LocalYBInboundCall::Respond(AnyMessageConstPtr resp, bool is_success) {
  auto originating_call = outbound_call();
  if (!originating_call) {
    LOG(DFATAL) << "Outbound call is NULL during Respond, looks like double response. "
                << "is_success: " << is_success;
    return;
  }

  if (is_success) {
    originating_call->SetFinished();
    return;
  }

  // Failure path: obtain the error as a regular protobuf, whichever form it arrived in.
  std::unique_ptr<ErrorStatusPB> error_pb;
  if (!resp.is_lightweight()) {
    error_pb = std::make_unique<ErrorStatusPB>(
        yb::down_cast<const ErrorStatusPB&>(*resp.protobuf()));
  } else {
    error_pb = std::make_unique<ErrorStatusPB>();
    yb::down_cast<const LWErrorStatusPB&>(*resp.lightweight()).ToGoogleProtobuf(error_pb.get());
  }

  // The status must be built before error_pb is moved into SetFailed().
  auto failure = STATUS(RemoteError, "Local call error", error_pb->message());
  originating_call->SetFailed(std::move(failure), std::move(error_pb));
}
121
122
0
// Parameter parsing entry point; not expected to be reached for a local call.
// Reaching it is reported through LOG(FATAL); `params` is never touched.
Status LocalYBInboundCall::ParseParam(RpcCallParams* params) {
  LOG(FATAL) << "local call should not require parsing";
}
125
126
0
// Request parsing entry point; a local call never parses a request, so this always fails with
// an InternalError status. `param` is ignored.
Result<size_t> LocalYBInboundCall::ParseRequest(Slice param) {
  return STATUS(InternalError, "ParseRequest called for local call");
}
129
130
10.8M
// Returns the response message held by the originating outbound call.
// NOTE(review): unlike Respond(), this does not check the result of outbound_call() for null
// before dereferencing it — confirm callers guarantee the outbound call is still alive.
AnyMessageConstPtr LocalYBInboundCall::SerializableResponse() {
  auto originating_call = outbound_call();
  return originating_call->response();
}
133
134
} // namespace rpc
135
} // namespace yb