YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/async_rpc.h"
15
16
#include "yb/client/batcher.h"
17
#include "yb/client/client_error.h"
18
#include "yb/client/in_flight_op.h"
19
#include "yb/client/meta_cache.h"
20
#include "yb/client/table.h"
21
#include "yb/client/yb_op.h"
22
#include "yb/client/yb_table_name.h"
23
24
#include "yb/common/pgsql_error.h"
25
#include "yb/common/schema.h"
26
#include "yb/common/transaction.h"
27
#include "yb/common/transaction_error.h"
28
#include "yb/common/wire_protocol.h"
29
30
#include "yb/gutil/casts.h"
31
#include "yb/gutil/strings/substitute.h"
32
33
#include "yb/rpc/rpc_controller.h"
34
35
#include "yb/util/cast.h"
36
#include "yb/util/flag_tags.h"
37
#include "yb/util/logging.h"
38
#include "yb/util/metrics.h"
39
#include "yb/util/result.h"
40
#include "yb/util/status_log.h"
41
#include "yb/util/trace.h"
42
#include "yb/util/yb_pg_errcodes.h"
43
44
// TODO: do we need word Redis in following two metrics? ReadRpc and WriteRpc objects emitting
45
// these metrics are used not only in Redis service.
46
METRIC_DEFINE_coarse_histogram(
47
    server, handler_latency_yb_client_write_remote, "yb.client.Write remote call time",
48
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Write call ");
49
METRIC_DEFINE_coarse_histogram(
50
    server, handler_latency_yb_client_read_remote, "yb.client.Read remote call time",
51
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Read call ");
52
METRIC_DEFINE_coarse_histogram(
53
    server, handler_latency_yb_client_write_local, "yb.client.Write local call time",
54
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Write call ");
55
METRIC_DEFINE_coarse_histogram(
56
    server, handler_latency_yb_client_read_local, "yb.client.Read local call time",
57
    yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Read call ");
58
METRIC_DEFINE_coarse_histogram(
59
    server, handler_latency_yb_client_time_to_send,
60
    "Time taken for a Write/Read rpc to be sent to the server", yb::MetricUnit::kMicroseconds,
61
    "Microseconds spent before sending the request to the server");
62
63
METRIC_DEFINE_counter(server, consistent_prefix_successful_reads,
64
    "Number of consistent prefix reads that were served by the closest replica.",
65
    yb::MetricUnit::kRequests,
66
    "Number of consistent prefix reads that were served by the closest replica.");
67
68
METRIC_DEFINE_counter(server, consistent_prefix_failed_reads,
69
    "Number of consistent prefix reads that failed to be served by the closest replica.",
70
    yb::MetricUnit::kRequests,
71
    "Number of consistent prefix reads that failed to be served by the closest replica.");
72
73
DEFINE_int32(ybclient_print_trace_every_n, 0,
74
             "Controls the rate at which traces from ybclient are printed. Setting this to 0 "
75
             "disables printing the collected traces.");
76
TAG_FLAG(ybclient_print_trace_every_n, advanced);
77
TAG_FLAG(ybclient_print_trace_every_n, runtime);
78
79
DEFINE_bool(forward_redis_requests, true, "If false, the redis op will not be served if it's not "
80
            "a local request. The op response will be set to the redis error "
81
            "'-MOVED partition_key 0.0.0.0:0'. This works with jedis which only looks at the MOVED "
82
            "part of the reply and ignores the rest. For now, if this flag is true, we will only "
83
            "attempt to read from leaders, so redis_allow_reads_from_followers will be ignored.");
84
85
DEFINE_bool(detect_duplicates_for_retryable_requests, true,
86
            "Enable tracking of write requests that prevents the same write from being applied "
87
                "twice.");
88
89
DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false,
90
            "When true, forward the PGSQL rpcs to the local tServer.");
91
92
DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b);
93
94
DECLARE_bool(collect_end_to_end_traces);
95
96
using namespace std::placeholders;
97
98
namespace yb {
99
100
using std::shared_ptr;
101
using rpc::ErrorStatusPB;
102
using rpc::Messenger;
103
using rpc::Rpc;
104
using rpc::RpcController;
105
using tserver::WriteRequestPB;
106
using tserver::WriteResponsePB;
107
using tserver::WriteResponsePB_PerRowErrorPB;
108
using strings::Substitute;
109
110
namespace client {
111
112
namespace internal {
113
114
5.99M
bool IsTracingEnabled() {
115
5.99M
  return FLAGS_collect_end_to_end_traces;
116
5.99M
}
117
118
namespace {
119
120
5.99M
bool LocalTabletServerOnly(const InFlightOps& ops) {
121
5.99M
  const auto op_type = ops.front().yb_op->type();
122
5.99M
  return ((op_type == YBOperation::Type::REDIS_READ || op_type == YBOperation::Type::REDIS_WRITE) &&
123
104k
          !FLAGS_forward_redis_requests);
124
5.99M
}
125
126
}
127
128
AsyncRpcMetrics::AsyncRpcMetrics(const scoped_refptr<yb::MetricEntity>& entity)
129
    : remote_write_rpc_time(METRIC_handler_latency_yb_client_write_remote.Instantiate(entity)),
130
      remote_read_rpc_time(METRIC_handler_latency_yb_client_read_remote.Instantiate(entity)),
131
      local_write_rpc_time(METRIC_handler_latency_yb_client_write_local.Instantiate(entity)),
132
      local_read_rpc_time(METRIC_handler_latency_yb_client_read_local.Instantiate(entity)),
133
      time_to_send(METRIC_handler_latency_yb_client_time_to_send.Instantiate(entity)),
134
      consistent_prefix_successful_reads(
135
          METRIC_consistent_prefix_successful_reads.Instantiate(entity)),
136
123k
      consistent_prefix_failed_reads(METRIC_consistent_prefix_failed_reads.Instantiate(entity)) {
137
123k
}
138
139
AsyncRpc::AsyncRpc(
140
    const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level)
141
    : Rpc(data.batcher->deadline(), data.batcher->messenger(), &data.batcher->proxy_cache()),
142
      batcher_(data.batcher),
143
      ops_(data.ops),
144
      tablet_invoker_(LocalTabletServerOnly(ops_),
145
                      yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX,
146
                      data.batcher->client_,
147
                      this,
148
                      this,
149
                      data.tablet,
150
                      table(),
151
                      mutable_retrier(),
152
                      trace_.get()),
153
      start_(CoarseMonoClock::Now()),
154
6.01M
      async_rpc_metrics_(data.batcher->async_rpc_metrics()) {
155
6.01M
  mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread(
156
6.01M
      data.allow_local_calls_in_curr_thread);
157
6.01M
}
158
159
6.00M
AsyncRpc::~AsyncRpc() {
160
6.00M
  if (trace_->must_print()) {
161
0
    LOG(INFO) << ToString() << " took " << ToMicroseconds(CoarseMonoClock::Now() - start_)
162
0
              << "us. Trace:\n" << trace_->DumpToString(true);
163
6.00M
  } else {
164
6.00M
    const auto print_trace_every_n = GetAtomicFlag(&FLAGS_ybclient_print_trace_every_n);
165
6.00M
    if (print_trace_every_n > 0) {
166
0
      YB_LOG_EVERY_N(INFO, print_trace_every_n)
167
0
          << ToString() << " took "
168
0
          << ToMicroseconds(CoarseMonoClock::Now() - start_)
169
0
          << "us. Trace:\n" << trace_->DumpToString(true);
170
0
    }
171
6.00M
  }
172
6.00M
}
173
174
6.02M
void AsyncRpc::SendRpc() {
175
6.02M
  TRACE_TO(trace_, "SendRpc() called.");
176
177
6.02M
  retained_self_ = shared_from_this();
178
  // For now, if this is a retry, execute this rpc on the leader even if
179
  // the consistency level is YBConsistencyLevel::CONSISTENT_PREFIX or
180
  // FLAGS_redis_allow_reads_from_followers is set to true.
181
  // TODO(hector): Temporarily blacklist the follower that couldn't serve the read so we can retry
182
  // on another follower.
183
6.02M
  if (async_rpc_metrics_ && num_attempts() > 1 && tablet_invoker_.is_consistent_prefix()) {
184
2.23k
    IncrementCounter(async_rpc_metrics_->consistent_prefix_failed_reads);
185
2.23k
  }
186
6.02M
  tablet_invoker_.Execute(std::string(), num_attempts() > 1);
187
6.02M
}
188
189
124k
std::string AsyncRpc::ToString() const {
190
124k
  const auto& transaction = batcher_->in_flight_ops().metadata.transaction;
191
124k
  const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction;
192
124k
  return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)",
193
81.1k
                ops_.front().yb_op->read_only() ? "Read" : "Write",
194
124k
                tablet().tablet_id(), ops_.size(), num_attempts(),
195
124k
                transaction.transaction_id,
196
124k
                subtransaction_opt
197
23.6k
                    ? Format("$0", subtransaction_opt->subtransaction_id)
198
101k
                    : "[none]");
199
124k
}
200
201
18.0M
std::shared_ptr<const YBTable> AsyncRpc::table() const {
202
  // All of the ops for a given tablet obviously correspond to the same table,
203
  // so we'll just grab the table from the first.
204
18.0M
  return ops_[0].yb_op->table();
205
18.0M
}
206
207
6.01M
void AsyncRpc::Finished(const Status& status) {
208
6.01M
  Status new_status = status;
209
6.01M
  if (tablet_invoker_.Done(&new_status)) {
210
5.98M
    if (tablet().is_split() ||
211
6.00M
        ClientError(new_status) == ClientErrorCode::kTablePartitionListIsStale) {
212
31
      ops_[0].yb_op->MarkTablePartitionListAsStale();
213
31
    }
214
5.98M
    if (async_rpc_metrics_ && status.ok() && tablet_invoker_.is_consistent_prefix()) {
215
11.8k
      IncrementCounter(async_rpc_metrics_->consistent_prefix_successful_reads);
216
11.8k
    }
217
5.98M
    ProcessResponseFromTserver(new_status);
218
5.98M
    batcher_->Flushed(ops_, new_status, MakeFlushExtraResult());
219
5.98M
    retained_self_.reset();
220
5.98M
  }
221
6.01M
}
222
223
124k
void AsyncRpc::Failed(const Status& status) {
224
124k
  std::string error_message = status.message().ToBuffer();
225
124k
  auto redis_error_code = status.IsInvalidCommand() || status.IsInvalidArgument() ?
226
124k
      RedisResponsePB_RedisStatusCode_PARSING_ERROR : RedisResponsePB_RedisStatusCode_SERVER_ERROR;
227
494k
  for (auto& op : ops_) {
228
494k
    YBOperation* yb_op = op.yb_op.get();
229
494k
    switch (yb_op->type()) {
230
84
      case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
231
89
      case YBOperation::Type::REDIS_WRITE: {
232
89
        RedisResponsePB* resp;
233
89
        if (yb_op->type() == YBOperation::Type::REDIS_READ) {
234
84
          resp = down_cast<YBRedisReadOp*>(yb_op)->mutable_response();
235
5
        } else {
236
5
          resp = down_cast<YBRedisWriteOp*>(yb_op)->mutable_response();
237
5
        }
238
89
        resp->Clear();
239
        // If the tserver replied it is not the leader, respond that the key has moved. We do not
240
        // need to return the address of the new leader because the Redis client will refresh the
241
        // cluster map instead.
242
89
        if (status.IsIllegalState()) {
243
5
          resp->set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR);
244
5
          resp->set_error_message(Substitute("MOVED $0 0.0.0.0:0",
245
5
                                             down_cast<YBRedisOp*>(yb_op)->hash_code()));
246
84
        } else {
247
84
          resp->set_code(redis_error_code);
248
84
          resp->set_error_message(error_message);
249
84
        }
250
89
        break;
251
84
      }
252
168
      case YBOperation::Type::QL_READ: FALLTHROUGH_INTENDED;
253
72.7k
      case YBOperation::Type::QL_WRITE: {
254
72.7k
        QLResponsePB* resp = down_cast<YBqlOp*>(yb_op)->mutable_response();
255
72.7k
        resp->Clear();
256
62.5k
        resp->set_status(status.IsTryAgain() ? QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR
257
10.1k
                                             : QLResponsePB::YQL_STATUS_RUNTIME_ERROR);
258
72.7k
        resp->set_error_message(error_message);
259
72.7k
        break;
260
168
      }
261
398k
      case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
262
421k
      case YBOperation::Type::PGSQL_WRITE: {
263
421k
        PgsqlResponsePB* resp = down_cast<YBPgsqlOp*>(yb_op)->mutable_response();
264
421k
        resp->set_status(status.IsTryAgain() ? PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR
265
18.4E
                                             : PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR);
266
421k
        resp->set_error_message(error_message);
267
421k
        const uint8_t* pg_err_ptr = status.ErrorData(PgsqlErrorTag::kCategory);
268
421k
        if (pg_err_ptr != nullptr) {
269
151k
          resp->set_pg_error_code(static_cast<uint32_t>(PgsqlErrorTag::Decode(pg_err_ptr)));
270
270k
        } else {
271
270k
          resp->set_pg_error_code(static_cast<uint32_t>(YBPgErrorCode::YB_PG_INTERNAL_ERROR));
272
270k
        }
273
421k
        const uint8_t* txn_err_ptr = status.ErrorData(TransactionErrorTag::kCategory);
274
422k
        if (txn_err_ptr != nullptr) {
275
422k
          resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorTag::Decode(txn_err_ptr)));
276
18.4E
        } else {
277
18.4E
          resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorCode::kNone));
278
18.4E
        }
279
421k
        break;
280
398k
      }
281
0
      default:
282
0
        LOG(FATAL) << "Unsupported operation " << yb_op->type();
283
0
        break;
284
494k
    }
