YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/async_rpc.h"
15
16
#include "yb/client/batcher.h"
17
#include "yb/client/client_error.h"
18
#include "yb/client/in_flight_op.h"
19
#include "yb/client/meta_cache.h"
20
#include "yb/client/table.h"
21
#include "yb/client/yb_op.h"
22
#include "yb/client/yb_table_name.h"
23
24
#include "yb/common/pgsql_error.h"
25
#include "yb/common/schema.h"
26
#include "yb/common/transaction.h"
27
#include "yb/common/transaction_error.h"
28
#include "yb/common/wire_protocol.h"
29
30
#include "yb/gutil/casts.h"
31
#include "yb/gutil/strings/substitute.h"
32
33
#include "yb/rpc/rpc_controller.h"
34
35
#include "yb/util/cast.h"
36
#include "yb/util/flag_tags.h"
37
#include "yb/util/logging.h"
38
#include "yb/util/metrics.h"
39
#include "yb/util/result.h"
40
#include "yb/util/status_log.h"
41
#include "yb/util/trace.h"
42
#include "yb/util/yb_pg_errcodes.h"
43
44
// TODO: Do we need the word "Redis" in the following two metrics? The ReadRpc and WriteRpc
45
// objects that emit these metrics are used by more than just the Redis service.
46
// Server-level metric prototypes for client read/write RPCs. Each prototype defined here is
// instantiated against a metric entity in the AsyncRpcMetrics constructor.
//
// Latency histograms, in microseconds, split by operation (Write/Read) and by whether the call
// is described as remote or local.
METRIC_DEFINE_coarse_histogram(
47
    server, handler_latency_yb_client_write_remote, "yb.client.Write remote call time",
48
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Write call ");
49
METRIC_DEFINE_coarse_histogram(
50
    server, handler_latency_yb_client_read_remote, "yb.client.Read remote call time",
51
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Read call ");
52
METRIC_DEFINE_coarse_histogram(
53
    server, handler_latency_yb_client_write_local, "yb.client.Write local call time",
54
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Write call ");
55
METRIC_DEFINE_coarse_histogram(
56
    server, handler_latency_yb_client_read_local, "yb.client.Read local call time",
57
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Read call ");
// Time between construction of an AsyncRpc (when its start_ timestamp is taken) and the moment
// AsyncRpc::SendRpcToTserver() runs for it.
METRIC_DEFINE_coarse_histogram(
59
    server, handler_latency_yb_client_time_to_send,
60
    "Time taken for a Write/Read rpc to be sent to the server", yb::MetricUnit::kMicroseconds,
61
    "Microseconds spent before sending the request to the server");
62
63
// Incremented by AsyncRpc::Finished() for consistent-prefix RPCs whose incoming status is OK.
METRIC_DEFINE_counter(server, consistent_prefix_successful_reads,
64
    "Number of consistent prefix reads that were served by the closest replica.",
65
    yb::MetricUnit::kRequests,
66
    "Number of consistent prefix reads that were served by the closest replica.");
67
68
// Incremented by AsyncRpc::SendRpc() whenever a consistent-prefix RPC is sent again, i.e. on
// every attempt after the first.
METRIC_DEFINE_counter(server, consistent_prefix_failed_reads,
69
    "Number of consistent prefix reads that failed to be served by the closest replica.",
70
    yb::MetricUnit::kRequests,
71
    "Number of consistent prefix reads that failed to be served by the closest replica.");
72
73
// Sampling rate for trace logging; read in AsyncRpc's destructor. A value <= 0 turns the sampled
// logging off (traces flagged must_print() are still logged there regardless of this flag).
DEFINE_int32(ybclient_print_trace_every_n, 0,
74
             "Controls the rate at which traces from ybclient are printed. Setting this to 0 "
75
             "disables printing the collected traces.");
76
TAG_FLAG(ybclient_print_trace_every_n, advanced);
77
TAG_FLAG(ybclient_print_trace_every_n, runtime);
78
79
// Read by LocalTabletServerOnly(): when false, Redis read/write batches are restricted to the
// local tablet server.
DEFINE_bool(forward_redis_requests, true, "If false, the redis op will not be served if it's not "
80
            "a local request. The op response will be set to the redis error "
81
            "'-MOVED partition_key 0.0.0.0:0'. This works with jedis which only looks at the MOVED "
82
            "part of the reply and ignores the rest. For now, if this flag is true, we will only "
83
            "attempt to read from leaders, so redis_allow_reads_from_followers will be ignored.");
84
85
// NOTE(review): not referenced in the part of the file visible here; presumably consumed by the
// write RPC code further down -- confirm.
DEFINE_bool(detect_duplicates_for_retryable_requests, true,
86
            "Enable tracking of write requests that prevents the same write from being applied "
87
                "twice.");
88
89
// NOTE(review): not referenced in the part of the file visible here -- confirm where it is read.
DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false,
90
            "When true, forward the PGSQL rpcs to the local tServer.");
91
92
// Capability checked against the target tablet server in AsyncRpcBase::SendRpcToTserver(). When
// the server lacks it, the client may have to set the read time on the request itself.
DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b);
93
94
// Defined elsewhere; its value is returned by IsTracingEnabled().
DECLARE_bool(collect_end_to_end_traces);
95
96
using namespace std::placeholders;
97
98
namespace yb {
99
100
using std::shared_ptr;
101
using rpc::ErrorStatusPB;
102
using rpc::Messenger;
103
using rpc::Rpc;
104
using rpc::RpcController;
105
using tserver::WriteRequestPB;
106
using tserver::WriteResponsePB;
107
using tserver::WriteResponsePB_PerRowErrorPB;
108
using strings::Substitute;
109
110
namespace client {
111
112
namespace internal {
113
114
12.0M
bool IsTracingEnabled() {
115
12.0M
  return FLAGS_collect_end_to_end_traces;
116
12.0M
}
117
118
namespace {
119
120
12.0M
bool LocalTabletServerOnly(const InFlightOps& ops) {
121
12.0M
  const auto op_type = ops.front().yb_op->type();
122
12.0M
  return ((op_type == YBOperation::Type::REDIS_READ || 
op_type == YBOperation::Type::REDIS_WRITE11.9M
) &&
123
12.0M
          
!FLAGS_forward_redis_requests208k
);
124
12.0M
}
125
126
}
127
128
// Instantiates every client RPC metric against `entity`: the four remote/local read/write
// latency histograms, the time-to-send histogram, and the two consistent-prefix read counters.
AsyncRpcMetrics::AsyncRpcMetrics(const scoped_refptr<yb::MetricEntity>& entity)
    : remote_write_rpc_time(
          METRIC_handler_latency_yb_client_write_remote.Instantiate(entity)),
      remote_read_rpc_time(
          METRIC_handler_latency_yb_client_read_remote.Instantiate(entity)),
      local_write_rpc_time(
          METRIC_handler_latency_yb_client_write_local.Instantiate(entity)),
      local_read_rpc_time(
          METRIC_handler_latency_yb_client_read_local.Instantiate(entity)),
      time_to_send(
          METRIC_handler_latency_yb_client_time_to_send.Instantiate(entity)),
      consistent_prefix_successful_reads(
          METRIC_consistent_prefix_successful_reads.Instantiate(entity)),
      consistent_prefix_failed_reads(
          METRIC_consistent_prefix_failed_reads.Instantiate(entity)) {}
138
139
// Builds an RPC for one batch of ops, taking the deadline, messenger and proxy cache from the
// batcher in `data`.
//
// The tablet invoker is told whether the batch is restricted to the local tablet server (see
// LocalTabletServerOnly) and whether `yb_consistency_level` is CONSISTENT_PREFIX. start_ records
// the construction time and is used later for latency reporting.
AsyncRpc::AsyncRpc(
    const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level)
    : Rpc(data.batcher->deadline(),
          data.batcher->messenger(),
          &data.batcher->proxy_cache()),
      batcher_(data.batcher),
      ops_(data.ops),
      tablet_invoker_(
          LocalTabletServerOnly(ops_),
          yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX,
          data.batcher->client_,
          this,
          this,
          data.tablet,
          table(),
          mutable_retrier(),
          trace_.get()),
      start_(CoarseMonoClock::Now()),
      async_rpc_metrics_(data.batcher->async_rpc_metrics()) {
  // Forward the caller's choice about running local calls in the current thread.
  RpcController* controller = mutable_retrier()->mutable_controller();
  controller->set_allow_local_calls_in_curr_thread(data.allow_local_calls_in_curr_thread);
}
158
159
12.0M
// Logs the collected trace on destruction, together with the RPC's total lifetime.
//
// A trace flagged must_print() is always logged. Otherwise the trace is logged at a sampled rate
// governed by FLAGS_ybclient_print_trace_every_n, and not at all when that flag is <= 0.
AsyncRpc::~AsyncRpc() {
  if (trace_->must_print()) {
    LOG(INFO) << ToString() << " took " << ToMicroseconds(CoarseMonoClock::Now() - start_)
              << "us. Trace:\n" << trace_->DumpToString(true);
    return;
  }

  const auto sample_every_n = GetAtomicFlag(&FLAGS_ybclient_print_trace_every_n);
  if (sample_every_n <= 0) {
    return;
  }
  YB_LOG_EVERY_N(INFO, sample_every_n)
      << ToString() << " took " << ToMicroseconds(CoarseMonoClock::Now() - start_)
      << "us. Trace:\n" << trace_->DumpToString(true);
}
173
174
12.1M
void AsyncRpc::SendRpc() {
175
12.1M
  TRACE_TO(trace_, "SendRpc() called.");
176
177
12.1M
  retained_self_ = shared_from_this();
178
  // For now, if this is a retry, execute this rpc on the leader even if
179
  // the consistency level is YBConsistencyLevel::CONSISTENT_PREFIX or
180
  // FLAGS_redis_allow_reads_from_followers is set to true.
181
  // TODO(hector): Temporarily blacklist the follower that couldn't serve the read so we can retry
182
  // on another follower.
183
12.1M
  if (async_rpc_metrics_ && 
num_attempts() > 112.0M
&&
tablet_invoker_.is_consistent_prefix()34.9k
) {
184
4.40k
    IncrementCounter(async_rpc_metrics_->consistent_prefix_failed_reads);
185
4.40k
  }
186
12.1M
  tablet_invoker_.Execute(std::string(), num_attempts() > 1);
187
12.1M
}
188
189
183k
std::string AsyncRpc::ToString() const {
190
183k
  const auto& transaction = batcher_->in_flight_ops().metadata.transaction;
191
183k
  const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction;
192
183k
  return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)",
193
183k
                ops_.front().yb_op->read_only() ? 
"Read"92.7k
:
"Write"90.9k
,
194
183k
                tablet().tablet_id(), ops_.size(), num_attempts(),
195
183k
                transaction.transaction_id,
196
183k
                subtransaction_opt
197
183k
                    ? 
Format("$0", subtransaction_opt->subtransaction_id)12.0k
198
183k
                    : 
"[none]"171k
);
199
183k
}
200
201
36.6M
std::shared_ptr<const YBTable> AsyncRpc::table() const {
202
  // All of the ops for a given tablet obviously correspond to the same table,
203
  // so we'll just grab the table from the first.
204
36.6M
  return ops_[0].yb_op->table();
205
36.6M
}
206
207
12.0M
void AsyncRpc::Finished(const Status& status) {
208
12.0M
  Status new_status = status;
209
12.0M
  if (tablet_invoker_.Done(&new_status)) {
210
12.0M
    if (tablet().is_split() ||
211
12.0M
        ClientError(new_status) == ClientErrorCode::kTablePartitionListIsStale) {
212
46
      ops_[0].yb_op->MarkTablePartitionListAsStale();
213
46
    }
214
12.0M
    if (async_rpc_metrics_ && 
status.ok()12.0M
&&
tablet_invoker_.is_consistent_prefix()12.0M
) {
215
22.0k
      IncrementCounter(async_rpc_metrics_->consistent_prefix_successful_reads);
216
22.0k
    }
217
12.0M
    ProcessResponseFromTserver(new_status);
218
12.0M
    batcher_->Flushed(ops_, new_status, MakeFlushExtraResult());
219
12.0M
    retained_self_.reset();
220
12.0M
  }
221
12.0M
}
222
223
185k
void AsyncRpc::Failed(const Status& status) {
224
185k
  std::string error_message = status.message().ToBuffer();
225
185k
  auto redis_error_code = status.IsInvalidCommand() || 
status.IsInvalidArgument()185k
?
226
185k
      
RedisResponsePB_RedisStatusCode_PARSING_ERROR6
: RedisResponsePB_RedisStatusCode_SERVER_ERROR;
227
996k
  for (auto& op : ops_) {
228
996k
    YBOperation* yb_op = op.yb_op.get();
229
996k
    switch (yb_op->type()) {
230
201
      case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
231
215
      case YBOperation::Type::REDIS_WRITE: {
232
215
        RedisResponsePB* resp;
233
215
        if (yb_op->type() == YBOperation::Type::REDIS_READ) {
234
201
          resp = down_cast<YBRedisReadOp*>(yb_op)->mutable_response();
235
201
        } else {
236
14
          resp = down_cast<YBRedisWriteOp*>(yb_op)->mutable_response();
237
14
        }
238
215
        resp->Clear();
239
        // If the tserver replied it is not the leader, respond that the key has moved. We do not
240
        // need to return the address of the new leader because the Redis client will refresh the
241
        // cluster map instead.
242
215
        if (status.IsIllegalState()) {
243
9
          resp->set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR);
244
9
          resp->set_error_message(Substitute("MOVED $0 0.0.0.0:0",
245
9
                                             down_cast<YBRedisOp*>(yb_op)->hash_code()));
246
206
        } else {
247
206
          resp->set_code(redis_error_code);
248
206
          resp->set_error_message(error_message);
249
206
        }
250
215
        break;
251
201
      }
252
149
      case YBOperation::Type::QL_READ: FALLTHROUGH_INTENDED;
253
60.3k
      case YBOperation::Type::QL_WRITE: {
254
60.3k
        QLResponsePB* resp = down_cast<YBqlOp*>(yb_op)->mutable_response();
255
60.3k
        resp->Clear();
256
60.3k
        resp->set_status(status.IsTryAgain() ? 
QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR50.1k
257
60.3k
                                             : 
QLResponsePB::YQL_STATUS_RUNTIME_ERROR10.2k
);
258
60.3k
        resp->set_error_message(error_message);
259
60.3k
        break;
260
149
      }
261
886k
      case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
262
935k
      case YBOperation::Type::PGSQL_WRITE: {
263
935k
        PgsqlResponsePB* resp = down_cast<YBPgsqlOp*>(yb_op)->mutable_response();
264
935k
        resp->set_status(
status.IsTryAgain()935k
? PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR
265
18.4E
                                             : PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR);
266
935k
        resp->set_error_message(error_message);
267
935k
        const uint8_t* pg_err_ptr = status.ErrorData(PgsqlErrorTag::kCategory);
268
935k
        if (pg_err_ptr != nullptr) {
269
335k
          resp->set_pg_error_code(static_cast<uint32_t>(PgsqlErrorTag::Decode(pg_err_ptr)));
270
600k
        } else {
271
600k
          resp->set_pg_error_code(static_cast<uint32_t>(YBPgErrorCode::YB_PG_INTERNAL_ERROR));
272
600k
        }
273
935k
        const uint8_t* txn_err_ptr = status.ErrorData(TransactionErrorTag::kCategory);
274
935k
        if (txn_err_ptr != nullptr) {
275
933k
          resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorTag::Decode(txn_err_ptr)));
276
933k
        } else {
277
1.76k
          resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorCode::kNone));
278
1.76k
        }
279
935k
        break;
280
886k
      }
281
0
      default:
282
0
        LOG(FATAL) << "Unsupported operation " << yb_op->type();
283
0
        break;
284
996k
    }
