YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/local_call.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/local_call.h"
17
18
#include "yb/gutil/casts.h"
19
20
#include "yb/rpc/rpc_controller.h"
21
#include "yb/rpc/rpc_header.messages.h"
22
23
#include "yb/util/format.h"
24
#include "yb/util/memory/memory.h"
25
#include "yb/util/result.h"
26
#include "yb/util/status_format.h"
27
28
namespace yb {
29
namespace rpc {
30
31
using std::shared_ptr;
32
33
LocalOutboundCall::LocalOutboundCall(
34
    const RemoteMethod* remote_method,
35
    const shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
36
    AnyMessagePtr response_storage, RpcController* controller,
37
    std::shared_ptr<RpcMetrics> rpc_metrics, ResponseCallback callback)
38
    : OutboundCall(remote_method, outbound_call_metrics, /* method_metrics= */ nullptr,
39
                   response_storage, controller, std::move(rpc_metrics), std::move(callback),
40
5.39M
                   /* callback_thread_pool= */ nullptr) {
41
5.39M
  TRACE_TO(trace_, "LocalOutboundCall");
42
5.39M
}
43
44
Status LocalOutboundCall::SetRequestParam(
45
5.58M
    AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) {
46
5.58M
  req_ = req;
47
5.58M
  return Status::OK();
48
5.58M
}
49
50
0
void LocalOutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
51
0
  LOG(FATAL) << "Local call should not require serialization";
52
0
}
53
54
5.59M
const std::shared_ptr<LocalYBInboundCall>& LocalOutboundCall::CreateLocalInboundCall() {
55
5.59M
  DCHECK(inbound_call_.get() == nullptr);
56
5.59M
  const MonoDelta timeout = controller()->timeout();
57
5.59M
  const CoarseTimePoint deadline =
58
18.4E
      timeout.Initialized() ? start_ + timeout : CoarseTimePoint::max();
59
5.59M
  auto outbound_call = std::static_pointer_cast<LocalOutboundCall>(shared_from(this));
60
5.59M
  inbound_call_ = InboundCall::Create<LocalYBInboundCall>(
61
5.59M
      &rpc_metrics(), remote_method(), outbound_call, deadline);
62
5.59M
  return inbound_call_;
63
5.59M
}
64
65
3.60M
Result<const uint8_t*const*> LocalOutboundCall::GetSidecarPtr(size_t idx) const {
66
3.60M
  if (idx < 0 || idx >= inbound_call_->sidecars_.size()) {
67
0
    return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
68
0
  }
69
3.60M
  return inbound_call_->sidecar_pointers_[idx].data();
70
3.60M
}
71
72
1.54M
Result<SidecarHolder> LocalOutboundCall::GetSidecarHolder(size_t idx) const {
73
1.54M
  if (idx >= inbound_call_->sidecars_.size()) {
74
0
    return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
75
0
  }
76
1.54M
  return SidecarHolder(inbound_call_->sidecars_[idx], inbound_call_->sidecars_[idx].AsSlice());
77
1.54M
}
78
79
LocalYBInboundCall::LocalYBInboundCall(
80
    RpcMetrics* rpc_metrics,
81
    const RemoteMethod& remote_method,
82
    std::weak_ptr<LocalOutboundCall> outbound_call,
83
    CoarseTimePoint deadline)
84
    : YBInboundCall(rpc_metrics, remote_method), outbound_call_(outbound_call),
85
5.59M
      deadline_(deadline) {
86
5.59M
}
87
88
3.75M
const Endpoint& LocalYBInboundCall::remote_address() const {
89
3.75M
  static const Endpoint endpoint;
90
3.75M
  return endpoint;
91
3.75M
}
92
93
0
const Endpoint& LocalYBInboundCall::local_address() const {
94
0
  static const Endpoint endpoint;
95
0
  return endpoint;
96
0
}
97
98
5.59M
void LocalYBInboundCall::Respond(AnyMessageConstPtr resp, bool is_success) {
99
5.59M
  auto call = outbound_call();
100
5.59M
  if (!call) {
101
0
    LOG(DFATAL) << "Outbound call is NULL during Respond, looks like double response. "
102
0
                << "is_success: " << is_success;
103
0
    return;
104
0
  }
105
106
5.59M
  if (is_success) {
107
5.58M
    call->SetFinished();
108
2.98k
  } else {
109
2.98k
    std::unique_ptr<ErrorStatusPB> error;
110
2.98k
    if (resp.is_lightweight()) {
111
0
      error = std::make_unique<ErrorStatusPB>();
112
0
      yb::down_cast<const LWErrorStatusPB&>(*resp.lightweight()).ToGoogleProtobuf(error.get());
113
2.98k
    } else {
114
2.98k
      error = std::make_unique<ErrorStatusPB>(
115
2.98k
          yb::down_cast<const ErrorStatusPB&>(*resp.protobuf()));
116
2.98k
    }
117
2.98k
    auto status = STATUS(RemoteError, "Local call error", error->message());
118
2.98k
    call->SetFailed(std::move(status), std::move(error));
119
2.98k
  }
120
5.59M
}
121
122
0
Status LocalYBInboundCall::ParseParam(RpcCallParams* params) {
123
0
  LOG(FATAL) << "local call should not require parsing";
124
0
}
125
126
0
Result<size_t> LocalYBInboundCall::ParseRequest(Slice param) {
127
0
  return STATUS(InternalError, "ParseRequest called for local call");
128
0
}
129
130
5.59M
AnyMessageConstPtr LocalYBInboundCall::SerializableResponse() {
131
5.59M
  return outbound_call()->response();
132
5.59M
}
133
134
} // namespace rpc
135
} // namespace yb