285
494k
  }
286
124k
}
287
288
5.70M
bool AsyncRpc::IsLocalCall() const {
289
5.70M
  return tablet_invoker_.IsLocalCall();
290
5.70M
}
291
292
namespace {
293
294
template<class T>
295
void SetMetadata(const InFlightOpsTransactionMetadata& metadata,
296
                 bool need_full_metadata,
297
781k
                 T* dest) {
298
781k
  auto* transaction = dest->mutable_transaction();
299
781k
  if (need_full_metadata) {
300
553k
    metadata.transaction.ToPB(transaction);
301
228k
  } else {
302
228k
    metadata.transaction.TransactionIdToPB(transaction);
303
228k
  }
304
781k
  dest->set_deprecated_may_have_metadata(true);
305
306
781k
  if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) {
307
63.4k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
63.4k
  }
309
781k
}
async_rpc.cc:_ZN2yb6client8internal12_GLOBAL__N_111SetMetadataINS_5docdb20KeyValueWriteBatchPBEEEvRKNS1_30InFlightOpsTransactionMetadataEbPT_
Line
Count
Source
297
414k
                 T* dest) {
298
414k
  auto* transaction = dest->mutable_transaction();
299
414k
  if (need_full_metadata) {
300
315k
    metadata.transaction.ToPB(transaction);
301
99.1k
  } else {
302
99.1k
    metadata.transaction.TransactionIdToPB(transaction);
303
99.1k
  }
304
414k
  dest->set_deprecated_may_have_metadata(true);
305
306
414k
  if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) {
307
18.9k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
18.9k
  }