285
996k
  }
286
185k
}
287
288
11.7M
bool AsyncRpc::IsLocalCall() const {
289
11.7M
  return tablet_invoker_.IsLocalCall();
290
11.7M
}
291
292
namespace {
293
294
template<class T>
295
void SetMetadata(const InFlightOpsTransactionMetadata& metadata,
296
                 bool need_full_metadata,
297
1.59M
                 T* dest) {
298
1.59M
  auto* transaction = dest->mutable_transaction();
299
1.59M
  if (need_full_metadata) {
300
983k
    metadata.transaction.ToPB(transaction);
301
983k
  } else {
302
609k
    metadata.transaction.TransactionIdToPB(transaction);
303
609k
  }
304
1.59M
  dest->set_deprecated_may_have_metadata(true);
305
306
1.59M
  if (metadata.subtransaction && 
!metadata.subtransaction->IsDefaultState()53.1k
) {
307
52.8k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
52.8k
  }
309
1.59M
}
async_rpc.cc:void yb::client::internal::(anonymous namespace)::SetMetadata<yb::docdb::KeyValueWriteBatchPB>(yb::client::internal::InFlightOpsTransactionMetadata const&, bool, yb::docdb::KeyValueWriteBatchPB*)
Line
Count
Source
297
691k
                 T* dest) {
298
691k
  auto* transaction = dest->mutable_transaction();
299
691k
  if (need_full_metadata) {
300
452k
    metadata.transaction.ToPB(transaction);
301
452k
  } else {
302
238k
    metadata.transaction.TransactionIdToPB(transaction);
303
238k
  }
304
691k
  dest->set_deprecated_may_have_metadata(true);
305
306
691k
  if (metadata.subtransaction && 
!metadata.subtransaction->IsDefaultState()12.8k
) {
307
12.8k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
12.8k
  }
309
691k
}
async_rpc.cc:void yb::client::internal::(anonymous namespace)::SetMetadata<yb::tserver::ReadRequestPB>(yb::client::internal::InFlightOpsTransactionMetadata const&, bool, yb::tserver::ReadRequestPB*)
Line
Count
Source
297
901k
                 T* dest) {
298
901k
  auto* transaction = dest->mutable_transaction();
299
901k
  if (need_full_metadata) {
300
531k
    metadata.transaction.ToPB(transaction);
301
531k
  } else {
302
370k
    metadata.transaction.TransactionIdToPB(transaction);
303
370k
  }
304
901k
  dest->set_deprecated_may_have_metadata(true);
305
306
901k
  if (metadata.subtransaction && 
!metadata.subtransaction->IsDefaultState()40.2k
) {
307
39.9k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
39.9k
  }
309
901k
}
310
311
void SetMetadata(const InFlightOpsTransactionMetadata& metadata,
312
                 bool need_full_metadata,
313
690k
                 tserver::WriteRequestPB* req) {
314
690k
  SetMetadata(metadata, need_full_metadata, req->mutable_write_batch());
315
690k
}
316
317
} // namespace
318
319
12.1M
void AsyncRpc::SendRpcToTserver(int attempt_num) {
320
12.1M
  if (async_rpc_metrics_) {
321
12.0M
    async_rpc_metrics_->time_to_send->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
322
12.0M
  }
323
12.1M
  CallRemoteMethod();
324
12.1M
}
325
326
template <class Req, class Resp>
327
AsyncRpcBase<Req, Resp>::AsyncRpcBase(
328
    const AsyncRpcData& data, YBConsistencyLevel consistency_level)
