YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_context.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/rpc_context.h"
34
35
#include <sstream>
36
37
#include <boost/core/null_deleter.hpp>
38
39
#include "yb/rpc/connection.h"
40
#include "yb/rpc/inbound_call.h"
41
#include "yb/rpc/lightweight_message.h"
42
#include "yb/rpc/local_call.h"
43
#include "yb/rpc/outbound_call.h"
44
#include "yb/rpc/reactor.h"
45
#include "yb/rpc/service_if.h"
46
#include "yb/rpc/yb_rpc.h"
47
48
#include "yb/util/debug/trace_event.h"
49
#include "yb/util/format.h"
50
#include "yb/util/jsonwriter.h"
51
#include "yb/util/pb_util.h"
52
#include "yb/util/status_format.h"
53
#include "yb/util/trace.h"
54
55
using google::protobuf::Message;
56
57
namespace yb {
58
namespace rpc {
59
60
using std::shared_ptr;
61
62
namespace {
63
64
// Wrapper for a protobuf message which lazily converts to JSON when
65
// the trace buffer is dumped. This pushes the work of stringification
66
// to the trace dumping process.
67
class PbTracer : public debug::ConvertableToTraceFormat {
68
 public:
69
  enum {
70
    kMaxFieldLengthToTrace = 100
71
  };
72
73
0
  explicit PbTracer(const Message& msg) : msg_(msg.New()) {
74
0
    msg_->CopyFrom(msg);
75
0
  }
76
77
0
  void AppendAsTraceFormat(std::string* out) const override {
78
0
    pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
79
0
    std::stringstream ss;
80
0
    JsonWriter jw(&ss, JsonWriter::COMPACT);
81
0
    jw.Protobuf(*msg_);
82
0
    out->append(ss.str());
83
0
  }
84
 private:
85
  const std::unique_ptr<Message> msg_;
86
};
87
88
0
scoped_refptr<debug::ConvertableToTraceFormat> TracePb(const Message& msg) {
89
0
  return make_scoped_refptr(new PbTracer(msg));
90
0
}
91
92
}  // anonymous namespace
93
94
70.3M
Result<size_t> RpcCallPBParams::ParseRequest(Slice param) {
95
70.3M
  google::protobuf::io::CodedInputStream in(param.data(), narrow_cast<int>(param.size()));
96
70.3M
  SetupLimit(&in);
97
70.3M
  auto& message = request();
98
70.3M
  if (PREDICT_FALSE(!message.ParseFromCodedStream(&in))) {
99
1
    return STATUS(InvalidArgument, message.InitializationErrorString());
100
1
  }
101
70.3M
  return message.SpaceUsedLong();
102
70.3M
}
103
104
70.3M
AnyMessageConstPtr RpcCallPBParams::SerializableResponse() {
105
70.3M
  return AnyMessageConstPtr(&response());
106
70.3M
}
107
108
10.8M
google::protobuf::Message* RpcCallPBParams::CastMessage(const AnyMessagePtr& msg) {
109
10.8M
  return msg.protobuf();
110
10.8M
}
111
112
10.8M
const google::protobuf::Message* RpcCallPBParams::CastMessage(const AnyMessageConstPtr& msg) {
113
10.8M
  return msg.protobuf();
114
10.8M
}
115
116
1
Result<size_t> RpcCallLWParams::ParseRequest(Slice param) {
117
1
  RETURN_NOT_OK(request().ParseFromSlice(param));
118
1
  return 0;
119
1
}
120
121
1
AnyMessageConstPtr RpcCallLWParams::SerializableResponse() {
122
1
  return AnyMessageConstPtr(&response());
123
1
}
124
125
0
LightweightMessage* RpcCallLWParams::CastMessage(const AnyMessagePtr& msg) {
126
0
  return msg.lightweight();
127
0
}
128
129
0
const LightweightMessage* RpcCallLWParams::CastMessage(const AnyMessageConstPtr& msg) {
130
0
  return msg.lightweight();
131
0
}
132
133
303M
RpcContext::~RpcContext() {
134
303M
  if (call_ && 
!responded_81.0M
) {
135
0
    LOG(DFATAL) << "RpcContext is destroyed, but response has not been sent, for call: "
136
0
                << call_->ToString();
137
0
  }
138
303M
}
139
140
RpcContext::RpcContext(std::shared_ptr<YBInboundCall> call,
141
                       std::shared_ptr<RpcCallParams> params)
142
    : call_(std::move(call)),
143
70.3M
      params_(std::move(params)) {
144
70.3M
  const Status s = call_->ParseParam(params_.get());
145
70.3M
  if (PREDICT_FALSE(!s.ok())) {
146
1
    RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_REQUEST, s);
147
1
    return;
148
1
  }
149
70.3M
  TRACE_EVENT_ASYNC_BEGIN1("rpc_call", "RPC", this, "call", call_->ToString());
150
70.3M
}
151
152
RpcContext::RpcContext(std::shared_ptr<LocalYBInboundCall> call)
153
10.8M
    : call_(call), params_(call.get(), boost::null_deleter()) {
154
10.8M
  TRACE_EVENT_ASYNC_BEGIN1("rpc_call", "RPC", this, "call", call_->ToString());
155
10.8M
}
156
157
81.1M
void RpcContext::RespondSuccess() {
158
81.1M
  call_->RecordHandlingCompleted();
159
81.1M
  TRACE_EVENT_ASYNC_END1("rpc_call", "RPC", this,
160
81.1M
                         "trace", trace()->DumpToString(true));
161
81.1M
  call_->RespondSuccess(params_->SerializableResponse());
162
81.1M
  responded_ = true;
163
81.1M
}
164
165
4
void RpcContext::RespondFailure(const Status &status) {
166
4
  call_->RecordHandlingCompleted();
167
4
  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
168
4
                         "status", status.ToString(),
169
4
                         "trace", trace()->DumpToString(true));
170
4
  call_->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, status);