309
414k
}
async_rpc.cc:_ZN2yb6client8internal12_GLOBAL__N_111SetMetadataINS_7tserver13ReadRequestPBEEEvRKNS1_30InFlightOpsTransactionMetadataEbPT_
Line
Count
Source
297
367k
                 T* dest) {
298
367k
  auto* transaction = dest->mutable_transaction();
299
367k
  if (need_full_metadata) {
300
238k
    metadata.transaction.ToPB(transaction);
301
129k
  } else {
302
129k
    metadata.transaction.TransactionIdToPB(transaction);
303
129k
  }
304
367k
  dest->set_deprecated_may_have_metadata(true);
305
306
367k
  if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) {
307
44.4k
    metadata.subtransaction->ToPB(dest->mutable_subtransaction());
308
44.4k
  }
309
367k
}
310
311
void SetMetadata(const InFlightOpsTransactionMetadata& metadata,
312
                 bool need_full_metadata,
313
413k
                 tserver::WriteRequestPB* req) {
314
413k
  SetMetadata(metadata, need_full_metadata, req->mutable_write_batch());
315
413k
}
316
317
} // namespace
318
319
6.02M
void AsyncRpc::SendRpcToTserver(int attempt_num) {
320
6.02M
  if (async_rpc_metrics_) {
321
5.97M
    async_rpc_metrics_->time_to_send->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
322
5.97M
  }
323
6.02M
  CallRemoteMethod();
324
6.02M
}
325
326
template <class Req, class Resp>
327
AsyncRpcBase<Req, Resp>::AsyncRpcBase(
328
    const AsyncRpcData& data, YBConsistencyLevel consistency_level)
329
6.01M
    : AsyncRpc(data, consistency_level) {
330
6.01M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
6.01M
  req_.set_include_trace(IsTracingEnabled());
332
6.01M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
6.01M
  bool has_read_time = false;
334
6.01M
  if (read_point) {
335
5.85M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
5.85M
    if (data.need_consistent_read &&
339
1.39M
        table()->InternalSchema().table_properties().is_transactional()) {
340
1.20M
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
1.20M
      if (read_time) {
342
936k
        has_read_time = true;
343
936k
        read_time.AddToPB(&req_);
344
936k
      }
345
1.20M
    }
346
5.85M
  }
347
6.01M
  if (!ops_.empty()) {
348
6.01M
    req_.set_batch_idx(ops_.front().batch_idx);
349
6.01M
  }
350
6.01M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
6.01M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
781k
    SetMetadata(metadata, data.need_metadata, &req_);
353
781k
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
18.4E
    LOG_IF(DFATAL, has_read_time && serializable)
355
18.4E
        << "Read time should NOT be specified for serializable isolation: "
356
18.4E
        << read_point->GetReadTime().ToString();
357
781k
  }
358
6.01M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEEC2ERKNS1_12AsyncRpcDataENS_18YBConsistencyLevelE
Line
Count
Source
329
1.32M
    : AsyncRpc(data, consistency_level) {
330
1.32M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
1.32M
  req_.set_include_trace(IsTracingEnabled());
332
1.32M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
1.32M
  bool has_read_time = false;
334
1.32M
  if (read_point) {
335
1.23M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
1.23M
    if (data.need_consistent_read &&
339
472k
        table()->InternalSchema().table_properties().is_transactional()) {
340
470k
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
470k
      if (read_time) {
342
356k
        has_read_time = true;
343
356k
        read_time.AddToPB(&req_);
344
356k
      }
345
470k
    }
346
1.23M
  }
347
1.32M
  if (!ops_.empty()) {
348
1.32M
    req_.set_batch_idx(ops_.front().batch_idx);
349
1.32M
  }
350
1.32M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
1.32M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
413k
    SetMetadata(metadata, data.need_metadata, &req_);
353
413k
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
18.4E
    LOG_IF(DFATAL, has_read_time && serializable)
355
18.4E
        << "Read time should NOT be specified for serializable isolation: "
356
18.4E
        << read_point->GetReadTime().ToString();
357
413k
  }