329
12.0M
    : AsyncRpc(data, consistency_level) {
330
12.0M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
12.0M
  req_.set_include_trace(IsTracingEnabled());
332
12.0M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
12.0M
  bool has_read_time = false;
334
12.0M
  if (read_point) {
335
11.8M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
11.8M
    if (data.need_consistent_read &&
339
11.8M
        
table()->InternalSchema().table_properties().is_transactional()2.98M
) {
340
2.78M
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
2.78M
      if (read_time) {
342
2.21M
        has_read_time = true;
343
2.21M
        read_time.AddToPB(&req_);
344
2.21M
      }
345
2.78M
    }
346
11.8M
  }
347
12.0M
  if (
!ops_.empty()12.0M
) {
348
12.0M
    req_.set_batch_idx(ops_.front().batch_idx);
349
12.0M
  }
350
12.0M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
12.0M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
1.59M
    SetMetadata(metadata, data.need_metadata, &req_);
353
1.59M
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
1.59M
    LOG_IF(DFATAL, has_read_time && serializable)
355
40
        << "Read time should NOT be specified for serializable isolation: "
356
40
        << read_point->GetReadTime().ToString();
357
1.59M
  }
358
12.0M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::AsyncRpcBase(yb::client::internal::AsyncRpcData const&, yb::YBConsistencyLevel)
Line
Count
Source
329
2.54M
    : AsyncRpc(data, consistency_level) {
330
2.54M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
2.54M
  req_.set_include_trace(IsTracingEnabled());
332
2.54M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
2.54M
  bool has_read_time = false;
334
2.54M
  if (read_point) {
335
2.39M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
2.39M
    if (data.need_consistent_read &&
339
2.39M
        
table()->InternalSchema().table_properties().is_transactional()859k
) {
340
854k
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
854k
      if (read_time) {
342
668k
        has_read_time = true;
343
668k
        read_time.AddToPB(&req_);
344
668k
      }
345
854k
    }
346
2.39M
  }
347
2.54M
  if (!ops_.empty()) {
348
2.54M
    req_.set_batch_idx(ops_.front().batch_idx);
349
2.54M
  }
350
2.54M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
2.54M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
690k
    SetMetadata(metadata, data.need_metadata, &req_);
353
690k
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
18.4E
    LOG_IF(DFATAL, has_read_time && serializable)
355
18.4E
        << "Read time should NOT be specified for serializable isolation: "
356
18.4E
        << read_point->GetReadTime().ToString();
357
690k
  }