171
4
  responded_ = true;
172
4
}
173
174
152
void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
175
152
  call_->RecordHandlingCompleted();
176
152
  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
177
152
                         "status", status.ToString(),
178
152
                         "trace", trace()->DumpToString(true));
179
152
  call_->RespondFailure(err, status);
180
152
  responded_ = true;
181
152
}
182
183
void RpcContext::RespondApplicationError(int error_ext_id, const std::string& message,
184
2.03k
                                         const Message& app_error_pb) {
185
2.03k
  call_->RecordHandlingCompleted();
186
2.03k
  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
187
2.03k
                         "response", TracePb(app_error_pb),
188
2.03k
                         "trace", trace()->DumpToString(true));
189
2.03k
  call_->RespondApplicationError(error_ext_id, message, app_error_pb);
190
2.03k
  responded_ = true;
191
2.03k
}
192
193
28.4M
size_t RpcContext::AddRpcSidecar(const Slice& car) {
194
28.4M
  return call_->AddRpcSidecar(car);
195
28.4M
}
196
197
9.43M
void RpcContext::ResetRpcSidecars() {
198
9.43M
  call_->ResetRpcSidecars();
199
9.43M
}
200
201
633k
void RpcContext::ReserveSidecarSpace(size_t space) {
202
633k
  call_->ReserveSidecarSpace(space);
203
633k
}
204
205
9.53M
const Endpoint& RpcContext::remote_address() const {
206
9.53M
  return call_->remote_address();
207
9.53M
}
208
209
0
const Endpoint& RpcContext::local_address() const {
210
0
  return call_->local_address();
211
0
}
212
213
27.5M
std::string RpcContext::requestor_string() const {
214
27.5M
  return yb::ToString(call_->remote_address());
215
27.5M
}
216
217
53.2M
CoarseTimePoint RpcContext::GetClientDeadline() const {
218
53.2M
  return call_->GetClientDeadline();
219
53.2M
}
220
221
0
MonoTime RpcContext::ReceiveTime() const {
222
0
  return call_->ReceiveTime();
223
0
}
224
225
1
Trace* RpcContext::trace() {
226
1
  return call_->trace();
227
1
}
228
229
1
void RpcContext::Panic(const char* filepath, int line_number, const string& message) {
230
  // Use the LogMessage class directly so that the log messages appear to come from
231
  // the line of code which caused the panic, not this code.
232
8
#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream()
233
2
#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream()
234
235
1
  MY_ERROR << "Panic handling " << call_->ToString() << ": " << message;
236
1
  Trace* t = trace();
237
1
  if (t) {
238
1
    MY_ERROR << "RPC trace:";
239
1
    t->Dump(&MY_ERROR, true);
240
1
  }
241
1
  MY_FATAL << "Exiting due to panic.";
242
243
1
#undef MY_ERROR
244
1
#undef MY_FATAL
245
1
}
246
247
5.19k
void RpcContext::CloseConnection() {
248
5.19k
  auto connection = call_->connection();
249
5.30k
  connection->reactor()->ScheduleReactorFunctor([connection](Reactor*) {
250
5.30k
    connection->Close();
251
5.30k
  }, 
SOURCE_LOCATION5.19k
());
252
5.19k
}
253
254
0
std::string RpcContext::ToString() const {
255
0
  return call_->ToString();
256
0
}
257
258
1
void PanicRpc(RpcContext* context, const char* file, int line_number, const std::string& message) {
259
1
  if (context) {
260
1
    context->Panic(file, line_number, message);
261
1
  } else {
262
0
    google::LogMessage(file, line_number, google::GLOG_FATAL).stream() << message;
263
0
  }
264
1
}
265
266
}  // namespace rpc
267
}  // namespace yb