358
1.32M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEEC2ERKNS1_12AsyncRpcDataENS_18YBConsistencyLevelE
Line
Count
Source
329
4.68M
    : AsyncRpc(data, consistency_level) {
330
4.68M
  req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id()));
331
4.68M
  req_.set_include_trace(IsTracingEnabled());
332
4.68M
  const ConsistentReadPoint* read_point = batcher_->read_point();
333
4.68M
  bool has_read_time = false;
334
4.68M
  if (read_point) {
335
4.62M
    req_.set_propagated_hybrid_time(read_point->Now().ToUint64());
336
    // Set read time for consistent read only if the table is transaction-enabled and
337
    // consistent read is required.
338
4.62M
    if (data.need_consistent_read &&
339
918k
        table()->InternalSchema().table_properties().is_transactional()) {
340
731k
      auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id());
341
731k
      if (read_time) {
342
579k
        has_read_time = true;
343
579k
        read_time.AddToPB(&req_);
344
579k
      }
345
731k
    }
346
4.62M
  }
347
4.69M
  if (!ops_.empty()) {
348
4.69M
    req_.set_batch_idx(ops_.front().batch_idx);
349
4.69M
  }
350
4.68M
  const auto& metadata = batcher_->in_flight_ops().metadata;
351
4.68M
  if (!metadata.transaction.transaction_id.IsNil()) {
352
367k
    SetMetadata(metadata, data.need_metadata, &req_);
353
367k
    bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION;
354
18.4E
    LOG_IF(DFATAL, has_read_time && serializable)
355
18.4E
        << "Read time should NOT be specified for serializable isolation: "
356
18.4E
        << read_point->GetReadTime().ToString();
357
367k
  }
358
4.68M
}
359
360
template <class Req, class Resp>
361
5.99M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
5.99M
  req_.release_tablet_id();
363
5.99M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEED2Ev
Line
Count
Source
361
1.32M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
1.32M
  req_.release_tablet_id();
363
1.32M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEED2Ev
Line
Count
Source
361
4.67M
AsyncRpcBase<Req, Resp>::~AsyncRpcBase() {
362
4.67M
  req_.release_tablet_id();
363
4.67M
}
364
365
template <class Req, class Resp>
366
6.00M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
6.00M
  if (!status.ok()) {
368
124k
    return false;
369
124k
  }
370
5.88M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
5.88M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
5.88M
  if (restart_read_time) {
380
42
    auto read_point = batcher_->read_point();
381
42
    if (read_point) {
382
42
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
42
    }
384
42
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
42
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
42
    return false;
387
42
  }
388
5.88M
  auto local_limit_ht = resp_.local_limit_ht();
389
5.88M
  if (local_limit_ht) {
390
575k
    auto read_point = batcher_->read_point();
391
575k
    if (read_point) {
392
574k
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
574k
    }
394
575k
  }
395
5.88M
  return true;
396
5.88M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE19CommonResponseCheckERKNS_6StatusE
Line
Count
Source
366
1.32M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
1.32M
  if (!status.ok()) {
368
80.6k
    return false;
369
80.6k
  }
370
1.24M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
1.24M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
1.24M
  if (restart_read_time) {
380
0
    auto read_point = batcher_->read_point();
381
0
    if (read_point) {
382
0
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
0
    }
384
0
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
0
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
0
    return false;
387
0
  }
388
1.24M
  auto local_limit_ht = resp_.local_limit_ht();
389
1.24M
  if (local_limit_ht) {
390
0
    auto read_point = batcher_->read_point();
391
0
    if (read_point) {
392
0
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
0
    }
394
0
  }
395
1.24M
  return true;
396
1.24M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE19CommonResponseCheckERKNS_6StatusE
Line
Count
Source
366
4.68M
bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) {
367
4.68M
  if (!status.ok()) {
368
43.5k
    return false;
369
43.5k
  }
370
4.64M
  if (resp_.has_error()) {
371
0
    LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString()
372
0
                 << ". Requests not processed.";
373
    // If there is an error at the Rpc itself, there should be no individual responses.
374
    // All of them need to be marked as failed.
375
0
    Failed(StatusFromPB(resp_.error().status()));
376
0
    return false;
377
0
  }
378
4.64M
  auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_);
379
4.64M
  if (restart_read_time) {
380
42
    auto read_point = batcher_->read_point();
381
42
    if (read_point) {
382
42
      read_point->RestartRequired(req_.tablet_id(), restart_read_time);
383
42
    }
384
42
    Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(),
385
42
                  TransactionError(TransactionErrorCode::kReadRestartRequired)));
386
42
    return false;
387
42
  }
388
4.64M
  auto local_limit_ht = resp_.local_limit_ht();
389
4.64M
  if (local_limit_ht) {
390
575k
    auto read_point = batcher_->read_point();
391
575k
    if (read_point) {
392
574k
      read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht));
393
574k
    }
394
575k
  }
395
4.64M
  return true;
396
4.64M
}
397
398
template <class Req, class Resp>
399
6.03M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
6.03M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
24.9k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
24.9k
    if (read_point && !read_point->GetReadTime()) {
403
0
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
0
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
0
        read_point->SetCurrentReadTime();
408
0
        read_point->GetReadTime().AddToPB(&req_);
409
0
      }
410
0
    }
411
24.9k
  }
412
413
6.03M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
6.03M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
6.03M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE16SendRpcToTserverEi
Line
Count
Source
399
1.32M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
1.32M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
10.3k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
10.3k
    if (read_point && !read_point->GetReadTime()) {
403
0
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
0
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
0
        read_point->SetCurrentReadTime();
408
0
        read_point->GetReadTime().AddToPB(&req_);
409
0
      }
410
0
    }
411
10.3k
  }
412
413
1.32M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
1.32M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
1.32M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE16SendRpcToTserverEi
Line
Count
Source
399
4.70M
void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) {
400
4.70M
  if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) {
401
14.5k
    ConsistentReadPoint* read_point = batcher_->read_point();
402
14.5k
    if (read_point && !read_point->GetReadTime()) {
403
0
      auto txn = batcher_->transaction();
404
      // If txn is not set, this is a consistent scan across multiple tablets of a
405
      // non-transactional YCQL table.
406
0
      if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) {
407
0
        read_point->SetCurrentReadTime();
408
0
        read_point->GetReadTime().AddToPB(&req_);
409
0
      }
410
0
    }
411
14.5k
  }
412
413
4.70M
  req_.set_rejection_score(batcher_->RejectionScore(attempt_num));