358
2.54M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::AsyncRpcBase(yb::client::internal::AsyncRpcData const&, yb::YBConsistencyLevel)
Line
Count
Source
329
9.54M
    : AsyncRpc(data, consistency_level) {
330
9.54M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
9.54M
  req_.set_include_trace(IsTracingEnabled());
332
9.54M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
9.54M
  bool has_read_time = false;
334
9.54M
  if (read_point) {
335
9.41M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
9.41M
    if (data.need_consistent_read &&
339
9.41M
        
table()->InternalSchema().table_properties().is_transactional()2.12M
) {
340
1.93M
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
1.93M
      if (read_time) {
342
1.54M
        has_read_time = true;
343
1.54M
        read_time.AddToPB(&req_);
344
1.54M
      }
345
1.93M
    }
346
9.41M
  }
347
9.54M
  if (
!ops_.empty()9.54M
) {
348
9.54M
    req_.set_batch_idx(ops_.front().batch_idx);
349
9.54M
  }
350
9.54M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
9.54M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
901k
    SetMetadata(metadata, data.need_metadata, &req_);
353
901k
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
901k
    LOG_IF(DFATAL, has_read_time && serializable)
355
72
        << "Read time should NOT be specified for serializable isolation: "
356
72
        << read_point->GetReadTime().ToString();
357
901k
  }
358
9.54M
}
359
360
template <class Req, class Resp>
361
12.0M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
12.0M
  req_.release_tablet_id();
363
12.0M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::~AsyncRpcBase()
Line
Count
Source
361
2.54M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
2.54M
  req_.release_tablet_id();
363
2.54M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::~AsyncRpcBase()
Line
Count
Source
361
9.50M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
9.50M
  req_.release_tablet_id();
363
9.50M
}
364
365
template <class Req, class Resp>
366
12.0M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
12.0M
  if (!status.ok()) {
368
183k
    return false;
369
183k
  }
370
11.8M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
11.8M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
11.8M
  if (restart_read_time) {
380
1.87k
    auto read_point = batcher_->read_point();
381
1.87k
    if (read_point) {
382
1.87k
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
1.87k
    }
384
1.87k
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
1.87k
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
1.87k
    return false;
387
1.87k
  }
388
11.8M
  auto local_limit_ht = resp_.local_limit_ht();
389
11.8M
  if (local_limit_ht) {
390
1.54M
    auto read_point = batcher_->read_point();
391
1.54M
    if (read_point) {
392
1.54M
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
1.54M
    }
394
1.54M
  }
395
11.8M
  return true;
396
11.8M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::CommonResponseCheck(yb::Status const&)
Line
Count
Source
366
2.54M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
2.54M
  if (!status.ok()) {
368
90.7k
    return false;
369
90.7k
  }
370
2.45M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
2.45M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
2.45M
  if (restart_read_time) {
380
0
    auto read_point = batcher_->read_point();
381
0
    if (read_point) {
382
0
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
0
    }
384
0
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
0
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
0
    return false;
387
0
  }
388
2.45M
  auto local_limit_ht = resp_.local_limit_ht();
389
2.45M
  if (local_limit_ht) {
390
0
    auto read_point = batcher_->read_point();
391
0
    if (read_point) {
392
0
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
0
    }
394
0
  }
395
2.45M
  return true;
396
2.45M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::CommonResponseCheck(yb::Status const&)
Line
Count
Source
366
9.50M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
9.50M
  if (!status.ok()) {
368
92.7k
    return false;
369
92.7k
  }
370
9.41M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
9.41M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
9.41M
  if (restart_read_time) {
380
1.87k
    auto read_point = batcher_->read_point();
381
1.87k
    if (read_point) {
382
1.87k
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
1.87k
    }
384
1.87k
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
1.87k
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
1.87k
    return false;
387
1.87k
  }
388
9.41M
  auto local_limit_ht = resp_.local_limit_ht();
389
9.41M
  if (local_limit_ht) {
390
1.54M
    auto read_point = batcher_->read_point();
391
1.54M
    if (read_point) {
392
1.54M
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
1.54M
    }
394
1.54M
  }
395
9.41M
  return true;
396
9.41M
}
397
398
template <class Req, class Resp>
399
12.1M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
12.1M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
35.7k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
35.7k
    if (read_point && !read_point->GetReadTime()) {
403
1
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
1
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
1
        read_point->SetCurrentReadTime();
408
1
        read_point->GetReadTime().AddToPB(&req_);
409
1
      }
410
1
    }
411
35.7k
  }
412
413
12.1M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
12.1M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
12.1M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::SendRpcToTserver(int)
Line
Count
Source
399
2.55M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
2.55M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
13.5k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
13.5k
    if (read_point && !read_point->GetReadTime()) {
403
1
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
1
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
1
        read_point->SetCurrentReadTime();
408
1
        read_point->GetReadTime().AddToPB(&req_);
409
1
      }
