/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/async_rpc.h" |
15 | | |
16 | | #include "yb/client/batcher.h" |
17 | | #include "yb/client/client_error.h" |
18 | | #include "yb/client/in_flight_op.h" |
19 | | #include "yb/client/meta_cache.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/yb_op.h" |
22 | | #include "yb/client/yb_table_name.h" |
23 | | |
24 | | #include "yb/common/pgsql_error.h" |
25 | | #include "yb/common/schema.h" |
26 | | #include "yb/common/transaction.h" |
27 | | #include "yb/common/transaction_error.h" |
28 | | #include "yb/common/wire_protocol.h" |
29 | | |
30 | | #include "yb/gutil/casts.h" |
31 | | #include "yb/gutil/strings/substitute.h" |
32 | | |
33 | | #include "yb/rpc/rpc_controller.h" |
34 | | |
35 | | #include "yb/util/cast.h" |
36 | | #include "yb/util/flag_tags.h" |
37 | | #include "yb/util/logging.h" |
38 | | #include "yb/util/metrics.h" |
39 | | #include "yb/util/result.h" |
40 | | #include "yb/util/status_log.h" |
41 | | #include "yb/util/trace.h" |
42 | | #include "yb/util/yb_pg_errcodes.h" |
43 | | |
44 | | // TODO: do we need word Redis in following two metrics? ReadRpc and WriteRpc objects emitting |
45 | | // these metrics are used not only in Redis service. |
46 | | METRIC_DEFINE_coarse_histogram( |
47 | | server, handler_latency_yb_client_write_remote, "yb.client.Write remote call time", |
48 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Write call "); |
49 | | METRIC_DEFINE_coarse_histogram( |
50 | | server, handler_latency_yb_client_read_remote, "yb.client.Read remote call time", |
51 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Read call "); |
52 | | METRIC_DEFINE_coarse_histogram( |
53 | | server, handler_latency_yb_client_write_local, "yb.client.Write local call time", |
54 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Write call "); |
55 | | METRIC_DEFINE_coarse_histogram( |
56 | | server, handler_latency_yb_client_read_local, "yb.client.Read local call time", |
57 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Read call "); |
58 | | METRIC_DEFINE_coarse_histogram( |
59 | | server, handler_latency_yb_client_time_to_send, |
60 | | "Time taken for a Write/Read rpc to be sent to the server", yb::MetricUnit::kMicroseconds, |
61 | | "Microseconds spent before sending the request to the server"); |
62 | | |
63 | | METRIC_DEFINE_counter(server, consistent_prefix_successful_reads, |
64 | | "Number of consistent prefix reads that were served by the closest replica.", |
65 | | yb::MetricUnit::kRequests, |
66 | | "Number of consistent prefix reads that were served by the closest replica."); |
67 | | |
68 | | METRIC_DEFINE_counter(server, consistent_prefix_failed_reads, |
69 | | "Number of consistent prefix reads that failed to be served by the closest replica.", |
70 | | yb::MetricUnit::kRequests, |
71 | | "Number of consistent prefix reads that failed to be served by the closest replica."); |
72 | | |
73 | | DEFINE_int32(ybclient_print_trace_every_n, 0, |
74 | | "Controls the rate at which traces from ybclient are printed. Setting this to 0 " |
75 | | "disables printing the collected traces."); |
76 | | TAG_FLAG(ybclient_print_trace_every_n, advanced); |
77 | | TAG_FLAG(ybclient_print_trace_every_n, runtime); |
78 | | |
79 | | DEFINE_bool(forward_redis_requests, true, "If false, the redis op will not be served if it's not " |
80 | | "a local request. The op response will be set to the redis error " |
81 | | "'-MOVED partition_key 0.0.0.0:0'. This works with jedis which only looks at the MOVED " |
82 | | "part of the reply and ignores the rest. For now, if this flag is true, we will only " |
83 | | "attempt to read from leaders, so redis_allow_reads_from_followers will be ignored."); |
84 | | |
85 | | DEFINE_bool(detect_duplicates_for_retryable_requests, true, |
86 | | "Enable tracking of write requests that prevents the same write from being applied " |
87 | | "twice."); |
88 | | |
89 | | DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false, |
90 | | "When true, forward the PGSQL rpcs to the local tServer."); |
91 | | |
92 | | DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b); |
93 | | |
94 | | DECLARE_bool(collect_end_to_end_traces); |
95 | | |
96 | | using namespace std::placeholders; |
97 | | |
98 | | namespace yb { |
99 | | |
100 | | using std::shared_ptr; |
101 | | using rpc::ErrorStatusPB; |
102 | | using rpc::Messenger; |
103 | | using rpc::Rpc; |
104 | | using rpc::RpcController; |
105 | | using tserver::WriteRequestPB; |
106 | | using tserver::WriteResponsePB; |
107 | | using tserver::WriteResponsePB_PerRowErrorPB; |
108 | | using strings::Substitute; |
109 | | |
110 | | namespace client { |
111 | | |
112 | | namespace internal { |
113 | | |
114 | 5.99M | bool IsTracingEnabled() { |
115 | 5.99M | return FLAGS_collect_end_to_end_traces; |
116 | 5.99M | } |
117 | | |
118 | | namespace { |
119 | | |
120 | 5.99M | bool LocalTabletServerOnly(const InFlightOps& ops) { |
121 | 5.99M | const auto op_type = ops.front().yb_op->type(); |
122 | 5.99M | return ((op_type == YBOperation::Type::REDIS_READ || op_type == YBOperation::Type::REDIS_WRITE) && |
123 | 104k | !FLAGS_forward_redis_requests); |
124 | 5.99M | } |
125 | | |
126 | | } |
127 | | |
128 | | AsyncRpcMetrics::AsyncRpcMetrics(const scoped_refptr<yb::MetricEntity>& entity) |
129 | | : remote_write_rpc_time(METRIC_handler_latency_yb_client_write_remote.Instantiate(entity)), |
130 | | remote_read_rpc_time(METRIC_handler_latency_yb_client_read_remote.Instantiate(entity)), |
131 | | local_write_rpc_time(METRIC_handler_latency_yb_client_write_local.Instantiate(entity)), |
132 | | local_read_rpc_time(METRIC_handler_latency_yb_client_read_local.Instantiate(entity)), |
133 | | time_to_send(METRIC_handler_latency_yb_client_time_to_send.Instantiate(entity)), |
134 | | consistent_prefix_successful_reads( |
135 | | METRIC_consistent_prefix_successful_reads.Instantiate(entity)), |
136 | 123k | consistent_prefix_failed_reads(METRIC_consistent_prefix_failed_reads.Instantiate(entity)) { |
137 | 123k | } |
138 | | |
139 | | AsyncRpc::AsyncRpc( |
140 | | const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level) |
141 | | : Rpc(data.batcher->deadline(), data.batcher->messenger(), &data.batcher->proxy_cache()), |
142 | | batcher_(data.batcher), |
143 | | ops_(data.ops), |
144 | | tablet_invoker_(LocalTabletServerOnly(ops_), |
145 | | yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX, |
146 | | data.batcher->client_, |
147 | | this, |
148 | | this, |
149 | | data.tablet, |
150 | | table(), |
151 | | mutable_retrier(), |
152 | | trace_.get()), |
153 | | start_(CoarseMonoClock::Now()), |
154 | 6.01M | async_rpc_metrics_(data.batcher->async_rpc_metrics()) { |
155 | 6.01M | mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread( |
156 | 6.01M | data.allow_local_calls_in_curr_thread); |
157 | 6.01M | } |
158 | | |
159 | 6.00M | AsyncRpc::~AsyncRpc() { |
160 | 6.00M | if (trace_->must_print()) { |
161 | 0 | LOG(INFO) << ToString() << " took " << ToMicroseconds(CoarseMonoClock::Now() - start_) |
162 | 0 | << "us. Trace:\n" << trace_->DumpToString(true); |
163 | 6.00M | } else { |
164 | 6.00M | const auto print_trace_every_n = GetAtomicFlag(&FLAGS_ybclient_print_trace_every_n); |
165 | 6.00M | if (print_trace_every_n > 0) { |
166 | 0 | YB_LOG_EVERY_N(INFO, print_trace_every_n) |
167 | 0 | << ToString() << " took " |
168 | 0 | << ToMicroseconds(CoarseMonoClock::Now() - start_) |
169 | 0 | << "us. Trace:\n" << trace_->DumpToString(true); |
170 | 0 | } |
171 | 6.00M | } |
172 | 6.00M | } |
173 | | |
174 | 6.02M | void AsyncRpc::SendRpc() { |
175 | 6.02M | TRACE_TO(trace_, "SendRpc() called."); |
176 | | |
177 | 6.02M | retained_self_ = shared_from_this(); |
178 | | // For now, if this is a retry, execute this rpc on the leader even if |
179 | | // the consistency level is YBConsistencyLevel::CONSISTENT_PREFIX or |
180 | | // FLAGS_redis_allow_reads_from_followers is set to true. |
181 | | // TODO(hector): Temporarily blacklist the follower that couldn't serve the read so we can retry |
182 | | // on another follower. |
183 | 6.02M | if (async_rpc_metrics_ && num_attempts() > 1 && tablet_invoker_.is_consistent_prefix()) { |
184 | 2.23k | IncrementCounter(async_rpc_metrics_->consistent_prefix_failed_reads); |
185 | 2.23k | } |
186 | 6.02M | tablet_invoker_.Execute(std::string(), num_attempts() > 1); |
187 | 6.02M | } |
188 | | |
189 | 124k | std::string AsyncRpc::ToString() const { |
190 | 124k | const auto& transaction = batcher_->in_flight_ops().metadata.transaction; |
191 | 124k | const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction; |
192 | 124k | return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)", |
193 | 81.1k | ops_.front().yb_op->read_only() ? "Read" : "Write", |
194 | 124k | tablet().tablet_id(), ops_.size(), num_attempts(), |
195 | 124k | transaction.transaction_id, |
196 | 124k | subtransaction_opt |
197 | 23.6k | ? Format("$0", subtransaction_opt->subtransaction_id) |
198 | 101k | : "[none]"); |
199 | 124k | } |
200 | | |
201 | 18.0M | std::shared_ptr<const YBTable> AsyncRpc::table() const { |
202 | | // All of the ops for a given tablet obviously correspond to the same table, |
203 | | // so we'll just grab the table from the first. |
204 | 18.0M | return ops_[0].yb_op->table(); |
205 | 18.0M | } |
206 | | |
207 | 6.01M | void AsyncRpc::Finished(const Status& status) { |
208 | 6.01M | Status new_status = status; |
209 | 6.01M | if (tablet_invoker_.Done(&new_status)) { |
210 | 5.98M | if (tablet().is_split() || |
211 | 6.00M | ClientError(new_status) == ClientErrorCode::kTablePartitionListIsStale) { |
212 | 31 | ops_[0].yb_op->MarkTablePartitionListAsStale(); |
213 | 31 | } |
214 | 5.98M | if (async_rpc_metrics_ && status.ok() && tablet_invoker_.is_consistent_prefix()) { |
215 | 11.8k | IncrementCounter(async_rpc_metrics_->consistent_prefix_successful_reads); |
216 | 11.8k | } |
217 | 5.98M | ProcessResponseFromTserver(new_status); |
218 | 5.98M | batcher_->Flushed(ops_, new_status, MakeFlushExtraResult()); |
219 | 5.98M | retained_self_.reset(); |
220 | 5.98M | } |
221 | 6.01M | } |
222 | | |
223 | 124k | void AsyncRpc::Failed(const Status& status) { |
224 | 124k | std::string error_message = status.message().ToBuffer(); |
225 | 124k | auto redis_error_code = status.IsInvalidCommand() || status.IsInvalidArgument() ? |
226 | 124k | RedisResponsePB_RedisStatusCode_PARSING_ERROR : RedisResponsePB_RedisStatusCode_SERVER_ERROR; |
227 | 494k | for (auto& op : ops_) { |
228 | 494k | YBOperation* yb_op = op.yb_op.get(); |
229 | 494k | switch (yb_op->type()) { |
230 | 84 | case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; |
231 | 89 | case YBOperation::Type::REDIS_WRITE: { |
232 | 89 | RedisResponsePB* resp; |
233 | 89 | if (yb_op->type() == YBOperation::Type::REDIS_READ) { |
234 | 84 | resp = down_cast<YBRedisReadOp*>(yb_op)->mutable_response(); |
235 | 5 | } else { |
236 | 5 | resp = down_cast<YBRedisWriteOp*>(yb_op)->mutable_response(); |
237 | 5 | } |
238 | 89 | resp->Clear(); |
239 | | // If the tserver replied it is not the leader, respond that the key has moved. We do not |
240 | | // need to return the address of the new leader because the Redis client will refresh the |
241 | | // cluster map instead. |
242 | 89 | if (status.IsIllegalState()) { |
243 | 5 | resp->set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); |
244 | 5 | resp->set_error_message(Substitute("MOVED $0 0.0.0.0:0", |
245 | 5 | down_cast<YBRedisOp*>(yb_op)->hash_code())); |
246 | 84 | } else { |
247 | 84 | resp->set_code(redis_error_code); |
248 | 84 | resp->set_error_message(error_message); |
249 | 84 | } |
250 | 89 | break; |
251 | 84 | } |
252 | 168 | case YBOperation::Type::QL_READ: FALLTHROUGH_INTENDED; |
253 | 72.7k | case YBOperation::Type::QL_WRITE: { |
254 | 72.7k | QLResponsePB* resp = down_cast<YBqlOp*>(yb_op)->mutable_response(); |
255 | 72.7k | resp->Clear(); |
256 | 62.5k | resp->set_status(status.IsTryAgain() ? QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR |
257 | 10.1k | : QLResponsePB::YQL_STATUS_RUNTIME_ERROR); |
258 | 72.7k | resp->set_error_message(error_message); |
259 | 72.7k | break; |
260 | 168 | } |
261 | 398k | case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; |
262 | 421k | case YBOperation::Type::PGSQL_WRITE: { |
263 | 421k | PgsqlResponsePB* resp = down_cast<YBPgsqlOp*>(yb_op)->mutable_response(); |
264 | 421k | resp->set_status(status.IsTryAgain() ? PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR |
265 | 18.4E | : PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR); |
266 | 421k | resp->set_error_message(error_message); |
267 | 421k | const uint8_t* pg_err_ptr = status.ErrorData(PgsqlErrorTag::kCategory); |
268 | 421k | if (pg_err_ptr != nullptr) { |
269 | 151k | resp->set_pg_error_code(static_cast<uint32_t>(PgsqlErrorTag::Decode(pg_err_ptr))); |
270 | 270k | } else { |
271 | 270k | resp->set_pg_error_code(static_cast<uint32_t>(YBPgErrorCode::YB_PG_INTERNAL_ERROR)); |
272 | 270k | } |
273 | 421k | const uint8_t* txn_err_ptr = status.ErrorData(TransactionErrorTag::kCategory); |
274 | 422k | if (txn_err_ptr != nullptr) { |
275 | 422k | resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorTag::Decode(txn_err_ptr))); |
276 | 18.4E | } else { |
277 | 18.4E | resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorCode::kNone)); |
278 | 18.4E | } |
279 | 421k | break; |
280 | 398k | } |
281 | 0 | default: |
282 | 0 | LOG(FATAL) << "Unsupported operation " << yb_op->type(); |
283 | 0 | break; |
284 | 494k | } |
285 | 494k | } |
286 | 124k | } |
287 | | |
288 | 5.70M | bool AsyncRpc::IsLocalCall() const { |
289 | 5.70M | return tablet_invoker_.IsLocalCall(); |
290 | 5.70M | } |
291 | | |
292 | | namespace { |
293 | | |
294 | | template<class T> |
295 | | void SetMetadata(const InFlightOpsTransactionMetadata& metadata, |
296 | | bool need_full_metadata, |
297 | 781k | T* dest) { |
298 | 781k | auto* transaction = dest->mutable_transaction(); |
299 | 781k | if (need_full_metadata) { |
300 | 553k | metadata.transaction.ToPB(transaction); |
301 | 228k | } else { |
302 | 228k | metadata.transaction.TransactionIdToPB(transaction); |
303 | 228k | } |
304 | 781k | dest->set_deprecated_may_have_metadata(true); |
305 | | |
306 | 781k | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) { |
307 | 63.4k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); |
308 | 63.4k | } |
309 | 781k | } async_rpc.cc:_ZN2yb6client8internal12_GLOBAL__N_111SetMetadataINS_5docdb20KeyValueWriteBatchPBEEEvRKNS1_30InFlightOpsTransactionMetadataEbPT_ Line | Count | Source | 297 | 414k | T* dest) { | 298 | 414k | auto* transaction = dest->mutable_transaction(); | 299 | 414k | if (need_full_metadata) { | 300 | 315k | metadata.transaction.ToPB(transaction); | 301 | 99.1k | } else { | 302 | 99.1k | metadata.transaction.TransactionIdToPB(transaction); | 303 | 99.1k | } | 304 | 414k | dest->set_deprecated_may_have_metadata(true); | 305 | | | 306 | 414k | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) { | 307 | 18.9k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); | 308 | 18.9k | } | 309 | 414k | } |
async_rpc.cc:_ZN2yb6client8internal12_GLOBAL__N_111SetMetadataINS_7tserver13ReadRequestPBEEEvRKNS1_30InFlightOpsTransactionMetadataEbPT_ Line | Count | Source | 297 | 367k | T* dest) { | 298 | 367k | auto* transaction = dest->mutable_transaction(); | 299 | 367k | if (need_full_metadata) { | 300 | 238k | metadata.transaction.ToPB(transaction); | 301 | 129k | } else { | 302 | 129k | metadata.transaction.TransactionIdToPB(transaction); | 303 | 129k | } | 304 | 367k | dest->set_deprecated_may_have_metadata(true); | 305 | | | 306 | 367k | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) { | 307 | 44.4k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); | 308 | 44.4k | } | 309 | 367k | } |
|
310 | | |
311 | | void SetMetadata(const InFlightOpsTransactionMetadata& metadata, |
312 | | bool need_full_metadata, |
313 | 413k | tserver::WriteRequestPB* req) { |
314 | 413k | SetMetadata(metadata, need_full_metadata, req->mutable_write_batch()); |
315 | 413k | } |
316 | | |
317 | | } // namespace |
318 | | |
319 | 6.02M | void AsyncRpc::SendRpcToTserver(int attempt_num) { |
320 | 6.02M | if (async_rpc_metrics_) { |
321 | 5.97M | async_rpc_metrics_->time_to_send->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
322 | 5.97M | } |
323 | 6.02M | CallRemoteMethod(); |
324 | 6.02M | } |
325 | | |
326 | | template <class Req, class Resp> |
327 | | AsyncRpcBase<Req, Resp>::AsyncRpcBase( |
328 | | const AsyncRpcData& data, YBConsistencyLevel consistency_level) |
329 | 6.01M | : AsyncRpc(data, consistency_level) { |
330 | 6.01M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); |
331 | 6.01M | req_.set_include_trace(IsTracingEnabled()); |
332 | 6.01M | const ConsistentReadPoint* read_point = batcher_->read_point(); |
333 | 6.01M | bool has_read_time = false; |
334 | 6.01M | if (read_point) { |
335 | 5.85M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); |
336 | | // Set read time for consistent read only if the table is transaction-enabled and |
337 | | // consistent read is required. |
338 | 5.85M | if (data.need_consistent_read && |
339 | 1.39M | table()->InternalSchema().table_properties().is_transactional()) { |
340 | 1.20M | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); |
341 | 1.20M | if (read_time) { |
342 | 936k | has_read_time = true; |
343 | 936k | read_time.AddToPB(&req_); |
344 | 936k | } |
345 | 1.20M | } |
346 | 5.85M | } |
347 | 6.01M | if (!ops_.empty()) { |
348 | 6.01M | req_.set_batch_idx(ops_.front().batch_idx); |
349 | 6.01M | } |
350 | 6.01M | const auto& metadata = batcher_->in_flight_ops().metadata; |
351 | 6.01M | if (!metadata.transaction.transaction_id.IsNil()) { |
352 | 781k | SetMetadata(metadata, data.need_metadata, &req_); |
353 | 781k | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; |
354 | 18.4E | LOG_IF(DFATAL, has_read_time && serializable) |
355 | 18.4E | << "Read time should NOT be specified for serializable isolation: " |
356 | 18.4E | << read_point->GetReadTime().ToString(); |
357 | 781k | } |
358 | 6.01M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEEC2ERKNS1_12AsyncRpcDataENS_18YBConsistencyLevelE Line | Count | Source | 329 | 1.32M | : AsyncRpc(data, consistency_level) { | 330 | 1.32M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); | 331 | 1.32M | req_.set_include_trace(IsTracingEnabled()); | 332 | 1.32M | const ConsistentReadPoint* read_point = batcher_->read_point(); | 333 | 1.32M | bool has_read_time = false; | 334 | 1.32M | if (read_point) { | 335 | 1.23M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); | 336 | | // Set read time for consistent read only if the table is transaction-enabled and | 337 | | // consistent read is required. | 338 | 1.23M | if (data.need_consistent_read && | 339 | 472k | table()->InternalSchema().table_properties().is_transactional()) { | 340 | 470k | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); | 341 | 470k | if (read_time) { | 342 | 356k | has_read_time = true; | 343 | 356k | read_time.AddToPB(&req_); | 344 | 356k | } | 345 | 470k | } | 346 | 1.23M | } | 347 | 1.32M | if (!ops_.empty()) { | 348 | 1.32M | req_.set_batch_idx(ops_.front().batch_idx); | 349 | 1.32M | } | 350 | 1.32M | const auto& metadata = batcher_->in_flight_ops().metadata; | 351 | 1.32M | if (!metadata.transaction.transaction_id.IsNil()) { | 352 | 413k | SetMetadata(metadata, data.need_metadata, &req_); | 353 | 413k | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; | 354 | 18.4E | LOG_IF(DFATAL, has_read_time && serializable) | 355 | 18.4E | << "Read time should NOT be specified for serializable isolation: " | 356 | 18.4E | << read_point->GetReadTime().ToString(); | 357 | 413k | } | 358 | 1.32M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEEC2ERKNS1_12AsyncRpcDataENS_18YBConsistencyLevelE Line | Count | Source | 329 | 4.68M | : AsyncRpc(data, consistency_level) { | 330 | 4.68M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); | 331 | 4.68M | req_.set_include_trace(IsTracingEnabled()); | 332 | 4.68M | const ConsistentReadPoint* read_point = batcher_->read_point(); | 333 | 4.68M | bool has_read_time = false; | 334 | 4.68M | if (read_point) { | 335 | 4.62M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); | 336 | | // Set read time for consistent read only if the table is transaction-enabled and | 337 | | // consistent read is required. | 338 | 4.62M | if (data.need_consistent_read && | 339 | 918k | table()->InternalSchema().table_properties().is_transactional()) { | 340 | 731k | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); | 341 | 731k | if (read_time) { | 342 | 579k | has_read_time = true; | 343 | 579k | read_time.AddToPB(&req_); | 344 | 579k | } | 345 | 731k | } | 346 | 4.62M | } | 347 | 4.69M | if (!ops_.empty()) { | 348 | 4.69M | req_.set_batch_idx(ops_.front().batch_idx); | 349 | 4.69M | } | 350 | 4.68M | const auto& metadata = batcher_->in_flight_ops().metadata; | 351 | 4.68M | if (!metadata.transaction.transaction_id.IsNil()) { | 352 | 367k | SetMetadata(metadata, data.need_metadata, &req_); | 353 | 367k | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; | 354 | 18.4E | LOG_IF(DFATAL, has_read_time && serializable) | 355 | 18.4E | << "Read time should NOT be specified for serializable isolation: " | 356 | 18.4E | << read_point->GetReadTime().ToString(); | 357 | 367k | } | 358 | 4.68M | } |
|
359 | | |
360 | | template <class Req, class Resp> |
361 | 5.99M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { |
362 | 5.99M | req_.release_tablet_id(); |
363 | 5.99M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEED2Ev Line | Count | Source | 361 | 1.32M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { | 362 | 1.32M | req_.release_tablet_id(); | 363 | 1.32M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEED2Ev Line | Count | Source | 361 | 4.67M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { | 362 | 4.67M | req_.release_tablet_id(); | 363 | 4.67M | } |
|
364 | | |
365 | | template <class Req, class Resp> |
366 | 6.00M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { |
367 | 6.00M | if (!status.ok()) { |
368 | 124k | return false; |
369 | 124k | } |
370 | 5.88M | if (resp_.has_error()) { |
371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() |
372 | 0 | << ". Requests not processed."; |
373 | | // If there is an error at the Rpc itself, there should be no individual responses. |
374 | | // All of them need to be marked as failed. |
375 | 0 | Failed(StatusFromPB(resp_.error().status())); |
376 | 0 | return false; |
377 | 0 | } |
378 | 5.88M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); |
379 | 5.88M | if (restart_read_time) { |
380 | 42 | auto read_point = batcher_->read_point(); |
381 | 42 | if (read_point) { |
382 | 42 | read_point->RestartRequired(req_.tablet_id(), restart_read_time); |
383 | 42 | } |
384 | 42 | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), |
385 | 42 | TransactionError(TransactionErrorCode::kReadRestartRequired))); |
386 | 42 | return false; |
387 | 42 | } |
388 | 5.88M | auto local_limit_ht = resp_.local_limit_ht(); |
389 | 5.88M | if (local_limit_ht) { |
390 | 575k | auto read_point = batcher_->read_point(); |
391 | 575k | if (read_point) { |
392 | 574k | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); |
393 | 574k | } |
394 | 575k | } |
395 | 5.88M | return true; |
396 | 5.88M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE19CommonResponseCheckERKNS_6StatusE Line | Count | Source | 366 | 1.32M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { | 367 | 1.32M | if (!status.ok()) { | 368 | 80.6k | return false; | 369 | 80.6k | } | 370 | 1.24M | if (resp_.has_error()) { | 371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() | 372 | 0 | << ". Requests not processed."; | 373 | | // If there is an error at the Rpc itself, there should be no individual responses. | 374 | | // All of them need to be marked as failed. | 375 | 0 | Failed(StatusFromPB(resp_.error().status())); | 376 | 0 | return false; | 377 | 0 | } | 378 | 1.24M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); | 379 | 1.24M | if (restart_read_time) { | 380 | 0 | auto read_point = batcher_->read_point(); | 381 | 0 | if (read_point) { | 382 | 0 | read_point->RestartRequired(req_.tablet_id(), restart_read_time); | 383 | 0 | } | 384 | 0 | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), | 385 | 0 | TransactionError(TransactionErrorCode::kReadRestartRequired))); | 386 | 0 | return false; | 387 | 0 | } | 388 | 1.24M | auto local_limit_ht = resp_.local_limit_ht(); | 389 | 1.24M | if (local_limit_ht) { | 390 | 0 | auto read_point = batcher_->read_point(); | 391 | 0 | if (read_point) { | 392 | 0 | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); | 393 | 0 | } | 394 | 0 | } | 395 | 1.24M | return true; | 396 | 1.24M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE19CommonResponseCheckERKNS_6StatusE Line | Count | Source | 366 | 4.68M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { | 367 | 4.68M | if (!status.ok()) { | 368 | 43.5k | return false; | 369 | 43.5k | } | 370 | 4.64M | if (resp_.has_error()) { | 371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() | 372 | 0 | << ". Requests not processed."; | 373 | | // If there is an error at the Rpc itself, there should be no individual responses. | 374 | | // All of them need to be marked as failed. | 375 | 0 | Failed(StatusFromPB(resp_.error().status())); | 376 | 0 | return false; | 377 | 0 | } | 378 | 4.64M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); | 379 | 4.64M | if (restart_read_time) { | 380 | 42 | auto read_point = batcher_->read_point(); | 381 | 42 | if (read_point) { | 382 | 42 | read_point->RestartRequired(req_.tablet_id(), restart_read_time); | 383 | 42 | } | 384 | 42 | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), | 385 | 42 | TransactionError(TransactionErrorCode::kReadRestartRequired))); | 386 | 42 | return false; | 387 | 42 | } | 388 | 4.64M | auto local_limit_ht = resp_.local_limit_ht(); | 389 | 4.64M | if (local_limit_ht) { | 390 | 575k | auto read_point = batcher_->read_point(); | 391 | 575k | if (read_point) { | 392 | 574k | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); | 393 | 574k | } | 394 | 575k | } | 395 | 4.64M | return true; | 396 | 4.64M | } |
|
397 | | |
398 | | template <class Req, class Resp> |
399 | 6.03M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { |
400 | 6.03M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { |
401 | 24.9k | ConsistentReadPoint* read_point = batcher_->read_point(); |
402 | 24.9k | if (read_point && !read_point->GetReadTime()) { |
403 | 0 | auto txn = batcher_->transaction(); |
404 | | // If txn is not set, this is a consistent scan across multiple tablets of a |
405 | | // non-transactional YCQL table. |
406 | 0 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { |
407 | 0 | read_point->SetCurrentReadTime(); |
408 | 0 | read_point->GetReadTime().AddToPB(&req_); |
409 | 0 | } |
410 | 0 | } |
411 | 24.9k | } |
412 | | |
413 | 6.03M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); |
414 | 6.03M | AsyncRpc::SendRpcToTserver(attempt_num); |
415 | 6.03M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE16SendRpcToTserverEi Line | Count | Source | 399 | 1.32M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { | 400 | 1.32M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { | 401 | 10.3k | ConsistentReadPoint* read_point = batcher_->read_point(); | 402 | 10.3k | if (read_point && !read_point->GetReadTime()) { | 403 | 0 | auto txn = batcher_->transaction(); | 404 | | // If txn is not set, this is a consistent scan across multiple tablets of a | 405 | | // non-transactional YCQL table. | 406 | 0 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { | 407 | 0 | read_point->SetCurrentReadTime(); | 408 | 0 | read_point->GetReadTime().AddToPB(&req_); | 409 | 0 | } | 410 | 0 | } | 411 | 10.3k | } | 412 | | | 413 | 1.32M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); | 414 | 1.32M | AsyncRpc::SendRpcToTserver(attempt_num); | 415 | 1.32M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE16SendRpcToTserverEi Line | Count | Source | 399 | 4.70M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { | 400 | 4.70M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { | 401 | 14.5k | ConsistentReadPoint* read_point = batcher_->read_point(); | 402 | 14.5k | if (read_point && !read_point->GetReadTime()) { | 403 | 0 | auto txn = batcher_->transaction(); | 404 | | // If txn is not set, this is a consistent scan across multiple tablets of a | 405 | | // non-transactional YCQL table. | 406 | 0 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { | 407 | 0 | read_point->SetCurrentReadTime(); | 408 | 0 | read_point->GetReadTime().AddToPB(&req_); | 409 | 0 | } | 410 | 0 | } | 411 | 14.5k | } | 412 | | | 413 | 4.70M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); | 414 | 4.70M | AsyncRpc::SendRpcToTserver(attempt_num); | 415 | 4.70M | } |
|
416 | | |
417 | | template <class Req, class Resp> |
418 | 6.00M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { |
419 | 6.00M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); |
420 | 6.00M | if (resp_.has_trace_buffer()) { |
421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); |
422 | 0 | } |
423 | 6.00M | NotifyBatcher(status); |
424 | 6.00M | if (!CommonResponseCheck(status)) { |
425 | 124k | return; |
426 | 124k | } |
427 | 5.88M | SwapResponses(); |
428 | 5.88M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE26ProcessResponseFromTserverERKNS_6StatusE Line | Count | Source | 418 | 1.32M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { | 419 | 1.32M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); | 420 | 1.32M | if (resp_.has_trace_buffer()) { | 421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); | 422 | 0 | } | 423 | 1.32M | NotifyBatcher(status); | 424 | 1.32M | if (!CommonResponseCheck(status)) { | 425 | 80.6k | return; | 426 | 80.6k | } | 427 | 1.24M | SwapResponses(); | 428 | 1.24M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE26ProcessResponseFromTserverERKNS_6StatusE Line | Count | Source | 418 | 4.68M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { | 419 | 4.68M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); | 420 | 4.68M | if (resp_.has_trace_buffer()) { | 421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); | 422 | 0 | } | 423 | 4.68M | NotifyBatcher(status); | 424 | 4.68M | if (!CommonResponseCheck(status)) { | 425 | 43.5k | return; | 426 | 43.5k | } | 427 | 4.63M | SwapResponses(); | 428 | 4.63M | } |
|
429 | | |
430 | | |
431 | | template <class Req, class Resp> |
432 | 6.00M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { |
433 | 6.00M | return {GetPropagatedHybridTime(resp_), |
434 | 4.15M | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time()) |
435 | 1.84M | : ReadHybridTime()}; |
436 | 6.00M | } _ZN2yb6client8internal12AsyncRpcBaseINS_7tserver14WriteRequestPBENS3_15WriteResponsePBEE20MakeFlushExtraResultEv Line | Count | Source | 432 | 1.32M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { | 433 | 1.32M | return {GetPropagatedHybridTime(resp_), | 434 | 110k | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time()) | 435 | 1.21M | : ReadHybridTime()}; | 436 | 1.32M | } |
_ZN2yb6client8internal12AsyncRpcBaseINS_7tserver13ReadRequestPBENS3_14ReadResponsePBEE20MakeFlushExtraResultEv Line | Count | Source | 432 | 4.68M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { | 433 | 4.68M | return {GetPropagatedHybridTime(resp_), | 434 | 4.04M | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time()) | 435 | 635k | : ReadHybridTime()}; | 436 | 4.68M | } |
|
437 | | |
438 | | template <class Op, class Req> |
439 | 2.00M | void HandleExtraFields(Op* op, Req* req) { |
440 | 2.00M | } _ZN2yb6client8internal17HandleExtraFieldsINS0_14YBRedisWriteOpENS_7tserver14WriteRequestPBEEEvPT_PT0_ Line | Count | Source | 439 | 61.5k | void HandleExtraFields(Op* op, Req* req) { | 440 | 61.5k | } |
_ZN2yb6client8internal17HandleExtraFieldsINS0_13YBRedisReadOpENS_7tserver13ReadRequestPBEEEvPT_PT0_ Line | Count | Source | 439 | 43.0k | void HandleExtraFields(Op* op, Req* req) { | 440 | 43.0k | } |
_ZN2yb6client8internal17HandleExtraFieldsINS0_13YBPgsqlReadOpENS_7tserver13ReadRequestPBEEEvPT_PT0_ Line | Count | Source | 439 | 1.89M | void HandleExtraFields(Op* op, Req* req) { | 440 | 1.89M | } |
|
441 | | |
442 | 3.02M | void HandleExtraFields(YBqlWriteOp* op, tserver::WriteRequestPB* req) { |
443 | 3.02M | if (op->write_time_for_backfill()) { |
444 | 9.54k | req->set_external_hybrid_time(op->write_time_for_backfill().ToUint64()); |
445 | 9.54k | ReadHybridTime::SingleTime(op->write_time_for_backfill()).ToPB(req->mutable_read_time()); |
446 | 9.54k | } |
447 | 3.02M | } |
448 | | |
449 | 2.12M | void HandleExtraFields(YBPgsqlWriteOp* op, tserver::WriteRequestPB* req) { |
450 | 2.12M | if (op->write_time()) { |
451 | 226 | req->set_external_hybrid_time(op->write_time().ToUint64()); |
452 | 226 | } |
453 | 2.12M | } |
454 | | |
455 | 3.93M | void HandleExtraFields(YBqlReadOp* op, tserver::ReadRequestPB* req) { |
456 | 3.93M | if (op->read_time()) { |
457 | 479 | op->read_time().AddToPB(req); |
458 | 479 | } |
459 | 3.93M | } |
460 | | |
461 | | template <class OpType, class Req, class Out> |
462 | | void FillOps( |
463 | 6.00M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { |
464 | 6.00M | out->Reserve(narrow_cast<int>(ops.size())); |
465 | 6.00M | size_t idx = 0; |
466 | 11.0M | for (auto& op : ops) { |
467 | 11.0M | CHECK_EQ(op.yb_op->type(), expected_type); |
468 | 11.0M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); |
469 | 11.0M | out->AddAllocated(concrete_op->mutable_request()); |
470 | 11.0M | HandleExtraFields(concrete_op, req); |
471 | 7.24k | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); |
472 | 11.0M | } |
473 | 6.00M | } _ZN2yb6client8internal7FillOpsINS0_14YBRedisWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_19RedisWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 61.5k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 61.5k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 61.5k | size_t idx = 0; | 466 | 61.5k | for (auto& op : ops) { | 467 | 61.5k | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 61.5k | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 61.5k | out->AddAllocated(concrete_op->mutable_request()); | 470 | 61.5k | HandleExtraFields(concrete_op, req); | 471 | 0 | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 61.5k | } | 473 | 61.5k | } |
_ZN2yb6client8internal7FillOpsINS0_11YBqlWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_16QLWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 1.00M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 1.00M | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 1.00M | size_t idx = 0; | 466 | 3.02M | for (auto& op : ops) { | 467 | 3.02M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 3.02M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 3.02M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 3.02M | HandleExtraFields(concrete_op, req); | 471 | 489 | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 3.02M | } | 473 | 1.00M | } |
_ZN2yb6client8internal7FillOpsINS0_14YBPgsqlWriteOpENS_7tserver14WriteRequestPBEN6google8protobuf16RepeatedPtrFieldINS_19PgsqlWriteRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 261k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 261k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 261k | size_t idx = 0; | 466 | 2.12M | for (auto& op : ops) { | 467 | 2.12M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 2.12M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 2.12M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 2.12M | HandleExtraFields(concrete_op, req); | 471 | 5 | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 2.12M | } | 473 | 261k | } |
_ZN2yb6client8internal7FillOpsINS0_13YBRedisReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_18RedisReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 43.0k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 43.0k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 43.0k | size_t idx = 0; | 466 | 43.0k | for (auto& op : ops) { | 467 | 43.0k | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 43.0k | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 43.0k | out->AddAllocated(concrete_op->mutable_request()); | 470 | 43.0k | HandleExtraFields(concrete_op, req); | 471 | 0 | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 43.0k | } | 473 | 43.0k | } |
_ZN2yb6client8internal7FillOpsINS0_10YBqlReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_15QLReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 3.93M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 3.93M | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 3.93M | size_t idx = 0; | 466 | 3.93M | for (auto& op : ops) { | 467 | 3.93M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 3.93M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 3.93M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 3.93M | HandleExtraFields(concrete_op, req); | 471 | 6.77k | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 3.93M | } | 473 | 3.93M | } |
_ZN2yb6client8internal7FillOpsINS0_13YBPgsqlReadOpENS_7tserver13ReadRequestPBEN6google8protobuf16RepeatedPtrFieldINS_18PgsqlReadRequestPBEEEEEvRKN5boost14iterator_rangeINSt3__111__wrap_iterIPNS1_10InFlightOpEEEEENS0_11YBOperation4TypeEPT0_PT1_ Line | Count | Source | 463 | 708k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 708k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 708k | size_t idx = 0; | 466 | 1.89M | for (auto& op : ops) { | 467 | 1.89M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 1.89M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 1.89M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 1.89M | HandleExtraFields(concrete_op, req); | 471 | 18.4E | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 1.89M | } | 473 | 708k | } |
|
474 | | |
475 | | template <class Repeated> |
476 | 18.0M | void ReleaseOps(Repeated* repeated) { |
477 | 18.0M | auto size = repeated->size(); |
478 | 18.0M | if (size) { |
479 | 5.99M | repeated->ExtractSubrange(0, size, nullptr); |
480 | 5.99M | } |
481 | 18.0M | } _ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_19RedisWriteRequestPBEEEEEvPT_ Line | Count | Source | 476 | 1.32M | void ReleaseOps(Repeated* repeated) { | 477 | 1.32M | auto size = repeated->size(); | 478 | 1.32M | if (size) { | 479 | 61.5k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 61.5k | } | 481 | 1.32M | } |
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_16QLWriteRequestPBEEEEEvPT_ Line | Count | Source | 476 | 1.32M | void ReleaseOps(Repeated* repeated) { | 477 | 1.32M | auto size = repeated->size(); | 478 | 1.32M | if (size) { | 479 | 1.00M | repeated->ExtractSubrange(0, size, nullptr); | 480 | 1.00M | } | 481 | 1.32M | } |
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_19PgsqlWriteRequestPBEEEEEvPT_ Line | Count | Source | 476 | 1.32M | void ReleaseOps(Repeated* repeated) { | 477 | 1.32M | auto size = repeated->size(); | 478 | 1.32M | if (size) { | 479 | 261k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 261k | } | 481 | 1.32M | } |
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_18RedisReadRequestPBEEEEEvPT_ Line | Count | Source | 476 | 4.68M | void ReleaseOps(Repeated* repeated) { | 477 | 4.68M | auto size = repeated->size(); | 478 | 4.68M | if (size) { | 479 | 43.0k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 43.0k | } | 481 | 4.68M | } |
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_15QLReadRequestPBEEEEEvPT_ Line | Count | Source | 476 | 4.67M | void ReleaseOps(Repeated* repeated) { | 477 | 4.67M | auto size = repeated->size(); | 478 | 4.67M | if (size) { | 479 | 3.92M | repeated->ExtractSubrange(0, size, nullptr); | 480 | 3.92M | } | 481 | 4.67M | } |
_ZN2yb6client8internal10ReleaseOpsIN6google8protobuf16RepeatedPtrFieldINS_18PgsqlReadRequestPBEEEEEvPT_ Line | Count | Source | 476 | 4.67M | void ReleaseOps(Repeated* repeated) { | 477 | 4.67M | auto size = repeated->size(); | 478 | 4.67M | if (size) { | 479 | 707k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 707k | } | 481 | 4.67M | } |
|
482 | | |
483 | | WriteRpc::WriteRpc(const AsyncRpcData& data) |
484 | 1.32M | : AsyncRpcBase(data, YBConsistencyLevel::STRONG) { |
485 | 1.32M | TRACE_TO(trace_, "WriteRpc initiated"); |
486 | 1.32M | VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString()); |
487 | | |
488 | | // Add the rows |
489 | 1.32M | switch (table()->table_type()) { |
490 | 61.5k | case YBTableType::REDIS_TABLE_TYPE: |
491 | 61.5k | FillOps<YBRedisWriteOp>( |
492 | 61.5k | ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch()); |
493 | 61.5k | break; |
494 | 1.00M | case YBTableType::YQL_TABLE_TYPE: |
495 | 1.00M | FillOps<YBqlWriteOp>( |
496 | 1.00M | ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch()); |
497 | 1.00M | break; |
498 | 261k | case YBTableType::PGSQL_TABLE_TYPE: |
499 | 261k | FillOps<YBPgsqlWriteOp>( |
500 | 261k | ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch()); |
501 | 261k | break; |
502 | 0 | case YBTableType::UNKNOWN_TABLE_TYPE: |
503 | 0 | case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: |
504 | 0 | LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); |
505 | 0 | break; |
506 | 1.32M | } |
507 | | |
508 | 18.4E | VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n" |
509 | 18.4E | << req_.ShortDebugString(); |
510 | | |
511 | 1.32M | const auto& client_id = batcher_->client_id(); |
512 | 1.32M | if (!client_id.IsNil() && FLAGS_detect_duplicates_for_retryable_requests) { |
513 | 1.32M | auto temp = client_id.ToUInt64Pair(); |
514 | 1.32M | req_.set_client_id1(temp.first); |
515 | 1.32M | req_.set_client_id2(temp.second); |
516 | 1.32M | auto request_pair = batcher_->NextRequestIdAndMinRunningRequestId(data.tablet->tablet_id()); |
517 | 1.32M | req_.set_request_id(request_pair.first); |
518 | 1.32M | req_.set_min_running_request_id(request_pair.second); |
519 | 1.32M | } |
520 | 1.32M | } |
521 | | |
522 | 1.32M | WriteRpc::~WriteRpc() { |
523 | | // Check that we sent request id info, i.e. (client_id, request_id, min_running_request_id). |
524 | 1.32M | if (req_.has_client_id1()) { |
525 | 1.32M | batcher_->RequestFinished(tablet().tablet_id(), req_.request_id()); |
526 | 1.32M | } |
527 | | |
528 | 1.32M | if (async_rpc_metrics_) { |
529 | 1.27M | scoped_refptr<Histogram> write_rpc_time = IsLocalCall() ? |
530 | 884k | async_rpc_metrics_->local_write_rpc_time : |
531 | 389k | async_rpc_metrics_->remote_write_rpc_time; |
532 | 1.27M | write_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
533 | 1.27M | } |
534 | | |
535 | 1.32M | ReleaseOps(req_.mutable_redis_write_batch()); |
536 | 1.32M | ReleaseOps(req_.mutable_ql_write_batch()); |
537 | 1.32M | ReleaseOps(req_.mutable_pgsql_write_batch()); |
538 | 1.32M | } |
539 | | |
540 | 1.32M | void WriteRpc::CallRemoteMethod() { |
541 | 1.32M | auto trace = trace_; // It is possible that we receive reply before returning from WriteAsync. |
542 | | // Since send happens before we return from WriteAsync. |
543 | | // So under heavy load it is possible that our request is handled and |
544 | | // reply is received before WriteAsync returned. |
545 | 1.32M | TRACE_TO(trace, "SendRpcToTserver"); |
546 | 1.32M | ADOPT_TRACE(trace.get()); |
547 | | |
548 | 1.32M | tablet_invoker_.WriteAsync(req_, &resp_, PrepareController(), |
549 | 1.32M | std::bind(&WriteRpc::Finished, this, Status::OK())); |
550 | 1.32M | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
551 | 1.32M | } |
552 | | |
553 | 1.24M | void WriteRpc::SwapResponses() { |
554 | 1.24M | int redis_idx = 0; |
555 | 1.24M | int ql_idx = 0; |
556 | 1.24M | int pgsql_idx = 0; |
557 | | |
558 | | // Retrieve Redis and QL responses and make sure we received all the responses back. |
559 | 5.11M | for (auto& op : ops_) { |
560 | 5.11M | YBOperation* yb_op = op.yb_op.get(); |
561 | 5.11M | switch (yb_op->type()) { |
562 | 61.5k | case YBOperation::Type::REDIS_WRITE: { |
563 | 61.5k | if (redis_idx >= resp_.redis_response_batch().size()) { |
564 | 0 | ++redis_idx; |
565 | 0 | continue; |
566 | 0 | } |
567 | | // Restore Redis write request PB and extract response. |
568 | 61.5k | auto* redis_op = down_cast<YBRedisWriteOp*>(yb_op); |
569 | 61.5k | redis_op->mutable_response()->Swap(resp_.mutable_redis_response_batch(redis_idx)); |
570 | 61.5k | redis_idx++; |
571 | 61.5k | break; |
572 | 61.5k | } |
573 | 2.95M | case YBOperation::Type::QL_WRITE: { |
574 | 2.95M | if (ql_idx >= resp_.ql_response_batch().size()) { |
575 | 0 | ++ql_idx; |
576 | 0 | continue; |
577 | 0 | } |
578 | | // Restore QL write request PB and extract response. |
579 | 2.95M | auto* ql_op = down_cast<YBqlWriteOp*>(yb_op); |
580 | 2.95M | ql_op->mutable_response()->Swap(resp_.mutable_ql_response_batch(ql_idx)); |
581 | 2.95M | const auto& ql_response = ql_op->response(); |
582 | 2.95M | if (ql_response.has_rows_data_sidecar()) { |
583 | 273 | Slice rows_data = CHECK_RESULT( |
584 | 273 | retrier().controller().GetSidecar(ql_response.rows_data_sidecar())); |
585 | 273 | ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size()); |
586 | 273 | } |
587 | 2.95M | ql_idx++; |
588 | 2.95M | break; |
589 | 2.95M | } |
590 | 2.10M | case YBOperation::Type::PGSQL_WRITE: { |
591 | 2.10M | if (pgsql_idx >= resp_.pgsql_response_batch().size()) { |
592 | 0 | ++pgsql_idx; |
593 | 0 | continue; |
594 | 0 | } |
595 | | // Restore QL write request PB and extract response. |
596 | 2.10M | auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op); |
597 | 2.10M | pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx)); |
598 | 2.10M | const auto& pgsql_response = pgsql_op->response(); |
599 | 2.10M | if (pgsql_response.has_rows_data_sidecar()) { |
600 | 2.10M | auto holder = CHECK_RESULT( |
601 | 2.10M | retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar())); |
602 | 2.10M | down_cast<YBPgsqlWriteOp*>(yb_op)->SetRowsData(holder.first, holder.second); |
603 | 2.10M | } |
604 | 2.10M | pgsql_idx++; |
605 | 2.10M | break; |
606 | 2.10M | } |
607 | 0 | case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; |
608 | 0 | case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; |
609 | 0 | case YBOperation::Type::QL_READ: |
610 | 0 | LOG(FATAL) << "Not a write operation " << op.yb_op->type(); |
611 | 0 | break; |
612 | 5.11M | } |
613 | 5.11M | } |
614 | | |
615 | 1.24M | if (redis_idx != resp_.redis_response_batch().size() || |
616 | 1.24M | ql_idx != resp_.ql_response_batch().size() || |
617 | 1.24M | pgsql_idx != resp_.pgsql_response_batch().size()) { |
618 | 0 | LOG(ERROR) << Substitute("Write response count mismatch: " |
619 | 0 | "$0 Redis requests sent, $1 responses received. " |
620 | 0 | "$2 Apache CQL requests sent, $3 responses received. " |
621 | 0 | "$4 PostgreSQL requests sent, $5 responses received.", |
622 | 0 | redis_idx, resp_.redis_response_batch().size(), |
623 | 0 | ql_idx, resp_.ql_response_batch().size(), |
624 | 0 | pgsql_idx, resp_.pgsql_response_batch().size()); |
625 | 0 | auto status = STATUS(IllegalState, "Write response count mismatch"); |
626 | 0 | LOG(ERROR) << status << ", request: " << req_.ShortDebugString() |
627 | 0 | << ", response: " << resp_.ShortDebugString(); |
628 | 0 | batcher_->AddOpCountMismatchError(); |
629 | 0 | Failed(status); |
630 | 0 | } |
631 | 1.24M | } |
632 | | |
633 | 1.32M | void WriteRpc::NotifyBatcher(const Status& status) { |
634 | 1.32M | batcher_->ProcessWriteResponse(*this, status); |
635 | 1.32M | } |
636 | | |
637 | 0 | bool WriteRpc::ShouldRetryExpiredRequest() { |
638 | 0 | return req_.min_running_request_id() == kInitializeFromMinRunning; |
639 | 0 | } |
640 | | |
641 | | ReadRpc::ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level) |
642 | 4.68M | : AsyncRpcBase(data, yb_consistency_level) { |
643 | 4.68M | TRACE_TO(trace_, "ReadRpc initiated"); |
644 | 4.68M | VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString()); |
645 | 4.68M | req_.set_consistency_level(yb_consistency_level); |
646 | 4.68M | req_.set_proxy_uuid(data.batcher->proxy_uuid()); |
647 | | |
648 | 4.68M | switch (table()->table_type()) { |
649 | 43.0k | case YBTableType::REDIS_TABLE_TYPE: |
650 | 43.0k | FillOps<YBRedisReadOp>( |
651 | 43.0k | ops_, YBOperation::Type::REDIS_READ, &req_, req_.mutable_redis_batch()); |
652 | 43.0k | break; |
653 | 3.94M | case YBTableType::YQL_TABLE_TYPE: |
654 | 3.94M | FillOps<YBqlReadOp>( |
655 | 3.94M | ops_, YBOperation::Type::QL_READ, &req_, req_.mutable_ql_batch()); |
656 | 3.94M | break; |
657 | 708k | case YBTableType::PGSQL_TABLE_TYPE: |
658 | 708k | FillOps<YBPgsqlReadOp>( |
659 | 708k | ops_, YBOperation::Type::PGSQL_READ, &req_, req_.mutable_pgsql_batch()); |
660 | 708k | break; |
661 | 0 | case YBTableType::UNKNOWN_TABLE_TYPE: |
662 | 0 | case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: |
663 | 0 | LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); |
664 | 0 | break; |
665 | 4.68M | } |
666 | | |
667 | 18.4E | VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n" |
668 | 18.4E | << req_.ShortDebugString(); |
669 | 4.68M | } |
670 | | |
671 | 4.68M | ReadRpc::~ReadRpc() { |
672 | | // Get locality metrics if enabled, but skip for system tables as those go to the master. |
673 | 4.68M | if (async_rpc_metrics_ && !table()->name().is_system()) { |
674 | 4.43M | scoped_refptr<Histogram> read_rpc_time = IsLocalCall() ? |
675 | 3.75M | async_rpc_metrics_->local_read_rpc_time : |
676 | 678k | async_rpc_metrics_->remote_read_rpc_time; |
677 | | |
678 | 4.43M | read_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
679 | 4.43M | } |
680 | | |
681 | 4.68M | ReleaseOps(req_.mutable_redis_batch()); |
682 | 4.68M | ReleaseOps(req_.mutable_ql_batch()); |
683 | 4.68M | ReleaseOps(req_.mutable_pgsql_batch()); |
684 | 4.68M | } |
685 | | |
686 | 4.70M | void ReadRpc::CallRemoteMethod() { |
687 | 4.70M | auto trace = trace_; // It is possible that we receive reply before returning from ReadAsync. |
688 | | // Detailed explanation in WriteRpc::SendRpcToTserver. |
689 | 4.70M | TRACE_TO(trace, "SendRpcToTserver"); |
690 | 4.70M | ADOPT_TRACE(trace.get()); |
691 | | |
692 | 4.70M | tablet_invoker_.ReadAsync(req_, &resp_, PrepareController(), |
693 | 4.70M | std::bind(&ReadRpc::Finished, this, Status::OK())); |
694 | 4.70M | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
695 | 4.70M | } |
696 | | |
697 | 4.64M | void ReadRpc::SwapResponses() { |
698 | 4.64M | int redis_idx = 0; |
699 | 4.64M | int ql_idx = 0; |
700 | 4.64M | int pgsql_idx = 0; |
701 | | |
702 | | // Retrieve Redis and QL responses and make sure we received all the responses back. |
703 | 5.45M | for (auto& op : ops_) { |
704 | 5.45M | YBOperation* yb_op = op.yb_op.get(); |
705 | 5.45M | switch (yb_op->type()) { |
706 | 42.9k | case YBOperation::Type::REDIS_READ: { |
707 | 42.9k | if (redis_idx >= resp_.redis_batch().size()) { |
708 | 0 | batcher_->AddOpCountMismatchError(); |
709 | 0 | return; |
710 | 0 | } |
711 | | // Restore Redis read request PB and extract response. |
712 | 42.9k | auto* redis_op = down_cast<YBRedisReadOp*>(yb_op); |
713 | 42.9k | redis_op->mutable_response()->Swap(resp_.mutable_redis_batch(redis_idx)); |
714 | 42.9k | redis_idx++; |
715 | 42.9k | break; |
716 | 42.9k | } |
717 | 3.92M | case YBOperation::Type::QL_READ: { |
718 | 3.92M | if (ql_idx >= resp_.ql_batch().size()) { |
719 | 0 | batcher_->AddOpCountMismatchError(); |
720 | 0 | return; |
721 | 0 | } |
722 | | // Restore QL read request PB and extract response. |
723 | 3.92M | auto* ql_op = down_cast<YBqlReadOp*>(yb_op); |
724 | 3.92M | ql_op->mutable_response()->Swap(resp_.mutable_ql_batch(ql_idx)); |
725 | 3.92M | const auto& ql_response = ql_op->response(); |
726 | 3.92M | if (ql_response.has_rows_data_sidecar()) { |
727 | 3.91M | Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar( |
728 | 3.91M | ql_response.rows_data_sidecar())); |
729 | 3.91M | ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size()); |
730 | 3.91M | } |
731 | 3.92M | ql_idx++; |
732 | 3.92M | break; |
733 | 3.92M | } |
734 | 1.50M | case YBOperation::Type::PGSQL_READ: { |
735 | 1.50M | if (pgsql_idx >= resp_.pgsql_batch().size()) { |
736 | 0 | batcher_->AddOpCountMismatchError(); |
737 | 0 | return; |
738 | 0 | } |
739 | | // Restore PGSQL read request PB and extract response. |
740 | 1.50M | auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op); |
741 | 1.50M | if (resp_.has_used_read_time()) { |
742 | 949k | pgsql_op->SetUsedReadTime(ReadHybridTime::FromPB(resp_.used_read_time())); |
743 | 949k | } |
744 | 1.50M | pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx)); |
745 | 1.50M | const auto& pgsql_response = pgsql_op->response(); |
746 | 1.50M | if (pgsql_response.has_rows_data_sidecar()) { |
747 | 1.50M | auto holder = CHECK_RESULT( |
748 | 1.50M | retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar())); |
749 | 1.50M | down_cast<YBPgsqlReadOp*>(yb_op)->SetRowsData(holder.first, holder.second); |
750 | 1.50M | } |
751 | 1.50M | pgsql_idx++; |
752 | 1.50M | break; |
753 | 1.50M | } |
754 | 0 | case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; |
755 | 0 | case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED; |
756 | 0 | case YBOperation::Type::QL_WRITE: |
757 | 0 | LOG(FATAL) << "Not a read operation " << op.yb_op->type(); |
758 | 0 | break; |
759 | 5.45M | } |
760 | 5.45M | } |
761 | | |
762 | 4.64M | if (redis_idx != resp_.redis_batch().size() || |
763 | 4.62M | ql_idx != resp_.ql_batch().size() || |
764 | 4.63M | pgsql_idx != resp_.pgsql_batch().size()) { |
765 | 0 | LOG(ERROR) << Substitute("Read response count mismatch: " |
766 | 0 | "$0 Redis requests sent, $1 responses received. " |
767 | 0 | "$2 QL requests sent, $3 responses received. " |
768 | 0 | "$4 QL requests sent, $5 responses received.", |
769 | 0 | redis_idx, resp_.redis_batch().size(), |
770 | 0 | ql_idx, resp_.ql_batch().size(), |
771 | 0 | pgsql_idx, resp_.pgsql_batch().size()); |
772 | 0 | batcher_->AddOpCountMismatchError(); |
773 | 0 | Failed(STATUS(IllegalState, "Read response count mismatch")); |
774 | 0 | } |
775 | | |
776 | 4.64M | } |
777 | | |
778 | 4.68M | void ReadRpc::NotifyBatcher(const Status& status) { |
779 | 4.68M | batcher_->ProcessReadResponse(*this, status); |
780 | 4.68M | } |
781 | | |
782 | | } // namespace internal |
783 | | } // namespace client |
784 | | } // namespace yb |