414
4.70M
  AsyncRpc::SendRpcToTserver(attempt_num);
415
4.70M
}
416
417
template <class Req, class Resp>
418
6.00M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
6.00M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
6.00M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
6.00M
  NotifyBatcher(status);
424
6.00M
  if (!CommonResponseCheck(status)) {
425
124k
    return;
426
124k
  }
427
5.88M
  SwapResponses();
428
5.88M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE26ProcessResponseFromTserverERKNS_6StatusE
Line
Count
Source
418
1.32M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
1.32M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
1.32M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
1.32M
  NotifyBatcher(status);
424
1.32M
  if (!CommonResponseCheck(status)) {
425
80.6k
    return;
426
80.6k
  }
427
1.24M
  SwapResponses();
428
1.24M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE26ProcessResponseFromTserverERKNS_6StatusE
Line
Count
Source
418
4.68M
void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
419
4.68M
  TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false));
420
4.68M
  if (resp_.has_trace_buffer()) {
421
0
    TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer());
422
0
  }
423
4.68M
  NotifyBatcher(status);
424
4.68M
  if (!CommonResponseCheck(status)) {
425
43.5k
    return;
426
43.5k
  }
427
4.63M
  SwapResponses();
428
4.63M
}
429
430
431
template <class Req, class Resp>
432
6.00M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
6.00M
  return {GetPropagatedHybridTime(resp_),
434
4.15M
          resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())
435
1.84M
                                     : ReadHybridTime()};
436
6.00M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE20MakeFlushExtraResultEv
Line
Count
Source
432
1.32M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
1.32M
  return {GetPropagatedHybridTime(resp_),
434
110k
          resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())
435
1.21M
                                     : ReadHybridTime()};
436
1.32M
}
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE20MakeFlushExtraResultEv
Line
Count
Source
432
4.68M
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
433
4.68M
  return {GetPropagatedHybridTime(resp_),
434
4.04M
          resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())
435
635k
                                     : ReadHybridTime()};
436
4.68M
}
437
438
template <class Op, class Req>
439
2.00M
void HandleExtraFields(Op* op, Req* req) {
440
2.00M
}
_ZN2yb6client8internal17HandleExtraFieldsINS0_14YBRedisWriteOpENS_7tserver14WriteRequestPBEEEvPT_PT0_
Line
Count
Source
439
61.5k
void HandleExtraFields(Op* op, Req* req) {
440
61.5k
}
_ZN2yb6client8internal17HandleExtraFieldsINS0_13YBRedisReadOpENS_7tserver13ReadRequestPBEEEvPT_PT0_
Line
Count
Source
439
43.0k
void HandleExtraFields(Op* op, Req* req) {
440
43.0k
}
_ZN2yb6client8internal17HandleExtraFieldsINS0_13YBPgsqlReadOpENS_7tserver13ReadRequestPBEEEvPT_PT0_
Line
Count
Source
439
1.89M
void HandleExtraFields(Op* op, Req* req) {
440
1.89M
}
441
442
3.02M
void HandleExtraFields(YBqlWriteOp* op, tserver::WriteRequestPB* req) {
443
3.02M
  if (op->write_time_for_backfill()) {
444
9.54k
    req->set_external_hybrid_time(op->write_time_for_backfill().ToUint64());
445
9.54k
    ReadHybridTime::SingleTime(op->write_time_for_backfill()).ToPB(req->mutable_read_time());
446
9.54k
  }
447
3.02M
}
448
449
2.12M
void HandleExtraFields(YBPgsqlWriteOp* op, tserver::WriteRequestPB* req) {
450
2.12M
  if (op->write_time()) {
451
226
    req->set_external_hybrid_time(op->write_time().ToUint64());
452
226
  }
453
2.12M
}
454
455
3.93M
void HandleExtraFields(YBqlReadOp* op, tserver::ReadRequestPB* req) {
456
3.93M
  if (op->read_time()) {
457
479
    op->read_time().AddToPB(req);
458
479
  }
459
3.93M
}
460
461
template <class OpType, class Req, class Out>
462
void FillOps(
463
6.00M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
6.00M
  out->Reserve(narrow_cast<int>(ops.size()));
465
6.00M
  size_t idx = 0;
466
11.0M
  for (auto& op : ops) {
467
11.0M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
11.0M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
11.0M
    out->AddAllocated(concrete_op->mutable_request());
470
11.0M
    HandleExtraFields(concrete_op, req);
471
7.24k
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
11.0M
  }
473
6.00M
}
_ZN2yb6client8internal7FillOpsINS0_14YBRedisWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_19RedisWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
61.5k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
61.5k
  out->Reserve(narrow_cast<int>(ops.size()));
465
61.5k
  size_t idx = 0;
466
61.5k
  for (auto& op : ops) {
467
61.5k
    CHECK_EQ(op.yb_op->type(), expected_type);
468
61.5k
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
61.5k
    out->AddAllocated(concrete_op->mutable_request());
470
61.5k
    HandleExtraFields(concrete_op, req);
471
0
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
61.5k
  }
473
61.5k
}
_ZN2yb6client8internal7FillOpsINS0_11YBqlWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_16QLWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
1.00M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
1.00M
  out->Reserve(narrow_cast<int>(ops.size()));
465
1.00M
  size_t idx = 0;
466
3.02M
  for (auto& op : ops) {
467
3.02M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
3.02M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
3.02M
    out->AddAllocated(concrete_op->mutable_request());
470
3.02M
    HandleExtraFields(concrete_op, req);
471
489
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
3.02M
  }
473
1.00M
}
_ZN2yb6client8internal7FillOpsINS0_14YBPgsqlWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_19PgsqlWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
261k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
261k
  out->Reserve(narrow_cast<int>(ops.size()));
465
261k
  size_t idx = 0;
466
2.12M
  for (auto& op : ops) {
467
2.12M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
2.12M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
2.12M
    out->AddAllocated(concrete_op->mutable_request());
470
2.12M
    HandleExtraFields(concrete_op, req);
471
5
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
2.12M
  }
473
261k
}
_ZN2yb6client8internal7FillOpsINS0_13YBRedisReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_18RedisReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
43.0k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
43.0k
  out->Reserve(narrow_cast<int>(ops.size()));
465
43.0k
  size_t idx = 0;
466
43.0k
  for (auto& op : ops) {
467
43.0k
    CHECK_EQ(op.yb_op->type(), expected_type);
468
43.0k
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
43.0k
    out->AddAllocated(concrete_op->mutable_request());
470
43.0k
    HandleExtraFields(concrete_op, req);
471
0
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
43.0k
  }
