YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/forward_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/forward_rpc.h"
15
16
#include "yb/client/client.h"
17
18
#include "yb/common/wire_protocol.h"
19
20
#include "yb/tserver/tserver_service.proxy.h"
21
22
#include "yb/util/cast.h"
23
#include "yb/util/result.h"
24
#include "yb/util/status_log.h"
25
#include "yb/util/trace.h"
26
27
using namespace std::placeholders;
28
29
DECLARE_bool(rpc_dump_all_traces);
30
DECLARE_bool(collect_end_to_end_traces);
31
32
namespace yb {
33
34
using std::shared_ptr;
35
using std::string;
36
using rpc::Rpc;
37
using tserver::WriteRequestPB;
38
using tserver::WriteResponsePB;
39
using tserver::ReadRequestPB;
40
using tserver::ReadResponsePB;
41
using tserver::TabletServerErrorPB;
42
43
namespace client {
44
namespace internal {
45
46
0
// Returns the deadline handed to the base Rpc of every forwarded call: the current
// CoarseMonoClock time plus a fixed 60 second timeout.
static CoarseTimePoint ComputeDeadline() {
  // TODO(Sudheer) : Make sure we pass the deadline from the PGGate layer and use that here.
  const MonoDelta forward_timeout = MonoDelta::FromSeconds(60);
  const auto now = CoarseMonoClock::now();
  return now + forward_timeout;
}
51
52
template <class Req, class Resp>
53
ForwardRpc<Req, Resp>::ForwardRpc(const Req *req, Resp *res,
54
                                  rpc::RpcContext&& context,
55
                                  YBConsistencyLevel consistency_level,
56
                                  YBClient *client)
57
  : Rpc(ComputeDeadline(), client->messenger(), &client->proxy_cache()),
58
    req_(req),
59
    res_(res),
60
    context_(std::move(context)),
61
    trace_(new Trace),
62
    start_(MonoTime::Now()),
63
    tablet_invoker_(false /* local_tserver_only */,
64
                    consistency_level,
65
                    client,
66
                    this,
67
                    this,
68
                    nullptr /* tablet */,
69
                    nullptr /* table */,
70
                    mutable_retrier(),
71
0
                    trace_.get()) {
72
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::ForwardRpc(yb::tserver::WriteRequestPB const*, yb::tserver::WriteResponsePB*, yb::rpc::RpcContext&&, yb::YBConsistencyLevel, yb::client::YBClient*)
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::ForwardRpc(yb::tserver::ReadRequestPB const*, yb::tserver::ReadResponsePB*, yb::rpc::RpcContext&&, yb::YBConsistencyLevel, yb::client::YBClient*)
73
74
template <class Req, class Resp>
75
0
ForwardRpc<Req, Resp>::~ForwardRpc() {
76
0
  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
77
0
    LOG(INFO) << ToString() << " took "
78
0
              << MonoTime::Now().GetDeltaSince(start_).ToMicroseconds()
79
0
              << "us. Trace:";
80
0
    trace_->Dump(&LOG(INFO), true);
81
0
  }
82
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::~ForwardRpc()
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::~ForwardRpc()
83
84
template <class Req, class Resp>
85
0
string ForwardRpc<Req, Resp>::ToString() const {
86
0
  return Format("$0(tablet: $1, num_attempts: $2)",
87
0
                read_only() ? "Read" : "Write",
88
0
                req_->tablet_id(),
89
0
                num_attempts());
90
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::ToString() const
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::ToString() const
91
92
template <class Req, class Resp>
93
0
void ForwardRpc<Req, Resp>::SendRpc() {
94
0
  TRACE_TO(trace_, "SendRpc() called.");
95
0
  retained_self_ = shared_from_this();
96
0
  tablet_invoker_.Execute(req_->tablet_id(), num_attempts() > 1);
97
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::SendRpc()
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::SendRpc()
98
99
template <class Req, class Resp>
100
0
void ForwardRpc<Req, Resp>::Finished(const Status& status) {
101
0
  Status new_status = status;
102
0
  if (tablet_invoker_.Done(&new_status)) {
103
0
    if (new_status.ok()) {
104
0
      PopulateResponse();
105
0
    }
106
0
    context_.RespondSuccess();
107
0
    retained_self_.reset();
108
0
  }
109
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::Finished(yb::Status const&)
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::Finished(yb::Status const&)
110
111
template <class Req, class Resp>
112
0
void ForwardRpc<Req, Resp>::Failed(const Status& status) {
113
0
  TabletServerErrorPB *err = res_->mutable_error();
114
0
  StatusToPB(status, err->mutable_status());
115
0
}
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::Failed(yb::Status const&)
Unexecuted instantiation: yb::client::internal::ForwardRpc<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::Failed(yb::Status const&)
116
117
// Builds a forwarding RPC for a write request. All arguments are passed straight to the
// ForwardRpc base; the consistency level is always YBConsistencyLevel::STRONG.
ForwardWriteRpc::ForwardWriteRpc(
    const WriteRequestPB *req,
    WriteResponsePB *res,
    rpc::RpcContext&& context,
    YBClient *client)
    : ForwardRpc(req, res, std::move(context), YBConsistencyLevel::STRONG, client) {
  // Ensure that only PGSQL operations are forwarded.
  DCHECK(!req->redis_write_batch_size() && !req->ql_write_batch_size());
}
126
127
0
// Nothing to release beyond what the base class and members already handle.
ForwardWriteRpc::~ForwardWriteRpc() = default;
129
130
0
void ForwardWriteRpc::SendRpcToTserver(int attempt_num) {
131
0
  auto trace = trace_;
132
0
  TRACE_TO(trace, "SendRpcToTserver");
133
0
  ADOPT_TRACE(trace.get());
134
135
0
  tablet_invoker_.proxy()->WriteAsync(
136
0
      *req_, res_, PrepareController(),
137
0
      std::bind(&ForwardWriteRpc::Finished, this, Status::OK()));
138
0
  TRACE_TO(trace, "RpcDispatched Asynchronously");
139
0
}
140
141
0
void ForwardWriteRpc::PopulateResponse() {
142
0
  for (const auto& r : res_->pgsql_response_batch()) {
143
0
    if (r.has_rows_data_sidecar()) {
144
0
      Slice s = CHECK_RESULT(retrier().controller().GetSidecar(r.rows_data_sidecar()));
145
0
      context_.AddRpcSidecar(s);
146
0
    }
147
0
  }
148
0
}
149
150
// Builds a forwarding RPC for a read request. All arguments are passed straight to the
// ForwardRpc base; the consistency level is taken from the request itself.
ForwardReadRpc::ForwardReadRpc(
    const ReadRequestPB *req,
    ReadResponsePB *res,
    rpc::RpcContext&& context,
    YBClient *client)
    : ForwardRpc(req, res, std::move(context), req->consistency_level(), client) {
  // Ensure that only PGSQL operations are forwarded.
  DCHECK(!req->redis_batch_size() && !req->ql_batch_size());
}
159
160
161
0
// Nothing to release beyond what the base class and members already handle.
ForwardReadRpc::~ForwardReadRpc() = default;
163
164
0
void ForwardReadRpc::SendRpcToTserver(int attempt_num) {
165
0
  auto trace = trace_;
166
0
  TRACE_TO(trace, "SendRpcToTserver");
167
0
  ADOPT_TRACE(trace.get());
168
169
0
  tablet_invoker_.proxy()->ReadAsync(
170
0
      *req_, res_, PrepareController(),
171
0
      std::bind(&ForwardReadRpc::Finished, this, Status::OK()));
172
0
  TRACE_TO(trace, "RpcDispatched Asynchronously");
173
0
}
174
175
0
void ForwardReadRpc::PopulateResponse() {
176
0
  for (const auto& r : res_->pgsql_batch()) {
177
0
    if (r.has_rows_data_sidecar()) {
178
0
      Slice s = CHECK_RESULT(retrier().controller().GetSidecar(r.rows_data_sidecar()));
179
0
      context_.AddRpcSidecar(s);
180
0
    }
181
0
  }
182
0
}
183
184
}  // namespace internal
185
}  // namespace client
186
}  // namespace yb