410
1
    }
411
13.5k
  }
412
413
2.55M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
2.55M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
2.55M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::SendRpcToTserver(int)
Line
Count
Source
399
9.56M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
9.56M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
22.2k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
22.2k
    if (read_point && !read_point->GetReadTime()) {
403
0
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
0
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
0
        read_point->SetCurrentReadTime();
408
0
        read_point->GetReadTime().AddToPB(&req_);
409
0
      }
410
0
    }
411
22.2k
  }
412
413
9.56M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
9.56M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
9.56M
}
416
417
template <class Req, class Resp>
418
12.0M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
12.0M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
12.0M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
12.0M
  NotifyBatcher(status);
424
12.0M
  if (!CommonResponseCheck(status)) {
425
185k
    return;
426
185k
  }
427
11.8M
  SwapResponses();
428
11.8M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::ProcessResponseFromTserver(yb::Status const&)
Line
Count
Source
418
2.54M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
2.54M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
2.54M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
2.54M
  NotifyBatcher(status);
424
2.54M
  if (!CommonResponseCheck(status)) {
425
90.7k
    return;
426
90.7k
  }
427
2.45M
  SwapResponses();
428
2.45M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::ProcessResponseFromTserver(yb::Status const&)
Line
Count
Source
418
9.52M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
9.52M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
9.52M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
9.52M
  NotifyBatcher(status);
424
9.52M
  if (!CommonResponseCheck(status)) {
425
94.6k
    return;
426
94.6k
  }
427
9.42M
  SwapResponses();
428
9.42M
}
429
430
431
template <class Req, class Resp>
432
12.0M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
12.0M
  return {GetPropagatedHybridTime(resp_),
434
12.0M
          resp_.has_used_read_time() ? 
ReadHybridTime::FromPB(resp_.used_read_time())8.04M
435
12.0M
                                     : 
ReadHybridTime()4.01M
};
436
12.0M
}
yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::MakeFlushExtraResult()
Line
Count
Source
432
2.54M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
2.54M
  return {GetPropagatedHybridTime(resp_),
434
2.54M
          resp_.has_used_read_time() ? 
ReadHybridTime::FromPB(resp_.used_read_time())175k
435
2.54M
                                     : 
ReadHybridTime()2.37M
};
436
2.54M
}
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::MakeFlushExtraResult()
Line
Count
Source
432
9.51M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
9.51M
  return {GetPropagatedHybridTime(resp_),
434
9.51M
          resp_.has_used_read_time() ? 
ReadHybridTime::FromPB(resp_.used_read_time())7.87M
435
9.51M
                                     : 
ReadHybridTime()1.64M
};
436
9.51M
}
437
438
template <class Op, class Req>
439
4.56M
void HandleExtraFields(Op* op, Req* req) {
440
4.56M
}
void yb::client::internal::HandleExtraFields<yb::client::YBRedisWriteOp, yb::tserver::WriteRequestPB>(yb::client::YBRedisWriteOp*, yb::tserver::WriteRequestPB*)
Line
Count
Source
439
123k
void HandleExtraFields(Op* op, Req* req) {
440
123k
}
void yb::client::internal::HandleExtraFields<yb::client::YBRedisReadOp, yb::tserver::ReadRequestPB>(yb::client::YBRedisReadOp*, yb::tserver::ReadRequestPB*)
Line
Count
Source
439
84.8k
void HandleExtraFields(Op* op, Req* req) {
440
84.8k
}
void yb::client::internal::HandleExtraFields<yb::client::YBPgsqlReadOp, yb::tserver::ReadRequestPB>(yb::client::YBPgsqlReadOp*, yb::tserver::ReadRequestPB*)
Line
Count
Source
439
4.36M
void HandleExtraFields(Op* op, Req* req) {
440
4.36M
}
441
442
3.99M
void HandleExtraFields(YBqlWriteOp* op, tserver::WriteRequestPB* req) {
443
3.99M
  if (op->write_time_for_backfill()) {
444
4.91k
    req->set_external_hybrid_time(op->write_time_for_backfill().ToUint64());
445
4.91k
    ReadHybridTime::SingleTime(op->write_time_for_backfill()).ToPB(req->mutable_read_time());
446
4.91k
  }
447
3.99M
}
448
449
7.21M
void HandleExtraFields(YBPgsqlWriteOp* op, tserver::WriteRequestPB* req) {
450
7.21M
  if (op->write_time()) {
451
1.26k
    req->set_external_hybrid_time(op->write_time().ToUint64());
452
1.26k
  }
453
7.21M
}
454
455
7.53M
void HandleExtraFields(YBqlReadOp* op, tserver::ReadRequestPB* req) {
456
7.53M
  if (op->read_time()) {
457
316
    op->read_time().AddToPB(req);
458
316
  }
459
7.53M
}
460
461
template <class OpType, class Req, class Out>
462
void FillOps(
463
12.0M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
12.0M
  out->Reserve(narrow_cast<int>(ops.size()));
465
12.0M
  size_t idx = 0;
466
23.3M
  for (auto& op : ops) {
467
23.3M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
23.3M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
23.3M
    out->AddAllocated(concrete_op->mutable_request());
470
23.3M
    HandleExtraFields(concrete_op, req);
471
23.3M
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()11.3k
;
472
23.3M
  }
473
12.0M
}
void yb::client::internal::FillOps<yb::client::YBRedisWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB>*)
Line
Count
Source
463
123k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
123k
  out->Reserve(narrow_cast<int>(ops.size()));
465
123k
  size_t idx = 0;
466
123k
  for (auto& op : ops) {
467
123k
    CHECK_EQ(op.yb_op->type(), expected_type);
468
123k
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
123k
    out->AddAllocated(concrete_op->mutable_request());
470
123k
    HandleExtraFields(concrete_op, req);
471
123k
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()0
;
472
123k
  }
473
123k
}
void yb::client::internal::FillOps<yb::client::YBqlWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB>*)
Line
Count
Source
463
1.72M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
1.72M
  out->Reserve(narrow_cast<int>(ops.size()));
465
1.72M
  size_t idx = 0;
466
3.99M
  for (auto& op : ops) {
467
3.99M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
3.99M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
3.99M
    out->AddAllocated(concrete_op->mutable_request());
470
3.99M
    HandleExtraFields(concrete_op, req);
471
3.99M
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()672
;
472
3.99M
  }
473
1.72M
}
void yb::client::internal::FillOps<yb::client::YBPgsqlWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB>*)
Line
Count
Source
463
693k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
693k
  out->Reserve(narrow_cast<int>(ops.size()));
465
693k
  size_t idx = 0;