473
43.0k
}
_ZN2yb6client8internal7FillOpsINS0_10YBqlReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_15QLReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
3.93M
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
3.93M
  out->Reserve(narrow_cast<int>(ops.size()));
465
3.93M
  size_t idx = 0;
466
3.93M
  for (auto& op : ops) {
467
3.93M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
3.93M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
3.93M
    out->AddAllocated(concrete_op->mutable_request());
470
3.93M
    HandleExtraFields(concrete_op, req);
471
6.77k
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
3.93M
  }
473
3.93M
}
_ZN2yb6client8internal7FillOpsINS0_13YBPgsqlReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_18PgsqlReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_
Line
Count
Source
463
708k
    const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) {
464
708k
  out->Reserve(narrow_cast<int>(ops.size()));
465
708k
  size_t idx = 0;
466
1.89M
  for (auto& op : ops) {
467
1.89M
    CHECK_EQ(op.yb_op->type(), expected_type);
468
1.89M
    auto* concrete_op = down_cast<OpType*>(op.yb_op.get());
469
1.89M
    out->AddAllocated(concrete_op->mutable_request());
470
1.89M
    HandleExtraFields(concrete_op, req);
471
18.4E
    VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString();
472
1.89M
  }
473
708k
}
474
475
template <class Repeated>
476
18.0M
void ReleaseOps(Repeated* repeated) {
477
18.0M
  auto size = repeated->size();
478
18.0M
  if (size) {
479
5.99M
    repeated->ExtractSubrange(0, size, nullptr);
480
5.99M
  }
481
18.0M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_19RedisWriteRequestPBEEEEEvPT_
Line
Count
Source
476
1.32M
void ReleaseOps(Repeated* repeated) {
477
1.32M
  auto size = repeated->size();
478
1.32M
  if (size) {
479
61.5k
    repeated->ExtractSubrange(0, size, nullptr);
480
61.5k
  }
481
1.32M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_16QLWriteRequestPBEEEEEvPT_
Line
Count
Source
476
1.32M
void ReleaseOps(Repeated* repeated) {
477
1.32M
  auto size = repeated->size();
478
1.32M
  if (size) {
479
1.00M
    repeated->ExtractSubrange(0, size, nullptr);
480
1.00M
  }
481
1.32M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_19PgsqlWriteRequestPBEEEEEvPT_
Line
Count
Source
476
1.32M
void ReleaseOps(Repeated* repeated) {
477
1.32M
  auto size = repeated->size();
478
1.32M
  if (size) {
479
261k
    repeated->ExtractSubrange(0, size, nullptr);
480
261k
  }
481
1.32M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_18RedisReadRequestPBEEEEEvPT_
Line
Count
Source
476
4.68M
void ReleaseOps(Repeated* repeated) {
477
4.68M
  auto size = repeated->size();
478
4.68M
  if (size) {
479
43.0k
    repeated->ExtractSubrange(0, size, nullptr);
480
43.0k
  }
481
4.68M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_15QLReadRequestPBEEEEEvPT_
Line
Count
Source
476
4.67M
void ReleaseOps(Repeated* repeated) {
477
4.67M
  auto size = repeated->size();
478
4.67M
  if (size) {
479
3.92M
    repeated->ExtractSubrange(0, size, nullptr);
480
3.92M
  }
481
4.67M
}
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_18PgsqlReadRequestPBEEEEEvPT_
Line
Count
Source
476
4.67M
void ReleaseOps(Repeated* repeated) {
477
4.67M
  auto size = repeated->size();
478
4.67M
  if (size) {
479
707k
    repeated->ExtractSubrange(0, size, nullptr);
480
707k
  }
481
4.67M
}
482
483
WriteRpc::WriteRpc(const AsyncRpcData& data)
484
1.32M
    : AsyncRpcBase(data, YBConsistencyLevel::STRONG) {
485
1.32M
  TRACE_TO(trace_, "WriteRpc initiated");
486
1.32M
  VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString());
487
488
  // Add the rows
489
1.32M
  switch (table()->table_type()) {
490
61.5k
    case YBTableType::REDIS_TABLE_TYPE:
491
61.5k
      FillOps<YBRedisWriteOp>(
492
61.5k
          ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch());
493
61.5k
      break;
494
1.00M
    case YBTableType::YQL_TABLE_TYPE:
495
1.00M
      FillOps<YBqlWriteOp>(
496
1.00M
          ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch());
497
1.00M
      break;
498
261k
    case YBTableType::PGSQL_TABLE_TYPE:
499
261k
      FillOps<YBPgsqlWriteOp>(
500
261k
          ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch());
501
261k
      break;
502
0
    case YBTableType::UNKNOWN_TABLE_TYPE:
503
0
    case YBTableType::TRANSACTION_STATUS_TABLE_TYPE:
504
0
      LOG(DFATAL) << "Unsupported table type: " << table()->ToString();
505
0
      break;
506
1.32M
  }
507
508
18.4E
  VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n"
509
18.4E
          << req_.ShortDebugString();
510
511
1.32M
  const auto& client_id = batcher_->client_id();
512
1.32M
  if (!client_id.IsNil() && FLAGS_detect_duplicates_for_retryable_requests) {
513
1.32M
    auto temp = client_id.ToUInt64Pair();
514
1.32M
    req_.set_client_id1(temp.first);
515
1.32M
    req_.set_client_id2(temp.second);
516
1.32M
    auto request_pair = batcher_->NextRequestIdAndMinRunningRequestId(data.tablet->tablet_id());
517
1.32M
    req_.set_request_id(request_pair.first);
518
1.32M
    req_.set_min_running_request_id(request_pair.second);
519
1.32M
  }
520
1.32M
}
521
522
1.32M
WriteRpc::~WriteRpc() {
523
  // Check that we sent request id info, i.e. (client_id, request_id, min_running_request_id).
524
1.32M
  if (req_.has_client_id1()) {
525
1.32M
    batcher_->RequestFinished(tablet().tablet_id(), req_.request_id());
526
1.32M
  }
527
528
1.32M
  if (async_rpc_metrics_) {
529
1.27M
    scoped_refptr<Histogram> write_rpc_time = IsLocalCall() ?
530
884k
                                              async_rpc_metrics_->local_write_rpc_time :
531
389k
                                              async_rpc_metrics_->remote_write_rpc_time;
532
1.27M
    write_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
533
1.27M
  }
534
535
1.32M
  ReleaseOps(req_.mutable_redis_write_batch());
536
1.32M
  ReleaseOps(req_.mutable_ql_write_batch());
537
1.32M
  ReleaseOps(req_.mutable_pgsql_write_batch());
538
1.32M
}
539
540
1.32M
void WriteRpc::CallRemoteMethod() {
541
1.32M
  auto trace = trace_; // It is possible that we receive reply before returning from WriteAsync.
542
                       // The send happens before we return from WriteAsync,
543
                       // so under heavy load it is possible that our request is handled and
544
                       // reply is received before WriteAsync returned.
545
1.32M
  TRACE_TO(trace, "SendRpcToTserver");
546
1.32M
  ADOPT_TRACE(trace.get());
547
548
1.32M
  tablet_invoker_.WriteAsync(req_, &resp_, PrepareController(),
549
1.32M
                             std::bind(&WriteRpc::Finished, this, Status::OK()));
550
1.32M
  TRACE_TO(trace, "RpcDispatched Asynchronously");
551
1.32M
}
552
553
1.24M
void WriteRpc::SwapResponses() {
554
1.24M
  int redis_idx = 0;
555
1.24M
  int ql_idx = 0;
556
1.24M
  int pgsql_idx = 0;
557
558
  // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back.
559
5.11M
  for (auto& op : ops_) {
560
5.11M
    YBOperation* yb_op = op.yb_op.get();
561
5.11M
    switch (yb_op->type()) {
562
61.5k
      case YBOperation::Type::REDIS_WRITE: {
563
61.5k
        if (redis_idx >= resp_.redis_response_batch().size()) {
564
0
          ++redis_idx;
565
0
          continue;
566
0
        }
567
        // Restore Redis write request PB and extract response.
568
61.5k
        auto* redis_op = down_cast<YBRedisWriteOp*>(yb_op);
569
61.5k
        redis_op->mutable_response()->Swap(resp_.mutable_redis_response_batch(redis_idx));
570
61.5k
        redis_idx++;
571
61.5k
        break;
572
61.5k
      }
573
2.95M
      case YBOperation::Type::QL_WRITE: {
574
2.95M
        if (ql_idx >= resp_.ql_response_batch().size()) {
575
0
          ++ql_idx;
576
0
          continue;
577
0
        }
578
        // Restore QL write request PB and extract response.
579
2.95M
        auto* ql_op = down_cast<YBqlWriteOp*>(yb_op);
580
2.95M
        ql_op->mutable_response()->Swap(resp_.mutable_ql_response_batch(ql_idx));
581
2.95M
        const auto& ql_response = ql_op->response();
582
2.95M
        if (ql_response.has_rows_data_sidecar()) {
583
273
          Slice rows_data = CHECK_RESULT(
584
273
              retrier().controller().GetSidecar(ql_response.rows_data_sidecar()));
585
273
          ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size());
586
273
        }
587
2.95M
        ql_idx++;
588
2.95M
        break;
589
2.95M
      }
590
2.10M
      case YBOperation::Type::PGSQL_WRITE: {
591
2.10M
        if (pgsql_idx >= resp_.pgsql_response_batch().size()) {
592
0
          ++pgsql_idx;
593
0
          continue;
594
0
        }
595
        // Restore PGSQL write request PB and extract response.
596
2.10M
        auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op);
597
2.10M
        pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx));
598
2.10M
        const auto& pgsql_response = pgsql_op->response();
599
2.10M
        if (pgsql_response.has_rows_data_sidecar()) {
600
2.10M
          auto holder = CHECK_RESULT(
601
2.10M
              retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
602
2.10M
          down_cast<YBPgsqlWriteOp*>(yb_op)->SetRowsData(holder.first, holder.second);
603
2.10M
        }
604
2.10M
        pgsql_idx++;
605
2.10M
        break;
606
2.10M
      }
607
0
      case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED;
608
0
      case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED;
609
0
      case YBOperation::Type::QL_READ:
610
0
        LOG(FATAL) << "Not a write operation " << op.yb_op->type();
611
0
        break;
612
5.11M
    }
613
5.11M
  }
614
615
1.24M
  if (redis_idx != resp_.redis_response_batch().size() ||
616
1.24M
      ql_idx != resp_.ql_response_batch().size() ||
617
1.24M
      pgsql_idx != resp_.pgsql_response_batch().size()) {
618
0
    LOG(ERROR) << Substitute("Write response count mismatch: "
619
0
                             "$0 Redis requests sent, $1 responses received. "
620
0
                             "$2 Apache CQL requests sent, $3 responses received. "
621
0
                             "$4 PostgreSQL requests sent, $5 responses received.",
622
0
                             redis_idx, resp_.redis_response_batch().size(),
623
0
                             ql_idx, resp_.ql_response_batch().size(),
624
0
                             pgsql_idx, resp_.pgsql_response_batch().size());
625
0
    auto status = STATUS(IllegalState, "Write response count mismatch");
626
0
    LOG(ERROR) << status << ", request: " << req_.ShortDebugString()
627
0
               << ", response: " << resp_.ShortDebugString();
628
0
    batcher_->AddOpCountMismatchError();
629
0
    Failed(status);
630
0
  }
631
1.24M
}
632
633
1.32M
void WriteRpc::NotifyBatcher(const Status& status) {
634
1.32M
  batcher_->ProcessWriteResponse(*this, status);
635
1.32M
}
636
637
0
bool WriteRpc::ShouldRetryExpiredRequest() {
638
0
  return req_.min_running_request_id() == kInitializeFromMinRunning;
639
0
}
640
641
ReadRpc::ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level)
642
4.68M
    : AsyncRpcBase(data, yb_consistency_level) {
643
4.68M
  TRACE_TO(trace_, "ReadRpc initiated");
644
4.68M
  VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString());
