/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/async_rpc.h" |
15 | | |
16 | | #include "yb/client/batcher.h" |
17 | | #include "yb/client/client_error.h" |
18 | | #include "yb/client/in_flight_op.h" |
19 | | #include "yb/client/meta_cache.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/yb_op.h" |
22 | | #include "yb/client/yb_table_name.h" |
23 | | |
24 | | #include "yb/common/pgsql_error.h" |
25 | | #include "yb/common/schema.h" |
26 | | #include "yb/common/transaction.h" |
27 | | #include "yb/common/transaction_error.h" |
28 | | #include "yb/common/wire_protocol.h" |
29 | | |
30 | | #include "yb/gutil/casts.h" |
31 | | #include "yb/gutil/strings/substitute.h" |
32 | | |
33 | | #include "yb/rpc/rpc_controller.h" |
34 | | |
35 | | #include "yb/util/cast.h" |
36 | | #include "yb/util/flag_tags.h" |
37 | | #include "yb/util/logging.h" |
38 | | #include "yb/util/metrics.h" |
39 | | #include "yb/util/result.h" |
40 | | #include "yb/util/status_log.h" |
41 | | #include "yb/util/trace.h" |
42 | | #include "yb/util/yb_pg_errcodes.h" |
43 | | |
44 | | // TODO: do we need word Redis in following two metrics? ReadRpc and WriteRpc objects emitting |
45 | | // these metrics are used not only in Redis service. |
46 | | METRIC_DEFINE_coarse_histogram( |
47 | | server, handler_latency_yb_client_write_remote, "yb.client.Write remote call time", |
48 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Write call "); |
49 | | METRIC_DEFINE_coarse_histogram( |
50 | | server, handler_latency_yb_client_read_remote, "yb.client.Read remote call time", |
51 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the remote Read call "); |
52 | | METRIC_DEFINE_coarse_histogram( |
53 | | server, handler_latency_yb_client_write_local, "yb.client.Write local call time", |
54 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Write call "); |
55 | | METRIC_DEFINE_coarse_histogram( |
56 | | server, handler_latency_yb_client_read_local, "yb.client.Read local call time", |
57 | | yb::MetricUnit::kMicroseconds, "Microseconds spent in the local Read call "); |
58 | | METRIC_DEFINE_coarse_histogram( |
59 | | server, handler_latency_yb_client_time_to_send, |
60 | | "Time taken for a Write/Read rpc to be sent to the server", yb::MetricUnit::kMicroseconds, |
61 | | "Microseconds spent before sending the request to the server"); |
62 | | |
63 | | METRIC_DEFINE_counter(server, consistent_prefix_successful_reads, |
64 | | "Number of consistent prefix reads that were served by the closest replica.", |
65 | | yb::MetricUnit::kRequests, |
66 | | "Number of consistent prefix reads that were served by the closest replica."); |
67 | | |
68 | | METRIC_DEFINE_counter(server, consistent_prefix_failed_reads, |
69 | | "Number of consistent prefix reads that failed to be served by the closest replica.", |
70 | | yb::MetricUnit::kRequests, |
71 | | "Number of consistent prefix reads that failed to be served by the closest replica."); |
72 | | |
73 | | DEFINE_int32(ybclient_print_trace_every_n, 0, |
74 | | "Controls the rate at which traces from ybclient are printed. Setting this to 0 " |
75 | | "disables printing the collected traces."); |
76 | | TAG_FLAG(ybclient_print_trace_every_n, advanced); |
77 | | TAG_FLAG(ybclient_print_trace_every_n, runtime); |
78 | | |
79 | | DEFINE_bool(forward_redis_requests, true, "If false, the redis op will not be served if it's not " |
80 | | "a local request. The op response will be set to the redis error " |
81 | | "'-MOVED partition_key 0.0.0.0:0'. This works with jedis which only looks at the MOVED " |
82 | | "part of the reply and ignores the rest. For now, if this flag is true, we will only " |
83 | | "attempt to read from leaders, so redis_allow_reads_from_followers will be ignored."); |
84 | | |
85 | | DEFINE_bool(detect_duplicates_for_retryable_requests, true, |
86 | | "Enable tracking of write requests that prevents the same write from being applied " |
87 | | "twice."); |
88 | | |
89 | | DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false, |
90 | | "When true, forward the PGSQL rpcs to the local tServer."); |
91 | | |
92 | | DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b); |
93 | | |
94 | | DECLARE_bool(collect_end_to_end_traces); |
95 | | |
96 | | using namespace std::placeholders; |
97 | | |
98 | | namespace yb { |
99 | | |
100 | | using std::shared_ptr; |
101 | | using rpc::ErrorStatusPB; |
102 | | using rpc::Messenger; |
103 | | using rpc::Rpc; |
104 | | using rpc::RpcController; |
105 | | using tserver::WriteRequestPB; |
106 | | using tserver::WriteResponsePB; |
107 | | using tserver::WriteResponsePB_PerRowErrorPB; |
108 | | using strings::Substitute; |
109 | | |
110 | | namespace client { |
111 | | |
112 | | namespace internal { |
113 | | |
114 | 12.0M | bool IsTracingEnabled() { |
115 | 12.0M | return FLAGS_collect_end_to_end_traces; |
116 | 12.0M | } |
117 | | |
118 | | namespace { |
119 | | |
120 | 12.0M | bool LocalTabletServerOnly(const InFlightOps& ops) { |
121 | 12.0M | const auto op_type = ops.front().yb_op->type(); |
122 | 12.0M | return ((op_type == YBOperation::Type::REDIS_READ || op_type == YBOperation::Type::REDIS_WRITE11.9M ) && |
123 | 12.0M | !FLAGS_forward_redis_requests208k ); |
124 | 12.0M | } |
125 | | |
126 | | } |
127 | | |
128 | | AsyncRpcMetrics::AsyncRpcMetrics(const scoped_refptr<yb::MetricEntity>& entity) |
129 | | : remote_write_rpc_time(METRIC_handler_latency_yb_client_write_remote.Instantiate(entity)), |
130 | | remote_read_rpc_time(METRIC_handler_latency_yb_client_read_remote.Instantiate(entity)), |
131 | | local_write_rpc_time(METRIC_handler_latency_yb_client_write_local.Instantiate(entity)), |
132 | | local_read_rpc_time(METRIC_handler_latency_yb_client_read_local.Instantiate(entity)), |
133 | | time_to_send(METRIC_handler_latency_yb_client_time_to_send.Instantiate(entity)), |
134 | | consistent_prefix_successful_reads( |
135 | | METRIC_consistent_prefix_successful_reads.Instantiate(entity)), |
136 | 129k | consistent_prefix_failed_reads(METRIC_consistent_prefix_failed_reads.Instantiate(entity)) { |
137 | 129k | } |
138 | | |
139 | | AsyncRpc::AsyncRpc( |
140 | | const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level) |
141 | | : Rpc(data.batcher->deadline(), data.batcher->messenger(), &data.batcher->proxy_cache()), |
142 | | batcher_(data.batcher), |
143 | | ops_(data.ops), |
144 | | tablet_invoker_(LocalTabletServerOnly(ops_), |
145 | | yb_consistency_level == YBConsistencyLevel::CONSISTENT_PREFIX, |
146 | | data.batcher->client_, |
147 | | this, |
148 | | this, |
149 | | data.tablet, |
150 | | table(), |
151 | | mutable_retrier(), |
152 | | trace_.get()), |
153 | | start_(CoarseMonoClock::Now()), |
154 | 12.0M | async_rpc_metrics_(data.batcher->async_rpc_metrics()) { |
155 | 12.0M | mutable_retrier()->mutable_controller()->set_allow_local_calls_in_curr_thread( |
156 | 12.0M | data.allow_local_calls_in_curr_thread); |
157 | 12.0M | } |
158 | | |
159 | 12.0M | AsyncRpc::~AsyncRpc() { |
160 | 12.0M | if (trace_->must_print()) { |
161 | 0 | LOG(INFO) << ToString() << " took " << ToMicroseconds(CoarseMonoClock::Now() - start_) |
162 | 0 | << "us. Trace:\n" << trace_->DumpToString(true); |
163 | 12.0M | } else { |
164 | 12.0M | const auto print_trace_every_n = GetAtomicFlag(&FLAGS_ybclient_print_trace_every_n); |
165 | 12.0M | if (print_trace_every_n > 0) { |
166 | 0 | YB_LOG_EVERY_N(INFO, print_trace_every_n) |
167 | 0 | << ToString() << " took " |
168 | 0 | << ToMicroseconds(CoarseMonoClock::Now() - start_) |
169 | 0 | << "us. Trace:\n" << trace_->DumpToString(true); |
170 | 0 | } |
171 | 12.0M | } |
172 | 12.0M | } |
173 | | |
174 | 12.1M | void AsyncRpc::SendRpc() { |
175 | 12.1M | TRACE_TO(trace_, "SendRpc() called."); |
176 | | |
177 | 12.1M | retained_self_ = shared_from_this(); |
178 | | // For now, if this is a retry, execute this rpc on the leader even if |
179 | | // the consistency level is YBConsistencyLevel::CONSISTENT_PREFIX or |
180 | | // FLAGS_redis_allow_reads_from_followers is set to true. |
181 | | // TODO(hector): Temporarily blacklist the follower that couldn't serve the read so we can retry |
182 | | // on another follower. |
183 | 12.1M | if (async_rpc_metrics_ && num_attempts() > 112.0M && tablet_invoker_.is_consistent_prefix()34.9k ) { |
184 | 4.40k | IncrementCounter(async_rpc_metrics_->consistent_prefix_failed_reads); |
185 | 4.40k | } |
186 | 12.1M | tablet_invoker_.Execute(std::string(), num_attempts() > 1); |
187 | 12.1M | } |
188 | | |
189 | 183k | std::string AsyncRpc::ToString() const { |
190 | 183k | const auto& transaction = batcher_->in_flight_ops().metadata.transaction; |
191 | 183k | const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction; |
192 | 183k | return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)", |
193 | 183k | ops_.front().yb_op->read_only() ? "Read"92.7k : "Write"90.9k , |
194 | 183k | tablet().tablet_id(), ops_.size(), num_attempts(), |
195 | 183k | transaction.transaction_id, |
196 | 183k | subtransaction_opt |
197 | 183k | ? Format("$0", subtransaction_opt->subtransaction_id)12.0k |
198 | 183k | : "[none]"171k ); |
199 | 183k | } |
200 | | |
201 | 36.6M | std::shared_ptr<const YBTable> AsyncRpc::table() const { |
202 | | // All of the ops for a given tablet obviously correspond to the same table, |
203 | | // so we'll just grab the table from the first. |
204 | 36.6M | return ops_[0].yb_op->table(); |
205 | 36.6M | } |
206 | | |
207 | 12.0M | void AsyncRpc::Finished(const Status& status) { |
208 | 12.0M | Status new_status = status; |
209 | 12.0M | if (tablet_invoker_.Done(&new_status)) { |
210 | 12.0M | if (tablet().is_split() || |
211 | 12.0M | ClientError(new_status) == ClientErrorCode::kTablePartitionListIsStale) { |
212 | 46 | ops_[0].yb_op->MarkTablePartitionListAsStale(); |
213 | 46 | } |
214 | 12.0M | if (async_rpc_metrics_ && status.ok()12.0M && tablet_invoker_.is_consistent_prefix()12.0M ) { |
215 | 22.0k | IncrementCounter(async_rpc_metrics_->consistent_prefix_successful_reads); |
216 | 22.0k | } |
217 | 12.0M | ProcessResponseFromTserver(new_status); |
218 | 12.0M | batcher_->Flushed(ops_, new_status, MakeFlushExtraResult()); |
219 | 12.0M | retained_self_.reset(); |
220 | 12.0M | } |
221 | 12.0M | } |
222 | | |
223 | 185k | void AsyncRpc::Failed(const Status& status) { |
224 | 185k | std::string error_message = status.message().ToBuffer(); |
225 | 185k | auto redis_error_code = status.IsInvalidCommand() || status.IsInvalidArgument()185k ? |
226 | 185k | RedisResponsePB_RedisStatusCode_PARSING_ERROR6 : RedisResponsePB_RedisStatusCode_SERVER_ERROR; |
227 | 996k | for (auto& op : ops_) { |
228 | 996k | YBOperation* yb_op = op.yb_op.get(); |
229 | 996k | switch (yb_op->type()) { |
230 | 201 | case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; |
231 | 215 | case YBOperation::Type::REDIS_WRITE: { |
232 | 215 | RedisResponsePB* resp; |
233 | 215 | if (yb_op->type() == YBOperation::Type::REDIS_READ) { |
234 | 201 | resp = down_cast<YBRedisReadOp*>(yb_op)->mutable_response(); |
235 | 201 | } else { |
236 | 14 | resp = down_cast<YBRedisWriteOp*>(yb_op)->mutable_response(); |
237 | 14 | } |
238 | 215 | resp->Clear(); |
239 | | // If the tserver replied it is not the leader, respond that the key has moved. We do not |
240 | | // need to return the address of the new leader because the Redis client will refresh the |
241 | | // cluster map instead. |
242 | 215 | if (status.IsIllegalState()) { |
243 | 9 | resp->set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); |
244 | 9 | resp->set_error_message(Substitute("MOVED $0 0.0.0.0:0", |
245 | 9 | down_cast<YBRedisOp*>(yb_op)->hash_code())); |
246 | 206 | } else { |
247 | 206 | resp->set_code(redis_error_code); |
248 | 206 | resp->set_error_message(error_message); |
249 | 206 | } |
250 | 215 | break; |
251 | 201 | } |
252 | 149 | case YBOperation::Type::QL_READ: FALLTHROUGH_INTENDED; |
253 | 60.3k | case YBOperation::Type::QL_WRITE: { |
254 | 60.3k | QLResponsePB* resp = down_cast<YBqlOp*>(yb_op)->mutable_response(); |
255 | 60.3k | resp->Clear(); |
256 | 60.3k | resp->set_status(status.IsTryAgain() ? QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR50.1k |
257 | 60.3k | : QLResponsePB::YQL_STATUS_RUNTIME_ERROR10.2k ); |
258 | 60.3k | resp->set_error_message(error_message); |
259 | 60.3k | break; |
260 | 149 | } |
261 | 886k | case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; |
262 | 935k | case YBOperation::Type::PGSQL_WRITE: { |
263 | 935k | PgsqlResponsePB* resp = down_cast<YBPgsqlOp*>(yb_op)->mutable_response(); |
264 | 935k | resp->set_status(status.IsTryAgain()935k ? PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR |
265 | 18.4E | : PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR); |
266 | 935k | resp->set_error_message(error_message); |
267 | 935k | const uint8_t* pg_err_ptr = status.ErrorData(PgsqlErrorTag::kCategory); |
268 | 935k | if (pg_err_ptr != nullptr) { |
269 | 335k | resp->set_pg_error_code(static_cast<uint32_t>(PgsqlErrorTag::Decode(pg_err_ptr))); |
270 | 600k | } else { |
271 | 600k | resp->set_pg_error_code(static_cast<uint32_t>(YBPgErrorCode::YB_PG_INTERNAL_ERROR)); |
272 | 600k | } |
273 | 935k | const uint8_t* txn_err_ptr = status.ErrorData(TransactionErrorTag::kCategory); |
274 | 935k | if (txn_err_ptr != nullptr) { |
275 | 933k | resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorTag::Decode(txn_err_ptr))); |
276 | 933k | } else { |
277 | 1.76k | resp->set_txn_error_code(static_cast<uint16_t>(TransactionErrorCode::kNone)); |
278 | 1.76k | } |
279 | 935k | break; |
280 | 886k | } |
281 | 0 | default: |
282 | 0 | LOG(FATAL) << "Unsupported operation " << yb_op->type(); |
283 | 0 | break; |
284 | 996k | } |
285 | 996k | } |
286 | 185k | } |
287 | | |
288 | 11.7M | bool AsyncRpc::IsLocalCall() const { |
289 | 11.7M | return tablet_invoker_.IsLocalCall(); |
290 | 11.7M | } |
291 | | |
292 | | namespace { |
293 | | |
294 | | template<class T> |
295 | | void SetMetadata(const InFlightOpsTransactionMetadata& metadata, |
296 | | bool need_full_metadata, |
297 | 1.59M | T* dest) { |
298 | 1.59M | auto* transaction = dest->mutable_transaction(); |
299 | 1.59M | if (need_full_metadata) { |
300 | 983k | metadata.transaction.ToPB(transaction); |
301 | 983k | } else { |
302 | 609k | metadata.transaction.TransactionIdToPB(transaction); |
303 | 609k | } |
304 | 1.59M | dest->set_deprecated_may_have_metadata(true); |
305 | | |
306 | 1.59M | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()53.1k ) { |
307 | 52.8k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); |
308 | 52.8k | } |
309 | 1.59M | } async_rpc.cc:void yb::client::internal::(anonymous namespace)::SetMetadata<yb::docdb::KeyValueWriteBatchPB>(yb::client::internal::InFlightOpsTransactionMetadata const&, bool, yb::docdb::KeyValueWriteBatchPB*) Line | Count | Source | 297 | 691k | T* dest) { | 298 | 691k | auto* transaction = dest->mutable_transaction(); | 299 | 691k | if (need_full_metadata) { | 300 | 452k | metadata.transaction.ToPB(transaction); | 301 | 452k | } else { | 302 | 238k | metadata.transaction.TransactionIdToPB(transaction); | 303 | 238k | } | 304 | 691k | dest->set_deprecated_may_have_metadata(true); | 305 | | | 306 | 691k | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()12.8k ) { | 307 | 12.8k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); | 308 | 12.8k | } | 309 | 691k | } |
async_rpc.cc:void yb::client::internal::(anonymous namespace)::SetMetadata<yb::tserver::ReadRequestPB>(yb::client::internal::InFlightOpsTransactionMetadata const&, bool, yb::tserver::ReadRequestPB*) Line | Count | Source | 297 | 901k | T* dest) { | 298 | 901k | auto* transaction = dest->mutable_transaction(); | 299 | 901k | if (need_full_metadata) { | 300 | 531k | metadata.transaction.ToPB(transaction); | 301 | 531k | } else { | 302 | 370k | metadata.transaction.TransactionIdToPB(transaction); | 303 | 370k | } | 304 | 901k | dest->set_deprecated_may_have_metadata(true); | 305 | | | 306 | 901k | if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()40.2k ) { | 307 | 39.9k | metadata.subtransaction->ToPB(dest->mutable_subtransaction()); | 308 | 39.9k | } | 309 | 901k | } |
|
310 | | |
311 | | void SetMetadata(const InFlightOpsTransactionMetadata& metadata, |
312 | | bool need_full_metadata, |
313 | 690k | tserver::WriteRequestPB* req) { |
314 | 690k | SetMetadata(metadata, need_full_metadata, req->mutable_write_batch()); |
315 | 690k | } |
316 | | |
317 | | } // namespace |
318 | | |
319 | 12.1M | void AsyncRpc::SendRpcToTserver(int attempt_num) { |
320 | 12.1M | if (async_rpc_metrics_) { |
321 | 12.0M | async_rpc_metrics_->time_to_send->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
322 | 12.0M | } |
323 | 12.1M | CallRemoteMethod(); |
324 | 12.1M | } |
325 | | |
326 | | template <class Req, class Resp> |
327 | | AsyncRpcBase<Req, Resp>::AsyncRpcBase( |
328 | | const AsyncRpcData& data, YBConsistencyLevel consistency_level) |
329 | 12.0M | : AsyncRpc(data, consistency_level) { |
330 | 12.0M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); |
331 | 12.0M | req_.set_include_trace(IsTracingEnabled()); |
332 | 12.0M | const ConsistentReadPoint* read_point = batcher_->read_point(); |
333 | 12.0M | bool has_read_time = false; |
334 | 12.0M | if (read_point) { |
335 | 11.8M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); |
336 | | // Set read time for consistent read only if the table is transaction-enabled and |
337 | | // consistent read is required. |
338 | 11.8M | if (data.need_consistent_read && |
339 | 11.8M | table()->InternalSchema().table_properties().is_transactional()2.98M ) { |
340 | 2.78M | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); |
341 | 2.78M | if (read_time) { |
342 | 2.21M | has_read_time = true; |
343 | 2.21M | read_time.AddToPB(&req_); |
344 | 2.21M | } |
345 | 2.78M | } |
346 | 11.8M | } |
347 | 12.0M | if (!ops_.empty()12.0M ) { |
348 | 12.0M | req_.set_batch_idx(ops_.front().batch_idx); |
349 | 12.0M | } |
350 | 12.0M | const auto& metadata = batcher_->in_flight_ops().metadata; |
351 | 12.0M | if (!metadata.transaction.transaction_id.IsNil()) { |
352 | 1.59M | SetMetadata(metadata, data.need_metadata, &req_); |
353 | 1.59M | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; |
354 | 1.59M | LOG_IF(DFATAL, has_read_time && serializable) |
355 | 40 | << "Read time should NOT be specified for serializable isolation: " |
356 | 40 | << read_point->GetReadTime().ToString(); |
357 | 1.59M | } |
358 | 12.0M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::AsyncRpcBase(yb::client::internal::AsyncRpcData const&, yb::YBConsistencyLevel) Line | Count | Source | 329 | 2.54M | : AsyncRpc(data, consistency_level) { | 330 | 2.54M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); | 331 | 2.54M | req_.set_include_trace(IsTracingEnabled()); | 332 | 2.54M | const ConsistentReadPoint* read_point = batcher_->read_point(); | 333 | 2.54M | bool has_read_time = false; | 334 | 2.54M | if (read_point) { | 335 | 2.39M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); | 336 | | // Set read time for consistent read only if the table is transaction-enabled and | 337 | | // consistent read is required. | 338 | 2.39M | if (data.need_consistent_read && | 339 | 2.39M | table()->InternalSchema().table_properties().is_transactional()859k ) { | 340 | 854k | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); | 341 | 854k | if (read_time) { | 342 | 668k | has_read_time = true; | 343 | 668k | read_time.AddToPB(&req_); | 344 | 668k | } | 345 | 854k | } | 346 | 2.39M | } | 347 | 2.54M | if (!ops_.empty()) { | 348 | 2.54M | req_.set_batch_idx(ops_.front().batch_idx); | 349 | 2.54M | } | 350 | 2.54M | const auto& metadata = batcher_->in_flight_ops().metadata; | 351 | 2.54M | if (!metadata.transaction.transaction_id.IsNil()) { | 352 | 690k | SetMetadata(metadata, data.need_metadata, &req_); | 353 | 690k | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; | 354 | 18.4E | LOG_IF(DFATAL, has_read_time && serializable) | 355 | 18.4E | << "Read time should NOT be specified for serializable isolation: " | 356 | 18.4E | << read_point->GetReadTime().ToString(); | 357 | 690k | } | 358 | 2.54M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::AsyncRpcBase(yb::client::internal::AsyncRpcData const&, yb::YBConsistencyLevel) Line | Count | Source | 329 | 9.54M | : AsyncRpc(data, consistency_level) { | 330 | 9.54M | req_.set_allocated_tablet_id(const_cast<std::string*>(&tablet_invoker_.tablet()->tablet_id())); | 331 | 9.54M | req_.set_include_trace(IsTracingEnabled()); | 332 | 9.54M | const ConsistentReadPoint* read_point = batcher_->read_point(); | 333 | 9.54M | bool has_read_time = false; | 334 | 9.54M | if (read_point) { | 335 | 9.41M | req_.set_propagated_hybrid_time(read_point->Now().ToUint64()); | 336 | | // Set read time for consistent read only if the table is transaction-enabled and | 337 | | // consistent read is required. | 338 | 9.41M | if (data.need_consistent_read && | 339 | 9.41M | table()->InternalSchema().table_properties().is_transactional()2.12M ) { | 340 | 1.93M | auto read_time = read_point->GetReadTime(tablet_invoker_.tablet()->tablet_id()); | 341 | 1.93M | if (read_time) { | 342 | 1.54M | has_read_time = true; | 343 | 1.54M | read_time.AddToPB(&req_); | 344 | 1.54M | } | 345 | 1.93M | } | 346 | 9.41M | } | 347 | 9.54M | if (!ops_.empty()9.54M ) { | 348 | 9.54M | req_.set_batch_idx(ops_.front().batch_idx); | 349 | 9.54M | } | 350 | 9.54M | const auto& metadata = batcher_->in_flight_ops().metadata; | 351 | 9.54M | if (!metadata.transaction.transaction_id.IsNil()) { | 352 | 901k | SetMetadata(metadata, data.need_metadata, &req_); | 353 | 901k | bool serializable = metadata.transaction.isolation == IsolationLevel::SERIALIZABLE_ISOLATION; | 354 | 901k | LOG_IF(DFATAL, has_read_time && serializable) | 355 | 72 | << "Read time should NOT be specified for serializable isolation: " | 356 | 72 | << read_point->GetReadTime().ToString(); | 357 | 901k | } | 358 | 9.54M | } |
|
359 | | |
360 | | template <class Req, class Resp> |
361 | 12.0M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { |
362 | 12.0M | req_.release_tablet_id(); |
363 | 12.0M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::~AsyncRpcBase() Line | Count | Source | 361 | 2.54M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { | 362 | 2.54M | req_.release_tablet_id(); | 363 | 2.54M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::~AsyncRpcBase() Line | Count | Source | 361 | 9.50M | AsyncRpcBase<Req, Resp>::~AsyncRpcBase() { | 362 | 9.50M | req_.release_tablet_id(); | 363 | 9.50M | } |
|
364 | | |
365 | | template <class Req, class Resp> |
366 | 12.0M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { |
367 | 12.0M | if (!status.ok()) { |
368 | 183k | return false; |
369 | 183k | } |
370 | 11.8M | if (resp_.has_error()) { |
371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() |
372 | 0 | << ". Requests not processed."; |
373 | | // If there is an error at the Rpc itself, there should be no individual responses. |
374 | | // All of them need to be marked as failed. |
375 | 0 | Failed(StatusFromPB(resp_.error().status())); |
376 | 0 | return false; |
377 | 0 | } |
378 | 11.8M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); |
379 | 11.8M | if (restart_read_time) { |
380 | 1.87k | auto read_point = batcher_->read_point(); |
381 | 1.87k | if (read_point) { |
382 | 1.87k | read_point->RestartRequired(req_.tablet_id(), restart_read_time); |
383 | 1.87k | } |
384 | 1.87k | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), |
385 | 1.87k | TransactionError(TransactionErrorCode::kReadRestartRequired))); |
386 | 1.87k | return false; |
387 | 1.87k | } |
388 | 11.8M | auto local_limit_ht = resp_.local_limit_ht(); |
389 | 11.8M | if (local_limit_ht) { |
390 | 1.54M | auto read_point = batcher_->read_point(); |
391 | 1.54M | if (read_point) { |
392 | 1.54M | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); |
393 | 1.54M | } |
394 | 1.54M | } |
395 | 11.8M | return true; |
396 | 11.8M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::CommonResponseCheck(yb::Status const&) Line | Count | Source | 366 | 2.54M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { | 367 | 2.54M | if (!status.ok()) { | 368 | 90.7k | return false; | 369 | 90.7k | } | 370 | 2.45M | if (resp_.has_error()) { | 371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() | 372 | 0 | << ". Requests not processed."; | 373 | | // If there is an error at the Rpc itself, there should be no individual responses. | 374 | | // All of them need to be marked as failed. | 375 | 0 | Failed(StatusFromPB(resp_.error().status())); | 376 | 0 | return false; | 377 | 0 | } | 378 | 2.45M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); | 379 | 2.45M | if (restart_read_time) { | 380 | 0 | auto read_point = batcher_->read_point(); | 381 | 0 | if (read_point) { | 382 | 0 | read_point->RestartRequired(req_.tablet_id(), restart_read_time); | 383 | 0 | } | 384 | 0 | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), | 385 | 0 | TransactionError(TransactionErrorCode::kReadRestartRequired))); | 386 | 0 | return false; | 387 | 0 | } | 388 | 2.45M | auto local_limit_ht = resp_.local_limit_ht(); | 389 | 2.45M | if (local_limit_ht) { | 390 | 0 | auto read_point = batcher_->read_point(); | 391 | 0 | if (read_point) { | 392 | 0 | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); | 393 | 0 | } | 394 | 0 | } | 395 | 2.45M | return true; | 396 | 2.45M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::CommonResponseCheck(yb::Status const&) Line | Count | Source | 366 | 9.50M | bool AsyncRpcBase<Req, Resp>::CommonResponseCheck(const Status& status) { | 367 | 9.50M | if (!status.ok()) { | 368 | 92.7k | return false; | 369 | 92.7k | } | 370 | 9.41M | if (resp_.has_error()) { | 371 | 0 | LOG(WARNING) << ToString() << " has error:" << resp_.error().DebugString() | 372 | 0 | << ". Requests not processed."; | 373 | | // If there is an error at the Rpc itself, there should be no individual responses. | 374 | | // All of them need to be marked as failed. | 375 | 0 | Failed(StatusFromPB(resp_.error().status())); | 376 | 0 | return false; | 377 | 0 | } | 378 | 9.41M | auto restart_read_time = ReadHybridTime::FromRestartReadTimePB(resp_); | 379 | 9.41M | if (restart_read_time) { | 380 | 1.87k | auto read_point = batcher_->read_point(); | 381 | 1.87k | if (read_point) { | 382 | 1.87k | read_point->RestartRequired(req_.tablet_id(), restart_read_time); | 383 | 1.87k | } | 384 | 1.87k | Failed(STATUS(TryAgain, Format("Restart read required at: $0", restart_read_time), Slice(), | 385 | 1.87k | TransactionError(TransactionErrorCode::kReadRestartRequired))); | 386 | 1.87k | return false; | 387 | 1.87k | } | 388 | 9.41M | auto local_limit_ht = resp_.local_limit_ht(); | 389 | 9.41M | if (local_limit_ht) { | 390 | 1.54M | auto read_point = batcher_->read_point(); | 391 | 1.54M | if (read_point) { | 392 | 1.54M | read_point->UpdateLocalLimit(req_.tablet_id(), HybridTime(local_limit_ht)); | 393 | 1.54M | } | 394 | 1.54M | } | 395 | 9.41M | return true; | 396 | 9.41M | } |
|
397 | | |
398 | | template <class Req, class Resp> |
399 | 12.1M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { |
400 | 12.1M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { |
401 | 35.7k | ConsistentReadPoint* read_point = batcher_->read_point(); |
402 | 35.7k | if (read_point && !read_point->GetReadTime()) { |
403 | 1 | auto txn = batcher_->transaction(); |
404 | | // If txn is not set, this is a consistent scan across multiple tablets of a |
405 | | // non-transactional YCQL table. |
406 | 1 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { |
407 | 1 | read_point->SetCurrentReadTime(); |
408 | 1 | read_point->GetReadTime().AddToPB(&req_); |
409 | 1 | } |
410 | 1 | } |
411 | 35.7k | } |
412 | | |
413 | 12.1M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); |
414 | 12.1M | AsyncRpc::SendRpcToTserver(attempt_num); |
415 | 12.1M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::SendRpcToTserver(int) Line | Count | Source | 399 | 2.55M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { | 400 | 2.55M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { | 401 | 13.5k | ConsistentReadPoint* read_point = batcher_->read_point(); | 402 | 13.5k | if (read_point && !read_point->GetReadTime()) { | 403 | 1 | auto txn = batcher_->transaction(); | 404 | | // If txn is not set, this is a consistent scan across multiple tablets of a | 405 | | // non-transactional YCQL table. | 406 | 1 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { | 407 | 1 | read_point->SetCurrentReadTime(); | 408 | 1 | read_point->GetReadTime().AddToPB(&req_); | 409 | 1 | } | 410 | 1 | } | 411 | 13.5k | } | 412 | | | 413 | 2.55M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); | 414 | 2.55M | AsyncRpc::SendRpcToTserver(attempt_num); | 415 | 2.55M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::SendRpcToTserver(int) Line | Count | Source | 399 | 9.56M | void AsyncRpcBase<Req, Resp>::SendRpcToTserver(int attempt_num) { | 400 | 9.56M | if (!tablet_invoker_.current_ts().HasCapability(CAPABILITY_PickReadTimeAtTabletServer)) { | 401 | 22.2k | ConsistentReadPoint* read_point = batcher_->read_point(); | 402 | 22.2k | if (read_point && !read_point->GetReadTime()) { | 403 | 0 | auto txn = batcher_->transaction(); | 404 | | // If txn is not set, this is a consistent scan across multiple tablets of a | 405 | | // non-transactional YCQL table. | 406 | 0 | if (!txn || txn->isolation() == IsolationLevel::SNAPSHOT_ISOLATION) { | 407 | 0 | read_point->SetCurrentReadTime(); | 408 | 0 | read_point->GetReadTime().AddToPB(&req_); | 409 | 0 | } | 410 | 0 | } | 411 | 22.2k | } | 412 | | | 413 | 9.56M | req_.set_rejection_score(batcher_->RejectionScore(attempt_num)); | 414 | 9.56M | AsyncRpc::SendRpcToTserver(attempt_num); | 415 | 9.56M | } |
|
416 | | |
417 | | template <class Req, class Resp> |
418 | 12.0M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { |
419 | 12.0M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); |
420 | 12.0M | if (resp_.has_trace_buffer()) { |
421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); |
422 | 0 | } |
423 | 12.0M | NotifyBatcher(status); |
424 | 12.0M | if (!CommonResponseCheck(status)) { |
425 | 185k | return; |
426 | 185k | } |
427 | 11.8M | SwapResponses(); |
428 | 11.8M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::ProcessResponseFromTserver(yb::Status const&) Line | Count | Source | 418 | 2.54M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { | 419 | 2.54M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); | 420 | 2.54M | if (resp_.has_trace_buffer()) { | 421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); | 422 | 0 | } | 423 | 2.54M | NotifyBatcher(status); | 424 | 2.54M | if (!CommonResponseCheck(status)) { | 425 | 90.7k | return; | 426 | 90.7k | } | 427 | 2.45M | SwapResponses(); | 428 | 2.45M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::ProcessResponseFromTserver(yb::Status const&) Line | Count | Source | 418 | 9.52M | void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) { | 419 | 9.52M | TRACE_TO(trace_, "ProcessResponseFromTserver($0)", status.ToString(false)); | 420 | 9.52M | if (resp_.has_trace_buffer()) { | 421 | 0 | TRACE_TO(trace_, "Received from server: \n BEGIN\n$0 END.", resp_.trace_buffer()); | 422 | 0 | } | 423 | 9.52M | NotifyBatcher(status); | 424 | 9.52M | if (!CommonResponseCheck(status)) { | 425 | 94.6k | return; | 426 | 94.6k | } | 427 | 9.42M | SwapResponses(); | 428 | 9.42M | } |
|
429 | | |
430 | | |
431 | | template <class Req, class Resp> |
432 | 12.0M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { |
433 | 12.0M | return {GetPropagatedHybridTime(resp_), |
434 | 12.0M | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())8.04M |
435 | 12.0M | : ReadHybridTime()4.01M }; |
436 | 12.0M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::MakeFlushExtraResult() Line | Count | Source | 432 | 2.54M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { | 433 | 2.54M | return {GetPropagatedHybridTime(resp_), | 434 | 2.54M | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())175k | 435 | 2.54M | : ReadHybridTime()2.37M }; | 436 | 2.54M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::MakeFlushExtraResult() Line | Count | Source | 432 | 9.51M | FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() { | 433 | 9.51M | return {GetPropagatedHybridTime(resp_), | 434 | 9.51M | resp_.has_used_read_time() ? ReadHybridTime::FromPB(resp_.used_read_time())7.87M | 435 | 9.51M | : ReadHybridTime()1.64M }; | 436 | 9.51M | } |
|
437 | | |
438 | | template <class Op, class Req> |
439 | 4.56M | void HandleExtraFields(Op* op, Req* req) { |
440 | 4.56M | } void yb::client::internal::HandleExtraFields<yb::client::YBRedisWriteOp, yb::tserver::WriteRequestPB>(yb::client::YBRedisWriteOp*, yb::tserver::WriteRequestPB*) Line | Count | Source | 439 | 123k | void HandleExtraFields(Op* op, Req* req) { | 440 | 123k | } |
void yb::client::internal::HandleExtraFields<yb::client::YBRedisReadOp, yb::tserver::ReadRequestPB>(yb::client::YBRedisReadOp*, yb::tserver::ReadRequestPB*) Line | Count | Source | 439 | 84.8k | void HandleExtraFields(Op* op, Req* req) { | 440 | 84.8k | } |
void yb::client::internal::HandleExtraFields<yb::client::YBPgsqlReadOp, yb::tserver::ReadRequestPB>(yb::client::YBPgsqlReadOp*, yb::tserver::ReadRequestPB*) Line | Count | Source | 439 | 4.36M | void HandleExtraFields(Op* op, Req* req) { | 440 | 4.36M | } |
|
441 | | |
442 | 3.99M | void HandleExtraFields(YBqlWriteOp* op, tserver::WriteRequestPB* req) { |
443 | 3.99M | if (op->write_time_for_backfill()) { |
444 | 4.91k | req->set_external_hybrid_time(op->write_time_for_backfill().ToUint64()); |
445 | 4.91k | ReadHybridTime::SingleTime(op->write_time_for_backfill()).ToPB(req->mutable_read_time()); |
446 | 4.91k | } |
447 | 3.99M | } |
448 | | |
449 | 7.21M | void HandleExtraFields(YBPgsqlWriteOp* op, tserver::WriteRequestPB* req) { |
450 | 7.21M | if (op->write_time()) { |
451 | 1.26k | req->set_external_hybrid_time(op->write_time().ToUint64()); |
452 | 1.26k | } |
453 | 7.21M | } |
454 | | |
455 | 7.53M | void HandleExtraFields(YBqlReadOp* op, tserver::ReadRequestPB* req) { |
456 | 7.53M | if (op->read_time()) { |
457 | 316 | op->read_time().AddToPB(req); |
458 | 316 | } |
459 | 7.53M | } |
460 | | |
461 | | template <class OpType, class Req, class Out> |
462 | | void FillOps( |
463 | 12.0M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { |
464 | 12.0M | out->Reserve(narrow_cast<int>(ops.size())); |
465 | 12.0M | size_t idx = 0; |
466 | 23.3M | for (auto& op : ops) { |
467 | 23.3M | CHECK_EQ(op.yb_op->type(), expected_type); |
468 | 23.3M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); |
469 | 23.3M | out->AddAllocated(concrete_op->mutable_request()); |
470 | 23.3M | HandleExtraFields(concrete_op, req); |
471 | 23.3M | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()11.3k ; |
472 | 23.3M | } |
473 | 12.0M | } void yb::client::internal::FillOps<yb::client::YBRedisWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB>*) Line | Count | Source | 463 | 123k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 123k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 123k | size_t idx = 0; | 466 | 123k | for (auto& op : ops) { | 467 | 123k | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 123k | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 123k | out->AddAllocated(concrete_op->mutable_request()); | 470 | 123k | HandleExtraFields(concrete_op, req); | 471 | 123k | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()0 ; | 472 | 123k | } | 473 | 123k | } |
void yb::client::internal::FillOps<yb::client::YBqlWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB>*) Line | Count | Source | 463 | 1.72M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 1.72M | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 1.72M | size_t idx = 0; | 466 | 3.99M | for (auto& op : ops) { | 467 | 3.99M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 3.99M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 3.99M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 3.99M | HandleExtraFields(concrete_op, req); | 471 | 3.99M | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()672 ; | 472 | 3.99M | } | 473 | 1.72M | } |
void yb::client::internal::FillOps<yb::client::YBPgsqlWriteOp, yb::tserver::WriteRequestPB, google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::WriteRequestPB*, google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB>*) Line | Count | Source | 463 | 693k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 693k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 693k | size_t idx = 0; | 466 | 7.21M | for (auto& op : ops) { | 467 | 7.21M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 7.21M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 7.21M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 7.21M | HandleExtraFields(concrete_op, req); | 471 | 7.21M | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()52 ; | 472 | 7.21M | } | 473 | 693k | } |
void yb::client::internal::FillOps<yb::client::YBRedisReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB>*) Line | Count | Source | 463 | 84.8k | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 84.8k | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 84.8k | size_t idx = 0; | 466 | 84.8k | for (auto& op : ops) { | 467 | 84.8k | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 84.8k | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 84.8k | out->AddAllocated(concrete_op->mutable_request()); | 470 | 84.8k | HandleExtraFields(concrete_op, req); | 471 | 84.8k | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()0 ; | 472 | 84.8k | } | 473 | 84.8k | } |
void yb::client::internal::FillOps<yb::client::YBqlReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::QLReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::QLReadRequestPB>*) Line | Count | Source | 463 | 7.53M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 7.53M | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 7.53M | size_t idx = 0; | 466 | 7.53M | for (auto& op : ops) { | 467 | 7.53M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 7.53M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 7.53M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 7.53M | HandleExtraFields(concrete_op, req); | 471 | 7.53M | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString()10.7k ; | 472 | 7.53M | } | 473 | 7.53M | } |
void yb::client::internal::FillOps<yb::client::YBPgsqlReadOp, yb::tserver::ReadRequestPB, google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB> >(boost::iterator_range<std::__1::__wrap_iter<yb::client::internal::InFlightOp*> > const&, yb::client::YBOperation::Type, yb::tserver::ReadRequestPB*, google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB>*) Line | Count | Source | 463 | 1.91M | const InFlightOps& ops, YBOperation::Type expected_type, Req* req, Out* out) { | 464 | 1.91M | out->Reserve(narrow_cast<int>(ops.size())); | 465 | 1.91M | size_t idx = 0; | 466 | 4.36M | for (auto& op : ops) { | 467 | 4.36M | CHECK_EQ(op.yb_op->type(), expected_type); | 468 | 4.36M | auto* concrete_op = down_cast<OpType*>(op.yb_op.get()); | 469 | 4.36M | out->AddAllocated(concrete_op->mutable_request()); | 470 | 4.36M | HandleExtraFields(concrete_op, req); | 471 | 18.4E | VLOG(4) << ++idx << ") encoded row: " << op.yb_op->ToString(); | 472 | 4.36M | } | 473 | 1.91M | } |
|
474 | | |
475 | | template <class Repeated> |
476 | 36.2M | void ReleaseOps(Repeated* repeated) { |
477 | 36.2M | auto size = repeated->size(); |
478 | 36.2M | if (size) { |
479 | 12.0M | repeated->ExtractSubrange(0, size, nullptr); |
480 | 12.0M | } |
481 | 36.2M | } void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::RedisWriteRequestPB>*) Line | Count | Source | 476 | 2.54M | void ReleaseOps(Repeated* repeated) { | 477 | 2.54M | auto size = repeated->size(); | 478 | 2.54M | if (size) { | 479 | 123k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 123k | } | 481 | 2.54M | } |
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::QLWriteRequestPB>*) Line | Count | Source | 476 | 2.54M | void ReleaseOps(Repeated* repeated) { | 477 | 2.54M | auto size = repeated->size(); | 478 | 2.54M | if (size) { | 479 | 1.73M | repeated->ExtractSubrange(0, size, nullptr); | 480 | 1.73M | } | 481 | 2.54M | } |
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB> >(google::protobuf::RepeatedPtrField<yb::PgsqlWriteRequestPB>*) Line | Count | Source | 476 | 2.54M | void ReleaseOps(Repeated* repeated) { | 477 | 2.54M | auto size = repeated->size(); | 478 | 2.54M | if (size) { | 479 | 693k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 693k | } | 481 | 2.54M | } |
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::RedisReadRequestPB>*) Line | Count | Source | 476 | 9.53M | void ReleaseOps(Repeated* repeated) { | 477 | 9.53M | auto size = repeated->size(); | 478 | 9.53M | if (size) { | 479 | 84.8k | repeated->ExtractSubrange(0, size, nullptr); | 480 | 84.8k | } | 481 | 9.53M | } |
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::QLReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::QLReadRequestPB>*) Line | Count | Source | 476 | 9.52M | void ReleaseOps(Repeated* repeated) { | 477 | 9.52M | auto size = repeated->size(); | 478 | 9.52M | if (size) { | 479 | 7.51M | repeated->ExtractSubrange(0, size, nullptr); | 480 | 7.51M | } | 481 | 9.52M | } |
void yb::client::internal::ReleaseOps<google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB> >(google::protobuf::RepeatedPtrField<yb::PgsqlReadRequestPB>*) Line | Count | Source | 476 | 9.50M | void ReleaseOps(Repeated* repeated) { | 477 | 9.50M | auto size = repeated->size(); | 478 | 9.50M | if (size) { | 479 | 1.90M | repeated->ExtractSubrange(0, size, nullptr); | 480 | 1.90M | } | 481 | 9.50M | } |
|
482 | | |
483 | | WriteRpc::WriteRpc(const AsyncRpcData& data) |
484 | 2.54M | : AsyncRpcBase(data, YBConsistencyLevel::STRONG) { |
485 | 2.54M | TRACE_TO(trace_, "WriteRpc initiated"); |
486 | 2.54M | VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString()); |
487 | | |
488 | | // Add the rows |
489 | 2.54M | switch (table()->table_type()) { |
490 | 123k | case YBTableType::REDIS_TABLE_TYPE: |
491 | 123k | FillOps<YBRedisWriteOp>( |
492 | 123k | ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch()); |
493 | 123k | break; |
494 | 1.73M | case YBTableType::YQL_TABLE_TYPE: |
495 | 1.73M | FillOps<YBqlWriteOp>( |
496 | 1.73M | ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch()); |
497 | 1.73M | break; |
498 | 693k | case YBTableType::PGSQL_TABLE_TYPE: |
499 | 693k | FillOps<YBPgsqlWriteOp>( |
500 | 693k | ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch()); |
501 | 693k | break; |
502 | 0 | case YBTableType::UNKNOWN_TABLE_TYPE: |
503 | 0 | case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: |
504 | 0 | LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); |
505 | 0 | break; |
506 | 2.54M | } |
507 | | |
508 | 2.54M | VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n" |
509 | 56 | << req_.ShortDebugString(); |
510 | | |
511 | 2.54M | const auto& client_id = batcher_->client_id(); |
512 | 2.54M | if (!client_id.IsNil() && FLAGS_detect_duplicates_for_retryable_requests2.54M ) { |
513 | 2.54M | auto temp = client_id.ToUInt64Pair(); |
514 | 2.54M | req_.set_client_id1(temp.first); |
515 | 2.54M | req_.set_client_id2(temp.second); |
516 | 2.54M | auto request_pair = batcher_->NextRequestIdAndMinRunningRequestId(data.tablet->tablet_id()); |
517 | 2.54M | req_.set_request_id(request_pair.first); |
518 | 2.54M | req_.set_min_running_request_id(request_pair.second); |
519 | 2.54M | } |
520 | 2.54M | } |
521 | | |
522 | 2.54M | WriteRpc::~WriteRpc() { |
523 | | // Check that we sent request id info, i.e. (client_id, request_id, min_running_request_id). |
524 | 2.54M | if (req_.has_client_id1()) { |
525 | 2.54M | batcher_->RequestFinished(tablet().tablet_id(), req_.request_id()); |
526 | 2.54M | } |
527 | | |
528 | 2.54M | if (async_rpc_metrics_) { |
529 | 2.51M | scoped_refptr<Histogram> write_rpc_time = IsLocalCall() ? |
530 | 1.81M | async_rpc_metrics_->local_write_rpc_time : |
531 | 2.51M | async_rpc_metrics_->remote_write_rpc_time701k ; |
532 | 2.51M | write_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
533 | 2.51M | } |
534 | | |
535 | 2.54M | ReleaseOps(req_.mutable_redis_write_batch()); |
536 | 2.54M | ReleaseOps(req_.mutable_ql_write_batch()); |
537 | 2.54M | ReleaseOps(req_.mutable_pgsql_write_batch()); |
538 | 2.54M | } |
539 | | |
540 | 2.55M | void WriteRpc::CallRemoteMethod() { |
541 | 2.55M | auto trace = trace_; // It is possible that we receive reply before returning from WriteAsync. |
542 | | // The send happens before WriteAsync returns, so under heavy load |
543 | | // our request may be handled and the reply received before |
544 | | // WriteAsync has returned. |
545 | 2.55M | TRACE_TO(trace, "SendRpcToTserver"); |
546 | 2.55M | ADOPT_TRACE(trace.get()); |
547 | | |
548 | 2.55M | tablet_invoker_.WriteAsync(req_, &resp_, PrepareController(), |
549 | 2.55M | std::bind(&WriteRpc::Finished, this, Status::OK())); |
550 | 2.55M | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
551 | 2.55M | } |
552 | | |
553 | 2.45M | void WriteRpc::SwapResponses() { |
554 | 2.45M | int redis_idx = 0; |
555 | 2.45M | int ql_idx = 0; |
556 | 2.45M | int pgsql_idx = 0; |
557 | | |
558 | | // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back. |
559 | 11.2M | for (auto& op : ops_) { |
560 | 11.2M | YBOperation* yb_op = op.yb_op.get(); |
561 | 11.2M | switch (yb_op->type()) { |
562 | 123k | case YBOperation::Type::REDIS_WRITE: { |
563 | 123k | if (redis_idx >= resp_.redis_response_batch().size()) { |
564 | 0 | ++redis_idx; |
565 | 0 | continue; |
566 | 0 | } |
567 | | // Restore Redis write request PB and extract response. |
568 | 123k | auto* redis_op = down_cast<YBRedisWriteOp*>(yb_op); |
569 | 123k | redis_op->mutable_response()->Swap(resp_.mutable_redis_response_batch(redis_idx)); |
570 | 123k | redis_idx++; |
571 | 123k | break; |
572 | 123k | } |
573 | 3.93M | case YBOperation::Type::QL_WRITE: { |
574 | 3.93M | if (ql_idx >= resp_.ql_response_batch().size()) { |
575 | 0 | ++ql_idx; |
576 | 0 | continue; |
577 | 0 | } |
578 | | // Restore QL write request PB and extract response. |
579 | 3.93M | auto* ql_op = down_cast<YBqlWriteOp*>(yb_op); |
580 | 3.93M | ql_op->mutable_response()->Swap(resp_.mutable_ql_response_batch(ql_idx)); |
581 | 3.93M | const auto& ql_response = ql_op->response(); |
582 | 3.93M | if (ql_response.has_rows_data_sidecar()) { |
583 | 273 | Slice rows_data = CHECK_RESULT( |
584 | 273 | retrier().controller().GetSidecar(ql_response.rows_data_sidecar())); |
585 | 273 | ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size()); |
586 | 273 | } |
587 | 3.93M | ql_idx++; |
588 | 3.93M | break; |
589 | 3.93M | } |
590 | 7.15M | case YBOperation::Type::PGSQL_WRITE: { |
591 | 7.15M | if (pgsql_idx >= resp_.pgsql_response_batch().size()) { |
592 | 0 | ++pgsql_idx; |
593 | 0 | continue; |
594 | 0 | } |
595 | | // Restore PGSQL write request PB and extract response. |
596 | 7.15M | auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op); |
597 | 7.15M | pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx)); |
598 | 7.15M | const auto& pgsql_response = pgsql_op->response(); |
599 | 7.15M | if (pgsql_response.has_rows_data_sidecar()) { |
600 | 7.14M | auto holder = CHECK_RESULT( |
601 | 7.14M | retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar())); |
602 | 7.14M | down_cast<YBPgsqlWriteOp*>(yb_op)->SetRowsData(holder.first, holder.second); |
603 | 7.14M | } |
604 | 7.15M | pgsql_idx++; |
605 | 7.15M | break; |
606 | 7.15M | } |
607 | 0 | case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; |
608 | 0 | case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; |
609 | 0 | case YBOperation::Type::QL_READ: |
610 | 0 | LOG(FATAL) << "Not a write operation " << op.yb_op->type(); |
611 | 0 | break; |
612 | 11.2M | } |
613 | 11.2M | } |
614 | | |
615 | 2.45M | if (redis_idx != resp_.redis_response_batch().size() || |
616 | 2.45M | ql_idx != resp_.ql_response_batch().size()2.45M || |
617 | 2.45M | pgsql_idx != resp_.pgsql_response_batch().size()2.45M ) { |
618 | 0 | LOG(ERROR) << Substitute("Write response count mismatch: " |
619 | 0 | "$0 Redis requests sent, $1 responses received. " |
620 | 0 | "$2 Apache CQL requests sent, $3 responses received. " |
621 | 0 | "$4 PostgreSQL requests sent, $5 responses received.", |
622 | 0 | redis_idx, resp_.redis_response_batch().size(), |
623 | 0 | ql_idx, resp_.ql_response_batch().size(), |
624 | 0 | pgsql_idx, resp_.pgsql_response_batch().size()); |
625 | 0 | auto status = STATUS(IllegalState, "Write response count mismatch"); |
626 | 0 | LOG(ERROR) << status << ", request: " << req_.ShortDebugString() |
627 | 0 | << ", response: " << resp_.ShortDebugString(); |
628 | 0 | batcher_->AddOpCountMismatchError(); |
629 | 0 | Failed(status); |
630 | 0 | } |
631 | 2.45M | } |
632 | | |
633 | 2.54M | void WriteRpc::NotifyBatcher(const Status& status) { |
634 | 2.54M | batcher_->ProcessWriteResponse(*this, status); |
635 | 2.54M | } |
636 | | |
637 | 0 | bool WriteRpc::ShouldRetryExpiredRequest() { |
638 | 0 | return req_.min_running_request_id() == kInitializeFromMinRunning; |
639 | 0 | } |
640 | | |
641 | | ReadRpc::ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level) |
642 | 9.54M | : AsyncRpcBase(data, yb_consistency_level) { |
643 | 9.54M | TRACE_TO(trace_, "ReadRpc initiated"); |
644 | 9.54M | VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString()); |
645 | 9.54M | req_.set_consistency_level(yb_consistency_level); |
646 | 9.54M | req_.set_proxy_uuid(data.batcher->proxy_uuid()); |
647 | | |
648 | 9.54M | switch (table()->table_type()) { |
649 | 84.8k | case YBTableType::REDIS_TABLE_TYPE: |
650 | 84.8k | FillOps<YBRedisReadOp>( |
651 | 84.8k | ops_, YBOperation::Type::REDIS_READ, &req_, req_.mutable_redis_batch()); |
652 | 84.8k | break; |
653 | 7.54M | case YBTableType::YQL_TABLE_TYPE: |
654 | 7.54M | FillOps<YBqlReadOp>( |
655 | 7.54M | ops_, YBOperation::Type::QL_READ, &req_, req_.mutable_ql_batch()); |
656 | 7.54M | break; |
657 | 1.91M | case YBTableType::PGSQL_TABLE_TYPE: |
658 | 1.91M | FillOps<YBPgsqlReadOp>( |
659 | 1.91M | ops_, YBOperation::Type::PGSQL_READ, &req_, req_.mutable_pgsql_batch()); |
660 | 1.91M | break; |
661 | 0 | case YBTableType::UNKNOWN_TABLE_TYPE: |
662 | 0 | case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: |
663 | 0 | LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); |
664 | 0 | break; |
665 | 9.54M | } |
666 | | |
667 | 18.4E | VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n" |
668 | 18.4E | << req_.ShortDebugString(); |
669 | 9.52M | } |
670 | | |
671 | 9.52M | ReadRpc::~ReadRpc() { |
672 | | // Get locality metrics if enabled, but skip for system tables as those go to the master. |
673 | 9.52M | if (async_rpc_metrics_ && !table()->name().is_system()9.51M ) { |
674 | 9.28M | scoped_refptr<Histogram> read_rpc_time = IsLocalCall() ? |
675 | 7.41M | async_rpc_metrics_->local_read_rpc_time : |
676 | 9.28M | async_rpc_metrics_->remote_read_rpc_time1.86M ; |
677 | | |
678 | 9.28M | read_rpc_time->Increment(ToMicroseconds(CoarseMonoClock::Now() - start_)); |
679 | 9.28M | } |
680 | | |
681 | 9.52M | ReleaseOps(req_.mutable_redis_batch()); |
682 | 9.52M | ReleaseOps(req_.mutable_ql_batch()); |
683 | 9.52M | ReleaseOps(req_.mutable_pgsql_batch()); |
684 | 9.52M | } |
685 | | |
686 | 9.56M | void ReadRpc::CallRemoteMethod() { |
687 | 9.56M | auto trace = trace_; // It is possible that we receive reply before returning from ReadAsync. |
688 | | // Detailed explanation in WriteRpc::CallRemoteMethod. |
689 | 9.56M | TRACE_TO(trace, "SendRpcToTserver"); |
690 | 9.56M | ADOPT_TRACE(trace.get()); |
691 | | |
692 | 9.56M | tablet_invoker_.ReadAsync(req_, &resp_, PrepareController(), |
693 | 9.56M | std::bind(&ReadRpc::Finished, this, Status::OK())); |
694 | 9.56M | TRACE_TO(trace, "RpcDispatched Asynchronously"); |
695 | 9.56M | } |
696 | | |
697 | 9.40M | void ReadRpc::SwapResponses() { |
698 | 9.40M | int redis_idx = 0; |
699 | 9.40M | int ql_idx = 0; |
700 | 9.40M | int pgsql_idx = 0; |
701 | | |
702 | | // Retrieve Redis, QL and PGSQL responses and make sure we received all the responses back. |
703 | 11.0M | for (auto& op : ops_) { |
704 | 11.0M | YBOperation* yb_op = op.yb_op.get(); |
705 | 11.0M | switch (yb_op->type()) { |
706 | 84.6k | case YBOperation::Type::REDIS_READ: { |
707 | 84.6k | if (redis_idx >= resp_.redis_batch().size()) { |
708 | 0 | batcher_->AddOpCountMismatchError(); |
709 | 0 | return; |
710 | 0 | } |
711 | | // Restore Redis read request PB and extract response. |
712 | 84.6k | auto* redis_op = down_cast<YBRedisReadOp*>(yb_op); |
713 | 84.6k | redis_op->mutable_response()->Swap(resp_.mutable_redis_batch(redis_idx)); |
714 | 84.6k | redis_idx++; |
715 | 84.6k | break; |
716 | 84.6k | } |
717 | 7.52M | case YBOperation::Type::QL_READ: { |
718 | 7.52M | if (ql_idx >= resp_.ql_batch().size()) { |
719 | 0 | batcher_->AddOpCountMismatchError(); |
720 | 0 | return; |
721 | 0 | } |
722 | | // Restore QL read request PB and extract response. |
723 | 7.52M | auto* ql_op = down_cast<YBqlReadOp*>(yb_op); |
724 | 7.52M | ql_op->mutable_response()->Swap(resp_.mutable_ql_batch(ql_idx)); |
725 | 7.52M | const auto& ql_response = ql_op->response(); |
726 | 7.52M | if (ql_response.has_rows_data_sidecar()) { |
727 | 7.50M | Slice rows_data = CHECK_RESULT(retrier().controller().GetSidecar( |
728 | 7.50M | ql_response.rows_data_sidecar())); |
729 | 7.50M | ql_op->mutable_rows_data()->assign(rows_data.cdata(), rows_data.size()); |
730 | 7.50M | } |
731 | 7.52M | ql_idx++; |
732 | 7.52M | break; |
733 | 7.52M | } |
734 | 3.47M | case YBOperation::Type::PGSQL_READ: { |
735 | 3.47M | if (pgsql_idx >= resp_.pgsql_batch().size()) { |
736 | 0 | batcher_->AddOpCountMismatchError(); |
737 | 0 | return; |
738 | 0 | } |
739 | | // Restore PGSQL read request PB and extract response. |
740 | 3.47M | auto* pgsql_op = down_cast<YBPgsqlReadOp*>(yb_op); |
741 | 3.47M | if (resp_.has_used_read_time()) { |
742 | 1.95M | pgsql_op->SetUsedReadTime(ReadHybridTime::FromPB(resp_.used_read_time())); |
743 | 1.95M | } |
744 | 3.47M | pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_batch(pgsql_idx)); |
745 | 3.47M | const auto& pgsql_response = pgsql_op->response(); |
746 | 3.47M | if (pgsql_response.has_rows_data_sidecar()3.47M ) { |
747 | 3.47M | auto holder = CHECK_RESULT( |
748 | 3.47M | retrier().controller().GetSidecarHolder(pgsql_response.rows_data_sidecar())); |
749 | 3.47M | down_cast<YBPgsqlReadOp*>(yb_op)->SetRowsData(holder.first, holder.second); |
750 | 3.47M | } |
751 | 3.47M | pgsql_idx++; |
752 | 3.47M | break; |
753 | 3.47M | } |
754 | 0 | case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; |
755 | 0 | case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED; |
756 | 0 | case YBOperation::Type::QL_WRITE: |
757 | 0 | LOG(FATAL) << "Not a read operation " << op.yb_op->type(); |
758 | 0 | break; |
759 | 11.0M | } |
760 | 11.0M | } |
761 | | |
762 | 9.43M | if (redis_idx != resp_.redis_batch().size() || |
763 | 9.43M | ql_idx != resp_.ql_batch().size()9.41M || |
764 | 9.43M | pgsql_idx != resp_.pgsql_batch().size()9.40M ) { |
765 | 0 | LOG(ERROR) << Substitute("Read response count mismatch: " |
766 | 0 | "$0 Redis requests sent, $1 responses received. " |
767 | 0 | "$2 QL requests sent, $3 responses received. " |
768 | 0 | "$4 QL requests sent, $5 responses received.", |
769 | 0 | redis_idx, resp_.redis_batch().size(), |
770 | 0 | ql_idx, resp_.ql_batch().size(), |
771 | 0 | pgsql_idx, resp_.pgsql_batch().size()); |
772 | 0 | batcher_->AddOpCountMismatchError(); |
773 | 0 | Failed(STATUS(IllegalState, "Read response count mismatch")); |
774 | 0 | } |
775 | | |
776 | 9.43M | } |
777 | | |
778 | 9.54M | void ReadRpc::NotifyBatcher(const Status& status) { |
779 | 9.54M | batcher_->ProcessReadResponse(*this, status); |
780 | 9.54M | } |
781 | | |
782 | | } // namespace internal |
783 | | } // namespace client |
784 | | } // namespace yb |