466
7.21M
  for (auto& op : ops) {
467
7.21M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
7.21M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
7.21M
    out->AddAllocated(concrete_op->mutable_request());
470
7.21M
    HandleExtraFields(concrete_op, req);
471
7.21M
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()52
;
472
7.21M
  }
473
693k
}
void yb::client::internal::FillOps<yb::client::YBRedisReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB>*)
Line
Count
Source
463
84.8k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
84.8k
  out->Reserve(narrow_cast<int>(ops.size()));
465
84.8k
  size_t idx = 0;
466
84.8k
  for (auto& op : ops) {
467
84.8k
    CHECK_EQ(op.yb_op->type(), expected_type);
468
84.8k
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
84.8k
    out->AddAllocated(concrete_op->mutable_request());
470
84.8k
    HandleExtraFields(concrete_op, req);
471
84.8k
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()0
;
472
84.8k
  }
473
84.8k
}
void yb::client::internal::FillOps<yb::client::YBqlReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::QLReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::QLReadRequestPB>*)
Line
Count
Source
463
7.53M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
7.53M
  out->Reserve(narrow_cast<int>(ops.size()));
465
7.53M
  size_t idx = 0;
466
7.53M
  for (auto& op : ops) {
467
7.53M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
7.53M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
7.53M
    out->AddAllocated(concrete_op->mutable_request());
470
7.53M
    HandleExtraFields(concrete_op, req);
471
7.53M
    VLOG
(4) << ++idx << ") encoded row: " << op.yb_op->ToString()10.7k
;
472
7.53M
  }
473
7.53M
}
void yb::client::internal::FillOps<yb::client::YBPgsqlReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB>*)
Line
Count
Source
463
1.91M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
1.91M
  out->Reserve(narrow_cast<int>(ops.size()));
465
1.91M
  size_t idx = 0;
466
4.36M
  for (auto& op : ops) {
467
4.36M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
4.36M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
4.36M
    out->AddAllocated(concrete_op->mutable_request());
470
4.36M
    HandleExtraFields(concrete_op, req);
471
18.4E
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
4.36M
  }
473
1.91M
}
474
475
template <class Repeated>
476
36.2M
void ReleaseOps(Repeated* repeated) {
477
36.2M
  auto size = repeated->size();
478
36.2M
  if (size) {
479
12.0M
    repeated->ExtractSubrange(0, size, nullptr);
480
12.0M
  }
481
36.2M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB>*)
Line
Count
Source
476
2.54M
void ReleaseOps(Repeated* repeated) {
477
2.54M
  auto size = repeated->size();
478
2.54M
  if (size) {
479
123k
    repeated->ExtractSubrange(0, size, nullptr);
480
123k
  }
481
2.54M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB>*)
Line
Count
Source
476
2.54M
void ReleaseOps(Repeated* repeated) {
477
2.54M
  auto size = repeated->size();
478
2.54M
  if (size) {
479
1.73M
    repeated->ExtractSubrange(0, size, nullptr);
480
1.73M
  }
481
2.54M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB>*)
Line
Count
Source
476
2.54M
void ReleaseOps(Repeated* repeated) {
477
2.54M
  auto size = repeated->size();
478
2.54M
  if (size) {
479
693k
    repeated->ExtractSubrange(0, size, nullptr);
480
693k
  }
481
2.54M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB>*)
Line
Count
Source
476
9.53M
void ReleaseOps(Repeated* repeated) {
477
9.53M
  auto size = repeated->size();
478
9.53M
  if (size) {
479
84.8k
    repeated->ExtractSubrange(0, size, nullptr);
480
84.8k
  }
481
9.53M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::QLReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::QLReadRequestPB>*)
Line
Count
Source
476
9.52M
void ReleaseOps(Repeated* repeated) {
477
9.52M
  auto size = repeated->size();
478
9.52M
  if (size) {
479
7.51M
    repeated->ExtractSubrange(0, size, nullptr);
480
7.51M
  }
481
9.52M
}
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB>*)
Line
Count
Source
476
9.50M
void ReleaseOps(Repeated* repeated) {
477
9.50M
  auto size = repeated->size();
478
9.50M
  if (size) {
479
1.90M
    repeated->ExtractSubrange(0, size, nullptr);
480
1.90M
  }
481
9.50M
}
482
483
WriteRpc::WriteRpc(const AsyncRpcData& data)
484
2.54M
    : AsyncRpcBase(data, YBConsistencyLevel::STRONG) {
485
2.54M
  TRACE_TO(trace_, "WriteRpc initiated");
486
2.54M
  VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString());
487
488
  // Add the rows
489
2.54M
  switch (table()->table_type()) {
490
123k
    case YBTableType::REDIS_TABLE_TYPE:
491
123k
      FillOps<YBRedisWriteOp>(
492
123k
          ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch());
493
123k
      break;
494
1.73M
    case YBTableType::YQL_TABLE_TYPE:
495
1.73M
      FillOps<YBqlWriteOp>(
496
1.73M
          ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch());
497
1.73M
      break;
498
693k
    case YBTableType::PGSQL_TABLE_TYPE:
499
693k
      FillOps<YBPgsqlWriteOp>(
500
693k
          ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch());
501
693k
      break;
502
0
    case YBTableType::UNKNOWN_TABLE_TYPE:
503
0
    case YBTableType::TRANSACTION_STATUS_TABLE_TYPE:
504
0
      LOG(DFATAL) << "Unsupported table type: " << table()->ToString();
505
0
      break;
506
2.54M
  }
507
508
2.54M
  VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n"
509
56
          << req_.ShortDebugString();
510
511
2.54M
  const auto& client_id = batcher_->client_id();
512
2.54M
  if (!client_id.IsNil() && 
FLAGS_detect_duplicates_for_retryable_requests2.54M
) {
513
2.54M
    auto temp = client_id.ToUInt64Pair();
514
2.54M
    req_.set_client_id1(temp.first);
515
2.54M
    req_.set_client_id2(temp.second);
516
2.54M
    auto request_pair = batcher_->NextRequestIdAndMinRunningRequestId(data.tablet->tablet_id());
517
2.54M
    req_.set_request_id(request_pair.first);
518
2.54M
    req_.set_min_running_request_id(request_pair.second);
519
2.54M
  }