645
4.68M
  req_.set_consistency_level(yb_consistency_level);
646
4.68M
  req_.set_proxy_uuid(data.batcher->proxy_uuid());
647
648
4.68M
  switch (table()->table_type()) {
649
43.0k
    case YBTableType::REDIS_TABLE_TYPE:
650
43.0k
      FillOps<YBRedisReadOp>(
651
43.0k
          ops_, YBOperation::Type::REDIS_READ, &req_, req_.mutable_redis_batch());
652
43.0k
      break;
653
3.94M
    case YBTableType::YQL_TABLE_TYPE:
654
3.94M
      FillOps<YBqlReadOp>(
655
3.94M
          ops_, YBOperation::Type::QL_READ, &req_, req_.mutable_ql_batch());
656
3.94M
      break;
657
708k
    case YBTableType::PGSQL_TABLE_TYPE:
658
708k
      FillOps<YBPgsqlReadOp>(
659
708k
          ops_, YBOperation::Type::PGSQL_READ, &req_, req_.mutable_pgsql_batch());
660
708k
      break;
661
0
    case YBTableType::UNKNOWN_TABLE_TYPE:
662
0
    case YBTableType::TRANSACTION_STATUS_TABLE_TYPE:
663
0
      LOG(DFATAL) << "Unsupported table type: " << table()->ToString();
664
0
      break;
665
4.68M
  }