520
2.54M
}
521
522
2.54M
WriteRpc::~WriteRpc() {
523
  // Check that we sent request id info, i.e. (client_id, request_id, min_running_request_id).
524
2.54M
  if (req_.has_client_id1()) {
525
2.54M
    batcher_->RequestFinished(tablet().tablet_id(), req_.request_id());
526
2.54M
  }
527
528
2.54M
  if (async_rpc_metrics_) {
529
2.51M
    scoped_refptr<Histogram> write_rpc_time = IsLocalCall() ?
530
1.81M
                                              async_rpc_metrics_->local_write_rpc_time :
531
2.51M
                                              
async_rpc_metrics_->remote_write_rpc_time701k
;
532
2.51M
    write_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
533
2.51M
  }
534
535
2.54M
  ReleaseOps(req_.mutable_redis_write_batch());
536
2.54M
  ReleaseOps(req_.mutable_ql_write_batch());
537
2.54M
  ReleaseOps(req_.mutable_pgsql_write_batch());
538
2.54M
}
539
540
2.55M
void WriteRpc::CallRemoteMethod() {
541
2.55M
  auto trace = trace_; // It is possible that we receive reply before returning from WriteAsync.
542
                       // The send happens before we return from WriteAsync,
543
                       // so under heavy load it is possible that our request is handled and
544
                       // the reply is received before WriteAsync has returned.
545
2.55M
  TRACE_TO(trace, "SendRpcToTserver");
546
2.55M
  ADOPT_TRACE(trace.get());
547
548
2.55M
  tablet_invoker_.WriteAsync(req_, &resp_, PrepareController(),
549
2.55M
                             std::bind(&WriteRpc::Finished, this, Status::OK()));
550
2.55M
  TRACE_TO(trace, "RpcDispatched Asynchronously");
551
2.55M
}
552
553
2.45M
void WriteRpc::SwapResponses() {
554
2.45M
  int redis_idx = 0;
555
2.45M
  int ql_idx = 0;
556
2.45M
  int pgsql_idx = 0;
557
558
  // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back.
559
11.2M
  for (auto& op : ops_) {
560
11.2M
    YBOperation* yb_op = op.yb_op.get();
561
11.2M
    switch (yb_op->type()) {
562
123k
      case YBOperation::Type::REDIS_WRITE: {
563
123k
        if (redis_idx >= resp_.redis_response_batch().size()) {
564
0
          ++redis_idx;
565
0
          continue;
566
0
        }
567
        // Restore Redis write request PB and extract response.
568
123k
        auto* redis_op = down_cast<YBRedisWriteOp*>(yb_op);
569
123k
        redis_op->mutable_response()->Swap(resp_.mutable_redis_response_batch(redis_idx));
570
123k
        redis_idx++;
571
123k
        break;
572
123k
      }
573
3.93M
      case YBOperation::Type::QL_WRITE: {
574
3.93M
        if (ql_idx >= resp_.ql_response_batch().size()) {
575
0
          ++ql_idx;
576
0
          continue;
577
0
        }
578
        // Restore QL write request PB and extract response.
579
3.93M
        auto* ql_op = down_cast<YBqlWriteOp*>(yb_op);
580
3.93M
        ql_op->mutable_response()->Swap(resp_.mutable_ql_response_batch(ql_idx));
581
3.93M
        const auto& ql_response = ql_op->response();
582
3.93M
        if (ql_response.has_rows_data_sidecar()) {
583
273
          Slice rows_data = CHECK_RESULT(
584
273
              retrier().controller().GetSidecar(ql_response.rows_data_sidecar()));
585
273
          ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size());
586
273
        }
587
3.93M
        ql_idx++;
588
3.93M
        break;
589
3.93M
      }
590
7.15M
      case YBOperation::Type::PGSQL_WRITE: {
591
7.15M
        if (pgsql_idx >= resp_.pgsql_response_batch().size()) {
592
0
          ++pgsql_idx;
593
0
          continue;
594
0
        }
595
        // Restore PGSQL write request PB and extract response.
596
7.15M
        auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op);
597
7.15M
        pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx));
598
7.15M
        const auto& pgsql_response = pgsql_op->response();
599
7.15M
        if (pgsql_response.has_rows_data_sidecar()) {
600
7.14M
          auto holder = CHECK_RESULT(
601
7.14M
              retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
602
7.14M
          down_cast<YBPgsqlWriteOp*>(yb_op)->SetRowsData(holder.first, holder.second);
603
7.14M
        }
604
7.15M
        pgsql_idx++;
605
7.15M
        break;
606
7.15M
      }
607
0
      case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
608
0
      case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
609
0
      case YBOperation::Type::QL_READ:
610
0
        LOG(FATAL) << "Not a write operation " << op.yb_op->type();
611
0
        break;
612
11.2M
    }
613
11.2M
  }
614
615
2.45M
  if (redis_idx != resp_.redis_response_batch().size() ||
616
2.45M
      
ql_idx != resp_.ql_response_batch().size()2.45M
||
617
2.45M
      
pgsql_idx != resp_.pgsql_response_batch().size()2.45M
) {
618
0
    LOG(ERROR) << Substitute("Write response count mismatch: "
619
0
                             "$0 Redis requests sent, $1 responses received. "
620
0
                             "$2 Apache CQL requests sent, $3 responses received. "
621
0
                             "$4 PostgreSQL requests sent, $5 responses received.",
622
0
                             redis_idx, resp_.redis_response_batch().size(),
623
0
                             ql_idx, resp_.ql_response_batch().size(),
624
0
                             pgsql_idx, resp_.pgsql_response_batch().size());
625
0
    auto status = STATUS(IllegalState, "Write response count mismatch");
626
0
    LOG(ERROR) << status << ", request: " << req_.ShortDebugString()
627
0
               << ", response: " << resp_.ShortDebugString();
628
0
    batcher_->AddOpCountMismatchError();
629
0
    Failed(status);
630
0
  }
631
2.45M
}
632
633
2.54M
void WriteRpc::NotifyBatcher(const Status& status) {
634
2.54M
  batcher_->ProcessWriteResponse(*this, status);
635
2.54M
}
636
637
0
bool WriteRpc::ShouldRetryExpiredRequest() {
638
0
  return req_.min_running_request_id() == kInitializeFromMinRunning;
639
0
}
640
641
ReadRpc::ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level)
642
9.54M
    : AsyncRpcBase(data, yb_consistency_level) {
643
9.54M
  TRACE_TO(trace_, "ReadRpc initiated");
644
9.54M
  VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString());
645
9.54M
  req_.set_consistency_level(yb_consistency_level);
646
9.54M
  req_.set_proxy_uuid(data.batcher->proxy_uuid());