666
667
18.4E
  VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n"
668
18.4E
          << req_.ShortDebugString();
669
4.68M
}
670
671
4.68M
ReadRpc::~ReadRpc() {
672
  // Get locality metrics if enabled, but skip for system tables as those go to the master.
673
4.68M
  if (async_rpc_metrics_ && !table()->name().is_system()) {
674
4.43M
    scoped_refptr<Histogram> read_rpc_time = IsLocalCall() ?
675
3.75M
                                             async_rpc_metrics_->local_read_rpc_time :
676
678k
                                             async_rpc_metrics_->remote_read_rpc_time;
677
678
4.43M
    read_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_));
679
4.43M
  }
680
681
4.68M
  ReleaseOps(req_.mutable_redis_batch());
682
4.68M
  ReleaseOps(req_.mutable_ql_batch());
683
4.68M
  ReleaseOps(req_.mutable_pgsql_batch());
684
4.68M
}
685
686
4.70M
void ReadRpc::CallRemoteMethod() {
687
4.70M
  auto trace = trace_; // It is possible that we receive reply before returning from ReadAsync.
688
                       // Detailed explanation in WriteRpc::CallRemoteMethod.
689
4.70M
  TRACE_TO(trace, "SendRpcToTserver");
690
4.70M
  ADOPT_TRACE(trace.get());
691
692
4.70M
  tablet_invoker_.ReadAsync(req_, &resp_, PrepareController(),
693
4.70M
                            std::bind(&ReadRpc::Finished, this, Status::OK()));
694
4.70M
  TRACE_TO(trace, "RpcDispatched Asynchronously");
695
4.70M
}
696
697
4.64M
void ReadRpc::SwapResponses() {
698
4.64M
  int redis_idx = 0;
699
4.64M
  int ql_idx = 0;
700
4.64M
  int pgsql_idx = 0;
701
702
  // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back.
703
5.45M
  for (auto& op : ops_) {
704
5.45M
    YBOperation* yb_op = op.yb_op.get();
705
5.45M
    switch (yb_op->type()) {
706
42.9k
      case YBOperation::Type::REDIS_READ: {
707
42.9k
        if (redis_idx >= resp_.redis_batch().size()) {
708
0
          batcher_->AddOpCountMismatchError();
709
0
          return;
710
0
        }
711
        // Restore Redis read request PB and extract response.
712
42.9k
        auto* redis_op = down_cast<YBRedisReadOp*>(yb_op);
713
42.9k
        redis_op->mutable_response()->Swap(resp_.mutable_redis_batch(redis_idx));
714
42.9k
        redis_idx++;
715
42.9k
        break;
716
42.9k
      }
717
3.92M
      case YBOperation::Type::QL_READ: {
718
3.92M
        if (ql_idx >= resp_.ql_batch().size()) {
719
0
          batcher_->AddOpCountMismatchError();
720
0
          return;
721
0
        }
722
        // Restore QL read request PB and extract response.
723
3.92M
        auto* ql_op = down_cast<YBqlReadOp*>(yb_op);
724
3.92M
        ql_op->mutable_response()->Swap(resp_.mutable_ql_batch(ql_idx));
725
3.92M
        const auto& ql_response = ql_op->response();
726
3.92M
        if (ql_response.has_rows_data_sidecar()) {
727
3.91M
          Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar(
728
3.91M
              ql_response.rows_data_sidecar()));
729
3.91M
          ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size());
730
3.91M
        }
731
3.92M
        ql_idx++;
732
3.92M
        break;
733
3.92M
      }
734
1.50M
      case YBOperation::Type::PGSQL_READ: {
735
1.50M
        if (pgsql_idx >= resp_.pgsql_batch().size()) {
736
0
          batcher_->AddOpCountMismatchError();
737
0
          return;
738
0
        }
739
        // Restore PGSQL read request PB and extract response.
740
1.50M
        auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op);
741
1.50M
        if (resp_.has_used_read_time()) {
742
949k
          pgsql_op->SetUsedReadTime(ReadHybridTime::FromPB(resp_.used_read_time()));
743
949k
        }
744
1.50M
        pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx));
745
1.50M
        const auto& pgsql_response = pgsql_op->response();
746
1.50M
        if (pgsql_response.has_rows_data_sidecar()) {
747
1.50M
          auto holder = CHECK_RESULT(
748
1.50M
              retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar()));
749
1.50M
          down_cast<YBPgsqlReadOp*>(yb_op)->SetRowsData(holder.first, holder.second);
750
1.50M
        }
751
1.50M
        pgsql_idx++;
752
1.50M
        break;
753
1.50M
      }
754
0
      case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED;
755
0
      case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED;
756
0
      case YBOperation::Type::QL_WRITE:
757
0
        LOG(FATAL) << "Not a read operation " << op.yb_op->type();
758
0
        break;
759
5.45M
    }
760
5.45M
  }
761
762
4.64M
  if (redis_idx != resp_.redis_batch().size() ||
763
4.62M
      ql_idx != resp_.ql_batch().size() ||
764
4.63M
      pgsql_idx != resp_.pgsql_batch().size()) {
765
0
    LOG(ERROR) << Substitute("Read response count mismatch: "
766
0
                             "$0 Redis requests sent, $1 responses received. "
767
0
                             "$2 QL requests sent, $3 responses received. "
768
0
                             "$4 PGSQL requests sent, $5 responses received.",
769
0
                             redis_idx, resp_.redis_batch().size(),
770
0
                             ql_idx, resp_.ql_batch().size(),
771
0
                             pgsql_idx, resp_.pgsql_batch().size());
772
0
    batcher_->AddOpCountMismatchError();
773
0
    Failed(STATUS(IllegalState, "Read response count mismatch"));
774
0
  }
775
776
4.64M
}
777
778
4.68M
void ReadRpc::NotifyBatcher(const Status& status) {
779
4.68M
  batcher_->ProcessReadResponse(*this, status);
780
4.68M
}
781
782
}  // namespace internal
783
}  // namespace client
784
}  // namespace yb