647
648
9.54M
  switch (table()->table_type()) {
649
84.8k
    case YBTableType::REDIS_TABLE_TYPE:
650
84.8k
      FillOps<YBRedisReadOp>(
651
84.8k
          ops_, YBOperation::Type::REDIS_READ, &req_, req_.mutable_redis_batch());
652
84.8k
      break;
653
7.54M
    case YBTableType::YQL_TABLE_TYPE:
654
7.54M
      FillOps<YBqlReadOp>(
655
7.54M
          ops_, YBOperation::Type::QL_READ, &req_, req_.mutable_ql_batch());
656
7.54M
      break;
657
1.91M
    case YBTableType::PGSQL_TABLE_TYPE:
658
1.91M
      FillOps<YBPgsqlReadOp>(
659
1.91M
          ops_, YBOperation::Type::PGSQL_READ, &req_, req_.mutable_pgsql_batch());
660
1.91M
      break;
661
0
    case YBTableType::UNKNOWN_TABLE_TYPE:
662
0
    case YBTableType::TRANSACTION_STATUS_TABLE_TYPE:
663
0
      LOG(DFATAL) << "Unsupported table type: " << table()->ToString();
664
0
      break;
665
9.54M
  }
666
667
18.4E
  VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n"
668
18.4E
          << req_.ShortDebugString();
669
9.52M
}
670
671
9.52M
ReadRpc::~ReadRpc() {
672
  // Get locality metrics if enabled, but skip for system tables as those go to the master.
673
9.52M
  if (async_rpc_metrics_ && 
!table()->name().is_system()9.51M
) {
674
9.28M
    scoped_refptr<Histogram> read_rpc_time = IsLocalCall() ?
675
7.41M
                                             async_rpc_metrics_->local_read_rpc_time :
676
9.28M
                                             
async_rpc_metrics_->remote_read_rpc_time1.86M
;
677
678
9.28M
    read_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
679
9.28M
  }
680
681
9.52M
  ReleaseOps(req_.mutable_redis_batch());
682
9.52M
  ReleaseOps(req_.mutable_ql_batch());
683
9.52M
  ReleaseOps(req_.mutable_pgsql_batch());
684
9.52M
}
685
686
9.56M
void ReadRpc::CallRemoteMethod() {
687
9.56M
  auto trace = trace_; // It is possible that we receive reply before returning from ReadAsync.
688
                       // Detailed explanation in WriteRpc::CallRemoteMethod.
689
9.56M
  TRACE_TO(trace, "SendRpcToTserver");
690
9.56M
  ADOPT_TRACE(trace.get());
691
692
9.56M
  tablet_invoker_.ReadAsync(req_, &resp_, PrepareController(),
693
9.56M
                            std::bind(&ReadRpc::Finished, this, Status::OK()));
694
9.56M
  TRACE_TO(trace, "RpcDispatched Asynchronously");
695
9.56M
}
696
697
9.40M
void ReadRpc::SwapResponses() {
698
9.40M
  int redis_idx = 0;
699
9.40M
  int ql_idx = 0;
700
9.40M
  int pgsql_idx = 0;
701
702
  // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back.
703
11.0M
  for (auto& op : ops_) {
704
11.0M
    YBOperation* yb_op = op.yb_op.get();
705
11.0M
    switch (yb_op->type()) {
706
84.6k
      case YBOperation::Type::REDIS_READ: {
707
84.6k
        if (redis_idx >= resp_.redis_batch().size()) {
708
0
          batcher_->AddOpCountMismatchError();
709
0
          return;
710
0
        }
711
        // Restore Redis read request PB and extract response.
712
84.6k
        auto* redis_op = down_cast<YBRedisReadOp*>(yb_op);
713
84.6k
        redis_op->mutable_response()->Swap(resp_.mutable_redis_batch(redis_idx));
714
84.6k
        redis_idx++;
715
84.6k
        break;
716
84.6k
      }
717
7.52M
      case YBOperation::Type::QL_READ: {
718
7.52M
        if (ql_idx >= resp_.ql_batch().size()) {
719
0
          batcher_->AddOpCountMismatchError();
720
0
          return;
721
0
        }
722
        // Restore QL read request PB and extract response.
723
7.52M
        auto* ql_op = down_cast<YBqlReadOp*>(yb_op);
724
7.52M
        ql_op->mutable_response()->Swap(resp_.mutable_ql_batch(ql_idx));
725
7.52M
        const auto& ql_response = ql_op->response();
726
7.52M
        if (ql_response.has_rows_data_sidecar()) {
727
7.50M
          Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar(
728
7.50M
              ql_response.rows_data_sidecar()));
729
7.50M
          ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size());
730
7.50M
        }
731
7.52M
        ql_idx++;
732
7.52M
        break;
733
7.52M
      }
734
3.47M
      case YBOperation::Type::PGSQL_READ: {
735
3.47M
        if (pgsql_idx >= resp_.pgsql_batch().size()) {
736
0
          batcher_->AddOpCountMismatchError();
737
0
          return;
738
0
        }
739
        // Restore PGSQL read request PB and extract response.
740
3.47M
        auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op);
741
3.47M
        if (resp_.has_used_read_time()) {
742
1.95M
          pgsql_op->SetUsedReadTime(ReadHybridTime::FromPB(resp_.used_read_time()));
743
1.95M
        }
744
3.47M
        pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx));
745
3.47M
        const auto& pgsql_response = pgsql_op->response();
746
3.47M
        if (
pgsql_response.has_rows_data_sidecar()3.47M
) {
747
3.47M
          auto holder = CHECK_RESULT(
748
3.47M
              retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
749
3.47M
          down_cast<YBPgsqlReadOp*>(yb_op)->SetRowsData(holder.first, holder.second);
750
3.47M
        }
751
3.47M
        pgsql_idx++;
752
3.47M
        break;
753
3.47M
      }
754
0
      case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED;
755
0
      case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED;
756
0
      case YBOperation::Type::QL_WRITE:
757
0
        LOG(FATAL) << "Not a read operation " << op.yb_op->type();
758
0
        break;
759
11.0M
    }
760
11.0M
  }
761
762
9.43M
  if (redis_idx != resp_.redis_batch().size() ||
763
9.43M
      
ql_idx != resp_.ql_batch().size()9.41M
||
764
9.43M
      
pgsql_idx != resp_.pgsql_batch().size()9.40M
) {
765
0
    LOG(ERROR) << Substitute("Read response count mismatch: "
766
0
                             "$0 Redis requests sent, $1 responses received. "
767
0
                             "$2 QL requests sent, $3 responses received. "
768
0
                             "$4 QL requests sent, $5 responses received.",
769
0
                             redis_idx, resp_.redis_batch().size(),
770
0
                             ql_idx, resp_.ql_batch().size(),
771
0
                             pgsql_idx, resp_.pgsql_batch().size());
772
0
    batcher_->AddOpCountMismatchError();
773
0
    Failed(STATUS(IllegalState, "Read response count mismatch"));
774
0
  }
775
776
9.43M
}
777
778
9.54M
void ReadRpc::NotifyBatcher(const Status& status) {
779
9.54M
  batcher_->ProcessReadResponse(*this, status);
780
9.54M
}
781
782
}  // namespace internal
783
}  // namespace client
784
}  // namespace yb