/Users/deen/code/yugabyte-db/src/yb/tserver/read_query.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/read_query.h" |
15 | | |
16 | | #include "yb/common/row_mark.h" |
17 | | #include "yb/common/transaction.h" |
18 | | |
19 | | #include "yb/gutil/bind.h" |
20 | | |
21 | | #include "yb/tablet/operations/write_operation.h" |
22 | | #include "yb/tablet/read_result.h" |
23 | | #include "yb/tablet/tablet.h" |
24 | | #include "yb/tablet/tablet_metadata.h" |
25 | | #include "yb/tablet/tablet_metrics.h" |
26 | | #include "yb/tablet/transaction_participant.h" |
27 | | #include "yb/tablet/write_query.h" |
28 | | |
29 | | #include "yb/tserver/service_util.h" |
30 | | #include "yb/tserver/tablet_server_interface.h" |
31 | | #include "yb/tserver/ts_tablet_manager.h" |
32 | | #include "yb/tserver/tserver.pb.h" |
33 | | |
34 | | #include "yb/util/countdown_latch.h" |
35 | | #include "yb/util/debug/trace_event.h" |
36 | | #include "yb/util/flag_tags.h" |
37 | | #include "yb/util/metrics.h" |
38 | | #include "yb/util/scope_exit.h" |
39 | | #include "yb/util/trace.h" |
40 | | |
41 | | using namespace std::literals; |
42 | | |
// Flags consumed by the read path in this file. The three TEST_ flags are read in
// ReadQuery::DoPerform; parallelize_read_ops is read in ReadQuery::DoReadImpl when a Redis batch
// is handled.
43 | | DEFINE_test_flag(int32, transactional_read_delay_ms, 0, |
44 | | "Amount of time to delay between transaction status check and reading start."); |
45 | | |
46 | | DEFINE_test_flag(int32, simulate_time_out_failures_msecs, 0, "If greater than 0, we will randomly " |
47 | | "mark read requests as timed out and sleep for the specificed amount of time by " |
48 | | "this flag to simulate time out failures. The requester will mark the timed out " |
49 | | "replica as failed, and its periodic refresh mechanism for the lookup cache will " |
50 | | "mark them as available."); |
51 | | |
52 | | DEFINE_test_flag(bool, assert_reads_served_by_follower, false, "If set, we verify that the " |
53 | | "consistency level is CONSISTENT_PREFIX, and that this server is not the leader " |
54 | | "for the tablet"); |
55 | | |
56 | | DEFINE_bool(parallelize_read_ops, true, |
57 | | "Controls whether multiple (Redis) read ops that are present in a operation " |
58 | | "should be executed in parallel."); |
59 | | TAG_FLAG(parallelize_read_ops, advanced); |
60 | | TAG_FLAG(parallelize_read_ops, runtime); |
61 | | |
62 | | namespace yb { |
63 | | namespace tserver { |
64 | | |
65 | | namespace { |
66 | | |
// Executes one Redis read op on `tablet` and hands the resulting Status to `status_cb`.
// ReadQuery::DoReadImpl binds this function into a closure that is either submitted to the read
// pool or run inline.
67 | | void HandleRedisReadRequestAsync( |
68 | | tablet::AbstractTablet* tablet, |
69 | | CoarseTimePoint deadline, |
70 | | const ReadHybridTime& read_time, |
71 | | const RedisReadRequestPB& redis_read_request, |
72 | | RedisResponsePB* response, |
73 | | const std::function<void(const Status& s)>& status_cb |
74 | 84.6k | ) { |
75 | 84.6k | status_cb(tablet->HandleRedisReadRequest(deadline, read_time, redis_read_request, response)); |
76 | 84.6k | } |
77 | | |
// Serves a single Read RPC. PerformRead creates an instance and calls Perform(). When read
// intents have to be written first, the query is resumed later through the rpc::ThreadPoolTask
// interface (Run()/Done()).
78 | | class ReadQuery : public std::enable_shared_from_this<ReadQuery>, public rpc::ThreadPoolTask { |
79 | | public: |
// `server` and `read_tablet_provider` are dereferenced and stored as references; `req` and `resp`
// are stored as raw pointers; `context` is moved into the query.
80 | | ReadQuery( |
81 | | TabletServerIf* server, ReadTabletProvider* read_tablet_provider, const ReadRequestPB* req, |
82 | | ReadResponsePB* resp, rpc::RpcContext context) |
83 | | : server_(*server), read_tablet_provider_(*read_tablet_provider), req_(req), resp_(resp), |
84 | 9.56M | context_(std::move(context)) {} |
85 | | |
// Runs the query. A failure status returned by DoPerform is reported back to the caller.
86 | 9.56M | void Perform() { |
87 | 9.56M | RespondIfFailed(DoPerform()); |
88 | 9.56M | } |
89 | | |
// Responds only when `status` is not OK; the success response is sent by Complete().
90 | 10.0M | void RespondIfFailed(const Status& status) { |
91 | 10.0M | if (!status.ok()) { |
92 | 23.4k | RespondFailure(status); |
93 | 23.4k | } |
94 | 10.0M | } |
95 | | |
// Passes `status` to SetupErrorAndRespond together with the response's error field and context_.
96 | 115k | void RespondFailure(const Status& status) { |
97 | 115k | SetupErrorAndRespond(resp_->mutable_error(), status, &context_); |
98 | 115k | } |
99 | | |
100 | 9.54M | virtual ~ReadQuery() = default; |
101 | | |
102 | | private: |
// First stage of the query; see the definition for the sequence of steps.
103 | | CHECKED_STATUS DoPerform(); |
104 | | |
105 | | // Picks the read time for this query; PickReadTime wraps it and traces a failure. |
106 | | CHECKED_STATUS DoPickReadTime(server::Clock* clock); |
107 | | |
108 | | bool transactional() const; |
109 | | |
110 | | tablet::Tablet* tablet() const; |
111 | | |
112 | | ReadHybridTime FormRestartReadHybridTime(const HybridTime& restart_time) const; |
113 | | |
114 | | CHECKED_STATUS PickReadTime(server::Clock* clock); |
115 | | |
116 | | bool IsForBackfill() const; |
117 | | |
118 | | // Read implementation. If restart is required returns restart time, in case of success |
119 | | // returns invalid ReadHybridTime. Otherwise returns error status. |
120 | | Result<ReadHybridTime> DoRead(); |
121 | | Result<ReadHybridTime> DoReadImpl(); |
122 | | |
// Performs the read (retrying restarts when allowed) and sends the response.
123 | | CHECKED_STATUS Complete(); |
124 | | |
125 | | void UpdateConsistentPrefixMetrics(); |
126 | | |
127 | | // Used when we write intents during read, i.e. for serializable isolation. |
128 | | // We cannot proceed with read from completion callback, to avoid holding |
129 | | // replica state lock for too long. |
130 | | // So ThreadPool is used to proceed with read. |
131 | 222k | void Run() override { |
132 | 222k | auto status = PickReadTime(server_.Clock()); |
133 | 222k | if (status.ok()) { |
134 | 222k | status = Complete(); |
135 | 222k | } |
136 | 222k | RespondIfFailed(status); |
137 | 222k | } |
138 | | |
// Reports a non-OK `status` and drops the self reference that the write callback in DoPerform
// stored in retained_self_.
139 | 222k | void Done(const Status& status) override { |
140 | 222k | RespondIfFailed(status); |
141 | 222k | retained_self_ = nullptr; |
142 | 222k | } |
143 | | |
144 | | TabletServerIf& server_; |
145 | | ReadTabletProvider& read_tablet_provider_; |
146 | | const ReadRequestPB* req_; |
147 | | ReadResponsePB* resp_; |
148 | | rpc::RpcContext context_; |
149 | | |
// Tablet resolved in DoPerform; tablet() down-casts it to tablet::Tablet.
150 | | std::shared_ptr<tablet::AbstractTablet> abstract_tablet_; |
151 | | |
// Read time taken from the request or picked in DoPickReadTime. Complete() overwrites it with
// the value returned by DoRead().
152 | | ReadHybridTime read_time_; |
// Safe time obtained from the tablet in DoPickReadTime.
153 | | HybridTime safe_ht_to_read_; |
// Read time used by the most recent DoReadImpl call.
154 | | ReadHybridTime used_read_time_; |
// Set in DoPerform to whether the request's consistency level is STRONG.
155 | | tablet::RequireLease require_lease_ = tablet::RequireLease::kFalse; |
// Remote address of the caller (filled in DoPerform); attached to QL read ops in DoReadImpl.
156 | | HostPortPB host_port_pb_; |
// True when the request did not specify a read time (set in DoPerform).
157 | | bool allow_retry_ = false; |
158 | | RequestScope request_scope_; |
// Set by the write callback in DoPerform before the query is enqueued; reset in Done().
159 | | std::shared_ptr<ReadQuery> retained_self_; |
160 | | }; |
161 | | |
// Asks the tablet whether this request is transactional, telling it whether the request carries
// any PGSQL ops.
162 | 17.4M | bool ReadQuery::transactional() const { |
163 | 17.4M | return abstract_tablet_->IsTransactionalRequest(req_->pgsql_batch_size() > 0); |
164 | 17.4M | } |
165 | | |
// Returns abstract_tablet_ down-cast to tablet::Tablet.
// NOTE(review): only the call in DoPerform that checks metadata()->hidden() is guarded by
// !abstract_tablet_->system(); the other callers presumably rely on the tablet being a
// tablet::Tablet at that point — confirm.
166 | 11.5M | tablet::Tablet* ReadQuery::tablet() const { |
167 | 11.5M | return down_cast<tablet::Tablet*>(abstract_tablet_.get()); |
168 | 11.5M | } |
169 | | |
// Builds the read time to use for a restart. Starting from a copy of read_time_:
//   read        = min(max(restart_time, safe_ht_to_read_), read_time_.global_limit)
//   local_limit = min(safe_ht_to_read_, read_time_.global_limit)
// All other fields are kept as they are in read_time_. In debug builds restart_time is expected
// to be greater than the current read point.
170 | 1.87k | ReadHybridTime ReadQuery::FormRestartReadHybridTime(const HybridTime& restart_time) const { |
171 | 1.87k | DCHECK_GT(restart_time, read_time_.read); |
172 | 18.4E | VLOG(1) << "Restart read required at: " << restart_time << ", original: " << read_time_; |
173 | 1.87k | auto result = read_time_; |
174 | 1.87k | result.read = std::min(std::max(restart_time, safe_ht_to_read_), read_time_.global_limit); |
175 | 1.87k | result.local_limit = std::min(safe_ht_to_read_, read_time_.global_limit); |
176 | 1.87k | return result; |
177 | 1.87k | } |
178 | | |
// Calls DoPickReadTime and, when it fails, adds the failure to the trace before returning it.
179 | 9.45M | CHECKED_STATUS ReadQuery::PickReadTime(server::Clock* clock) { |
180 | 9.45M | auto result = DoPickReadTime(clock); |
181 | 9.45M | if (!result.ok()) { |
182 | 0 | TRACE(result.ToString()); |
183 | 0 | } |
184 | 9.45M | return result; |
185 | 9.45M | } |
186 | | |
// Returns true when the request has PGSQL ops and the first one is flagged is_for_backfill.
// Only the first op is inspected.
187 | 9.45M | bool ReadQuery::IsForBackfill() const { |
188 | 9.45M | if (req_->pgsql_batch_size() > 0) { |
189 | 1.81M | if (req_->pgsql_batch(0).is_for_backfill()) { |
190 | | // Currently, read requests for backfill should only come by themselves, not in batches. |
191 | 2.33k | DCHECK_EQ(req_->pgsql_batch_size(), 1); |
192 | 2.33k | return true; |
193 | 2.33k | } |
194 | 1.81M | } |
195 | | // YCQL doesn't send read RPCs for scanning the indexed table and instead directly reads using |
196 | | // iterator, so there's no equivalent logic for YCQL here. |
197 | 9.45M | return false; |
198 | 9.45M | } |
199 | | |
// First stage of the query:
//  - determines whether the request runs under serializable isolation and finds the strongest
//    row mark in the PGSQL batch (also rejecting requests with a stale catalog version),
//  - resolves the tablet: the leader peer when read intents must be written, otherwise whatever
//    read_tablet_provider_ returns for the requested consistency level,
//  - picks the read time, unless the isolation level is serializable,
//  - then either submits a WriteQuery that creates read intents (the read continues from its
//    callback) or calls Complete() directly.
// Returns a non-OK status for failures detected here; Perform() reports it to the caller.
200 | 9.55M | CHECKED_STATUS ReadQuery::DoPerform() { |
201 | 9.55M | TRACE("Start Read"); |
202 | 9.55M | TRACE_EVENT1("tserver", "TabletServiceImpl::Read", "tablet_id", req_->tablet_id()); |
203 | 9.55M | VLOG(2) << "Received Read RPC: " << req_->DebugString()2.69k ; |
204 | | // Unfortunately, determining the isolation level is not as straightforward as it seems. All but |
205 | | // the first request to a given tablet by a particular transaction assume that the tablet already |
206 | | // has the transaction metadata, including the isolation level, and those requests expect us to |
207 | | // retrieve the isolation level from that metadata. Failure to do so was the cause of a |
208 | | // serialization anomaly tested by TestOneOrTwoAdmins |
209 | | // (https://github.com/yugabyte/yugabyte-db/issues/1572). |
210 | | |
211 | 9.55M | bool serializable_isolation = false; |
212 | 9.55M | TabletPeerTablet peer_tablet; |
213 | 9.55M | if (req_->has_transaction()) { |
214 | 901k | IsolationLevel isolation_level; |
215 | 901k | if (req_->transaction().has_isolation()) { |
216 | | // This must be the first request to this tablet by this particular transaction. |
217 | 531k | isolation_level = req_->transaction().isolation(); |
218 | 531k | } else { |
219 | 370k | peer_tablet = VERIFY_RESULT(LookupTabletPeer( |
220 | 0 | server_.tablet_peer_lookup(), req_->tablet_id())); |
221 | 369k | isolation_level = VERIFY_RESULT(peer_tablet.tablet->GetIsolationLevelFromPB(*req_)); |
222 | 369k | } |
223 | 901k | serializable_isolation = isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION; |
224 | | |
225 | 901k | if (PREDICT_FALSE(FLAGS_TEST_transactional_read_delay_ms > 0)) { |
226 | 0 | LOG(INFO) << "Delaying transactional read for " |
227 | 0 | << FLAGS_TEST_transactional_read_delay_ms << " ms."; |
228 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_transactional_read_delay_ms)); |
229 | 0 | } |
230 | | |
// NOTE(review): this debug-only block refers to `req`, while the rest of this function uses the
// member `req_` and declares no local `req` — confirm it still compiles with DUMP_READ defined.
231 | | #if defined(DUMP_READ) |
232 | | if (req->pgsql_batch().size() > 0) { |
233 | | LOG(INFO) << CHECK_RESULT(FullyDecodeTransactionId(req->transaction().transaction_id())) |
234 | | << " READ: " << req->pgsql_batch(0).partition_column_values(0).value().int32_value() |
235 | | << ", " << isolation_level; |
236 | | } |
237 | | #endif |
238 | 901k | } |
239 | | |
240 | | // Get the most restrictive row mark present in the batch of PostgreSQL requests. |
241 | | // TODO: rather handle individual row marks once we start batching read requests (issue #2495) |
242 | 9.55M | RowMarkType batch_row_mark = RowMarkType::ROW_MARK_ABSENT; |
243 | 9.55M | if (!req_->pgsql_batch().empty()) { |
244 | 1.91M | uint64_t last_breaking_catalog_version = 0; // unset. |
245 | 4.36M | for (const auto& pg_req : req_->pgsql_batch()) { |
246 | | // For postgres requests check that the syscatalog version matches. |
247 | 4.36M | if (pg_req.has_ysql_catalog_version()) { |
248 | 3.01M | if (last_breaking_catalog_version == 0) { |
249 | | // Initialize last breaking version if not yet set. |
250 | 565k | server_.get_ysql_catalog_version( |
251 | 565k | nullptr /* current_version */, &last_breaking_catalog_version); |
252 | 565k | } |
253 | 3.01M | if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) { |
254 | 7 | return STATUS( |
255 | 7 | QLError, "The catalog snapshot used for this transaction has been invalidated", |
256 | 7 | TabletServerError(TabletServerErrorPB::MISMATCHED_SCHEMA)); |
257 | 7 | } |
258 | 3.01M | } |
259 | 4.36M | RowMarkType current_row_mark = GetRowMarkTypeFromPB(pg_req); |
260 | 4.36M | if (IsValidRowMarkType(current_row_mark)) { |
261 | 10.8k | if (!req_->has_transaction()) { |
262 | 0 | return STATUS( |
263 | 0 | NotSupported, "Read request with row mark types must be part of a transaction", |
264 | 0 | TabletServerError(TabletServerErrorPB::OPERATION_NOT_SUPPORTED)); |
265 | 0 | } |
266 | 10.8k | batch_row_mark = GetStrongestRowMarkType({current_row_mark, batch_row_mark}); |
267 | 10.8k | } |
268 | 4.36M | } |
269 | 1.91M | } |
270 | 9.55M | const bool has_row_mark = IsValidRowMarkType(batch_row_mark); |
271 | | |
272 | 9.55M | LeaderTabletPeer leader_peer; |
273 | | |
274 | 9.55M | if (serializable_isolation || has_row_mark9.23M ) { |
275 | | // At this point we expect that we don't have pure read serializable transactions, and |
276 | | // always write read intents to detect conflicts with other writes. |
277 | 314k | leader_peer = VERIFY_RESULT314k (314k LookupLeaderTablet( |
278 | 0 | server_.tablet_peer_lookup(), req_->tablet_id(), std::move(peer_tablet))); |
279 | | // Serializable read adds intents, i.e. writes data. |
280 | | // We should check for memory pressure in this case. |
281 | 314k | RETURN_NOT_OK(CheckWriteThrottling(req_->rejection_score(), leader_peer.peer.get())); |
282 | 314k | abstract_tablet_ = leader_peer.peer->shared_tablet(); |
283 | 9.23M | } else { |
284 | 9.23M | abstract_tablet_ = VERIFY_RESULT9.21M (9.21M read_tablet_provider_.GetTabletForRead( |
285 | 0 | req_->tablet_id(), std::move(peer_tablet.tablet_peer), |
286 | 0 | req_->consistency_level(), AllowSplitTablet::kFalse)); |
287 | 0 | leader_peer.leader_term = OpId::kUnknownTerm; |
288 | 9.21M | } |
289 | | |
// Test-only assertion: crashes when the request uses STRONG consistency or when this peer passes
// the leader check.
290 | 9.52M | if (PREDICT_FALSE(FLAGS_TEST_assert_reads_served_by_follower)) { |
291 | 4.00k | if (req_->consistency_level() == YBConsistencyLevel::STRONG) { |
292 | 0 | LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but consistency level is " |
293 | 0 | "invalid: YBConsistencyLevel::STRONG"; |
294 | 0 | } |
295 | 4.00k | tablet::TabletPeerPtr tablet_peer; |
296 | 4.00k | RETURN_NOT_OK(server_.tablet_peer_lookup()->GetTabletPeer(req_->tablet_id(), &tablet_peer)); |
297 | 4.00k | if (CheckPeerIsLeader(*tablet_peer).ok()) { |
298 | 0 | LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but read is being served by " |
299 | 0 | << " peer " << tablet_peer->permanent_uuid() |
300 | 0 | << " which is the leader for tablet " << req_->tablet_id(); |
301 | 0 | } |
302 | 4.00k | } |
303 | | |
// A non-system tablet whose metadata is marked hidden is reported as not found.
304 | 9.52M | if (!abstract_tablet_->system() && tablet()->metadata()->hidden()9.24M ) { |
305 | 0 | return STATUS(NotFound, "Tablet not found", req_->tablet_id()); |
306 | 0 | } |
307 | | |
// Test-only: when the flag is positive and RandomUniformInt(0, 10) returns a value below 2, the
// request sleeps for the configured time and fails with TimedOut.
308 | 9.52M | if (FLAGS_TEST_simulate_time_out_failures_msecs > 0 && RandomUniformInt(0, 10) < 2999 ) { |
309 | 199 | LOG(INFO) << "Marking request as timed out for test: " << req_->ShortDebugString(); |
310 | 199 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_simulate_time_out_failures_msecs)); |
311 | 199 | return STATUS(TimedOut, "timed out for test"); |
312 | 199 | } |
313 | | |
314 | 9.52M | if (server_.Clock()) { |
315 | 9.52M | server::UpdateClock(*req_, server_.Clock()); |
316 | 9.52M | } |
317 | | |
318 | | // safe_ht_to_read is used only for read restart, so if read_time is valid, then we would respond |
319 | | // with "restart required". |
320 | 9.52M | read_time_ = ReadHybridTime::FromReadTimePB(*req_); |
321 | | |
// Restarts may be retried on this server only when the request did not bring its own read time.
322 | 9.52M | allow_retry_ = !read_time_; |
323 | 9.52M | require_lease_ = tablet::RequireLease(req_->consistency_level() == YBConsistencyLevel::STRONG); |
324 | | // TODO: should check all the tables referenced by the requests to decide if it is transactional. |
325 | 9.52M | const bool transactional = this->transactional(); |
326 | | // Should not pick read time for serializable isolation, since it is picked after read intents |
327 | | // are added. Also conflict resolution for serializable isolation should be done without read time |
328 | | // specified. So we use max hybrid time for conflict resolution in such case. |
329 | | // It was implemented as part of #655. |
330 | 9.52M | if (!serializable_isolation) { |
331 | 9.23M | RETURN_NOT_OK(PickReadTime(server_.Clock())); |
332 | 9.23M | } |
333 | | |
334 | 9.52M | if (transactional) { |
335 | | // Serial number is used to check whether this operation was initiated before |
336 | | // transaction status request. So we should initialize it as soon as possible. |
337 | 1.90M | request_scope_ = RequestScope(tablet()->transaction_participant()); |
338 | 1.90M | read_time_.serial_no = request_scope_.request_id(); |
339 | 1.90M | } |
340 | | |
341 | 9.52M | const auto& remote_address = context_.remote_address(); |
342 | 9.52M | host_port_pb_.set_host(remote_address.address().to_string()); |
343 | 9.52M | host_port_pb_.set_port(remote_address.port()); |
344 | | |
// Serializable reads and reads with a row mark first write read intents through a WriteQuery.
345 | 9.52M | if (serializable_isolation || has_row_mark9.20M ) { |
346 | 314k | auto deadline = context_.GetClientDeadline(); |
347 | 314k | auto query = std::make_unique<tablet::WriteQuery>( |
348 | 314k | leader_peer.leader_term, deadline, leader_peer.peer.get(), |
349 | 314k | leader_peer.peer->tablet(), nullptr /* response */, |
350 | 314k | docdb::OperationKind::kRead); |
351 | | |
352 | 314k | auto& write = *query->operation().AllocateRequest(); |
353 | 314k | auto& write_batch = *write.mutable_write_batch(); |
354 | 314k | *write_batch.mutable_transaction() = req_->transaction(); |
355 | 314k | if (has_row_mark) { |
356 | 10.8k | write_batch.set_row_mark_type(batch_row_mark); |
357 | 10.8k | query->set_read_time(read_time_); |
358 | 10.8k | } |
359 | 314k | write.set_unused_tablet_id(""); // For backward compatibility. |
360 | 314k | write_batch.set_deprecated_may_have_metadata(true); |
361 | 314k | write.set_batch_idx(req_->batch_idx()); |
362 | | // TODO(dtxn) write request id |
363 | | |
364 | 314k | RETURN_NOT_OK(leader_peer.peer->tablet()->CreateReadIntents( |
365 | 314k | req_->transaction(), req_->subtransaction(), req_->ql_batch(), req_->pgsql_batch(), |
366 | 314k | &write_batch)); |
367 | | |
368 | 314k | query->AdjustYsqlQueryTransactionality(req_->pgsql_batch_size()); |
369 | | |
// The callback holds shared references to the peer and to this query. On failure it responds
// right away; on success the query stores a reference to itself in retained_self_ (cleared in
// Done()) and is enqueued on the peer, presumably so that Run() continues the read.
370 | 314k | query->set_callback([peer = leader_peer.peer, self = shared_from_this()](const Status& status) { |
371 | 314k | if (!status.ok()) { |
372 | 91.8k | self->RespondFailure(status); |
373 | 222k | } else { |
374 | 222k | self->retained_self_ = self; |
375 | 222k | peer->Enqueue(self.get()); |
376 | 222k | } |
377 | 314k | }); |
378 | 314k | leader_peer.peer->WriteAsync(std::move(query)); |
379 | 314k | return Status::OK(); |
380 | 314k | } |
381 | | |
// NOTE(review): abstract_tablet_ has already been dereferenced earlier in this function, so the
// null check below looks redundant — confirm.
382 | 9.21M | if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX) { |
383 | 21.9k | if (abstract_tablet_) { |
384 | 21.9k | tablet()->metrics()->consistent_prefix_read_requests->Increment(); |
385 | 21.9k | } |
386 | 21.9k | } |
387 | | |
388 | 9.21M | return Complete(); |
389 | 9.52M | } |
390 | | |
// Fills safe_ht_to_read_ and, when the request carried no read time, read_time_.
//  - No read time in the request: the tablet's safe time becomes the read point. Transactional
//    requests get global_limit from clock->MaxGlobalNow() and
//    local_limit = min(safe time, global_limit); other requests use the read point for both
//    limits.
//  - Read time given: read_time_ is left untouched and the safe time is requested from the tablet
//    for read_time_.read with the client deadline.
// `clock` is only used in the transactional branch. Returns the error from SafeTime, if any.
391 | 9.45M | CHECKED_STATUS ReadQuery::DoPickReadTime(server::Clock* clock) { |
392 | 9.45M | if (!read_time_) { |
393 | 7.90M | safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime(require_lease_)); |
394 | | // If the read time is not specified, then it is a single-shard read. |
395 | | // So we should restart it in server in case of failure. |
396 | 0 | read_time_.read = safe_ht_to_read_; |
397 | 7.90M | if (transactional()) { |
398 | 296k | read_time_.global_limit = clock->MaxGlobalNow(); |
399 | 296k | read_time_.local_limit = std::min(safe_ht_to_read_, read_time_.global_limit); |
400 | | |
401 | 296k | VLOG(1) << "Read time: " << read_time_.ToString()3 ; |
402 | 7.60M | } else { |
403 | 7.60M | read_time_.local_limit = read_time_.read; |
404 | 7.60M | read_time_.global_limit = read_time_.read; |
405 | 7.60M | } |
406 | 7.90M | } else { |
407 | 1.55M | safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime( |
408 | 1.55M | require_lease_, read_time_.read, context_.GetClientDeadline())); |
409 | 1.55M | } |
410 | 9.45M | return Status::OK(); |
411 | 9.45M | } |
412 | | |
// Performs the read and sends the response.
// Each loop iteration clears the response and the RPC sidecars and calls DoRead(). DoRead()
// returns an invalid read time on success and the restart read time otherwise. A restart is
// retried here while allow_retry_ is set and the client deadline has not passed; when retries are
// not allowed, the restart time is reported to the caller in restart_read_time instead.
// Returns a non-OK status when DoRead() fails or the deadline is exceeded; in that case no
// response is sent from here.
413 | 9.41M | CHECKED_STATUS ReadQuery::Complete() { |
414 | 9.42M | for (;;) { |
415 | 9.42M | resp_->Clear(); |
416 | 9.42M | context_.ResetRpcSidecars(); |
417 | 9.42M | VLOG(1) << "Read time: " << read_time_ << ", safe: " << safe_ht_to_read_1.88k ; |
418 | 9.42M | const auto result = VERIFY_RESULT9.41M (DoRead());9.41M |
// A restart at exactly the read time that was just used is unexpected: it is logged as DFATAL and
// retries are switched off, so the loop ends through the !allow_retry_ branch below.
419 | 9.41M | if (allow_retry_ && read_time_7.89M && read_time_ == result7.87M ) { |
420 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 5) |
421 | 0 | << __func__ << ", restarting read with the same read time: " << result << THROTTLE_MSG; |
422 | 0 | allow_retry_ = false; |
423 | 0 | } |
424 | 9.41M | read_time_ = result; |
425 | | // If read was successful, then restart time is invalid. Finishing. |
426 | | // (If a read restart was requested, then read_time would be set to the time at which we have |
427 | | // to restart.) |
428 | 9.42M | if (!read_time_9.41M ) { |
429 | | // allow_retry means that the read time was not set in the request and therefore we can |
430 | | // retry read restarts on the tablet server. |
431 | 9.42M | if (!allow_retry_) { |
432 | 1.54M | auto local_limit = std::min(safe_ht_to_read_, used_read_time_.global_limit); |
433 | 1.54M | resp_->set_local_limit_ht(local_limit.ToUint64()); |
434 | 1.54M | } |
435 | 9.42M | break; |
436 | 9.42M | } |
437 | 18.4E | if (!allow_retry_) { |
438 | | // If the read time is specified, then we read as part of a transaction. So we should restart |
439 | | // whole transaction. In this case we report restart time and abort reading. |
440 | 1.87k | resp_->Clear(); |
441 | 1.87k | auto restart_read_time = resp_->mutable_restart_read_time(); |
442 | 1.87k | restart_read_time->set_read_ht(read_time_.read.ToUint64()); |
443 | 1.87k | restart_read_time->set_deprecated_max_of_read_time_and_local_limit_ht( |
444 | 1.87k | read_time_.local_limit.ToUint64()); |
445 | 1.87k | restart_read_time->set_local_limit_ht(read_time_.local_limit.ToUint64()); |
446 | | // Global limit is ignored by caller, so we don't set it. |
447 | 1.87k | tablet()->metrics()->restart_read_requests->Increment(); |
448 | 1.87k | break; |
449 | 1.87k | } |
450 | | |
// Stop retrying once the client deadline has passed.
451 | 18.4E | if (CoarseMonoClock::now() > context_.GetClientDeadline()) { |
452 | 0 | TRACE("Read timed out"); |
453 | 0 | return STATUS(TimedOut, "Read timed out"); |
454 | 0 | } |
455 | 18.4E | } |
456 | 9.41M | if (req_->include_trace() && Trace::CurrentTrace() != nullptr0 ) { |
457 | 0 | resp_->set_trace_buffer(Trace::CurrentTrace()->DumpToString(true)); |
458 | 0 | } |
459 | | |
460 | | // In case read time was not specified (i.e. allow_retry is true) |
461 | | // we just picked a read time and we should communicate it back to the caller. |
462 | 9.41M | if (allow_retry_) { |
463 | 7.88M | used_read_time_.ToPB(resp_->mutable_used_read_time()); |
464 | 7.88M | } |
465 | | |
466 | | // Useful when debugging transactions |
// NOTE(review): this debug-only block uses `read_context`, which is not declared anywhere in the
// visible part of this file — confirm it still compiles with DUMP_READ defined.
467 | | #if defined(DUMP_READ) |
468 | | if (read_context->req->has_transaction() && read_context->req->pgsql_batch().size() == 1 && |
469 | | read_context->req->pgsql_batch()[0].partition_column_values().size() == 1 && |
470 | | read_context->resp->pgsql_batch().size() == 1 && |
471 | | read_context->resp->pgsql_batch()[0].rows_data_sidecar() == 0) { |
472 | | auto txn_id = CHECK_RESULT(FullyDecodeTransactionId( |
473 | | read_context->req->transaction().transaction_id())); |
474 | | auto value_slice = read_context->context->RpcSidecar(0).as_slice(); |
475 | | auto num = BigEndian::Load64(value_slice.data()); |
476 | | std::string result; |
477 | | if (num == 0) { |
478 | | result = "<NONE>"; |
479 | | } else if (num == 1) { |
480 | | auto len = BigEndian::Load64(value_slice.data() + 14) - 1; |
481 | | result = Slice(value_slice.data() + 22, len).ToBuffer(); |
482 | | } else { |
483 | | result = value_slice.ToDebugHexString(); |
484 | | } |
485 | | auto key = read_context->req->pgsql_batch(0).partition_column_values(0).value().int32_value(); |
486 | | LOG(INFO) << txn_id << " READ DONE: " << key << " = " << result; |
487 | | } |
488 | | #endif |
489 | | |
// context_ is moved into the completion callback here, which is invoked immediately with OK.
490 | 9.41M | MakeRpcOperationCompletionCallback<ReadResponsePB>( |
491 | 9.41M | std::move(context_), resp_, server_.Clock())(Status::OK()); |
492 | 9.41M | TRACE("Done Read"); |
493 | | |
494 | 9.41M | return Status::OK(); |
495 | 9.41M | } |
496 | | |
// Runs DoReadImpl() inside a LongOperationTracker scope and returns its result. When the read
// succeeded, the request has a transaction, and that transaction's isolation() is
// NON_TRANSACTIONAL, the transaction participant is additionally asked whether the transaction
// was aborted; a failure of that check (or of decoding the transaction id) is returned instead.
497 | 9.44M | Result<ReadHybridTime> ReadQuery::DoRead() { |
498 | 9.44M | Result<ReadHybridTime> result{ReadHybridTime()}; |
499 | 9.44M | { |
500 | 9.44M | LongOperationTracker long_operation_tracker("Read", 1s); |
501 | 9.44M | result = DoReadImpl(); |
502 | 9.44M | } |
503 | | // Check transaction is still alive in case read was successful |
504 | | // and data has been written earlier into current tablet in context of current transaction. |
505 | 9.44M | const auto* transaction = |
506 | 9.44M | req_->has_transaction() ? &req_->transaction()809k : nullptr8.63M ; |
507 | 9.44M | if (result.ok() && transaction9.44M && transaction->isolation() == NON_TRANSACTIONAL809k ) { |
508 | 369k | const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(transaction->transaction_id())); |
509 | 0 | auto& txn_participant = *tablet()->transaction_participant(); |
510 | 369k | RETURN_NOT_OK(txn_participant.CheckAborted(txn_id)); |
511 | 369k | } |
512 | 9.44M | return result; |
513 | 9.44M | } |
514 | | |
// Executes the ops carried by the request. The redis, ql and pgsql batches are checked in that
// order and only the first non-empty one is handled, because each branch returns.
// Returns a default-constructed ReadHybridTime on success, the restart read time (see
// FormRestartReadHybridTime) when an op asks for a restart, or an error status.
515 | 9.45M | Result<ReadHybridTime> ReadQuery::DoReadImpl() { |
516 | 9.45M | ReadHybridTime read_time; |
517 | 9.45M | tablet::ScopedReadOperation read_tx; |
// Backfill reads use read_time_ as is; all other reads create a ScopedReadOperation and use the
// read time it reports.
518 | 9.45M | if (IsForBackfill()) { |
519 | 2.33k | read_time = read_time_; |
520 | 9.45M | } else { |
521 | 9.45M | read_tx = VERIFY_RESULT( |
522 | 0 | tablet::ScopedReadOperation::Create(abstract_tablet_.get(), require_lease_, read_time_)); |
523 | 0 | read_time = read_tx.read_time(); |
524 | 9.45M | } |
525 | 9.45M | used_read_time_ = read_time; |
526 | 9.45M | if (!req_->redis_batch().empty()) { |
527 | | // Assert the primary table is a redis table. |
528 | 84.6k | DCHECK_EQ(abstract_tablet_->table_type(), TableType::REDIS_TABLE_TYPE); |
529 | 84.6k | auto count = req_->redis_batch_size(); |
// One status slot per op; the latch is counted down once by every op's callback.
530 | 84.6k | std::vector<Status> rets(count); |
531 | 84.6k | CountDownLatch latch(count); |
532 | 169k | for (int idx = 0; idx < count; idx++84.6k ) { |
533 | 84.6k | const RedisReadRequestPB& redis_read_req = req_->redis_batch(idx); |
534 | 84.6k | Status &failed_status_ = rets[idx]; |
535 | 84.6k | auto cb = [&latch, &failed_status_] (const Status &status) -> void { |
536 | 84.6k | if (!status.ok()) |
537 | 0 | failed_status_ = status; |
538 | 84.6k | latch.CountDown(1); |
539 | 84.6k | }; |
540 | 84.6k | auto func = Bind( |
541 | 84.6k | &HandleRedisReadRequestAsync, |
542 | 84.6k | Unretained(abstract_tablet_.get()), |
543 | 84.6k | context_.GetClientDeadline(), |
544 | 84.6k | read_time, |
545 | 84.6k | redis_read_req, |
546 | 84.6k | Unretained(resp_->add_redis_batch()), |
547 | 84.6k | cb); |
548 | | |
// With parallelize_read_ops set, every op except the last is submitted to the read pool; the last
// op, and any op whose submission failed, runs inline on this thread.
549 | 84.6k | Status s; |
550 | 84.6k | bool run_async = FLAGS_parallelize_read_ops && (idx != count - 1); |
551 | 84.6k | if (run_async) { |
552 | 8 | s = server_.tablet_manager()->read_pool()->SubmitClosure(func); |
553 | 8 | } |
554 | | |
555 | 84.6k | if (!s.ok() || !run_async) { |
556 | 84.6k | func.Run(); |
557 | 84.6k | } |
558 | 84.6k | } |
// Wait for all ops: the callbacks hold references to `latch` and to elements of `rets`.
559 | 84.6k | latch.Wait(); |
560 | 84.6k | std::vector<Status> failed; |
561 | 84.6k | for (auto& status : rets) { |
562 | 84.6k | if (!status.ok()) { |
563 | 0 | failed.push_back(status); |
564 | 0 | } |
565 | 84.6k | } |
// No failure: success. One failure: return it as is. Several: return one Combined status.
566 | 84.6k | if (failed.size() == 0) { |
567 | | // TODO(dtxn) implement read restart for Redis. |
568 | 84.6k | return ReadHybridTime(); |
569 | 84.6k | } else if (0 failed.size() == 10 ) { |
570 | 0 | return failed[0]; |
571 | 0 | } else { |
572 | 0 | return STATUS(Combined, VectorToString(failed)); |
573 | 0 | } |
574 | 84.6k | } |
575 | | |
576 | 9.36M | if (!req_->ql_batch().empty()) { |
577 | | // Assert the primary table is a YQL table. |
578 | 7.54M | DCHECK_EQ(abstract_tablet_->table_type(), TableType::YQL_TABLE_TYPE); |
// The request is modified in place below, hence the const_cast.
579 | 7.54M | ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_); |
580 | 7.54M | for (QLReadRequestPB& ql_read_req : *mutable_req->mutable_ql_batch()) { |
581 | | // Update the remote endpoint. |
// host_port_pb_ and the request's proxy_uuid are attached to the op with set_allocated_*; the
// ScopeExit releases them from the op again when the iteration ends.
582 | 7.54M | ql_read_req.set_allocated_remote_endpoint(&host_port_pb_); |
583 | 7.54M | ql_read_req.set_allocated_proxy_uuid(mutable_req->mutable_proxy_uuid()); |
584 | 7.54M | auto se = ScopeExit([&ql_read_req] { |
585 | 7.53M | ql_read_req.release_remote_endpoint(); |
586 | 7.53M | ql_read_req.release_proxy_uuid(); |
587 | 7.53M | }); |
588 | | |
589 | 7.54M | tablet::QLReadRequestResult result; |
590 | 7.54M | TRACE("Start HandleQLReadRequest"); |
591 | 7.54M | RETURN_NOT_OK(abstract_tablet_->HandleQLReadRequest( |
592 | 7.54M | context_.GetClientDeadline(), read_time, ql_read_req, req_->transaction(), &result)); |
593 | 7.54M | TRACE("Done HandleQLReadRequest"); |
594 | 7.54M | if (result.restart_read_ht.is_valid()) { |
595 | 21 | return FormRestartReadHybridTime(result.restart_read_ht); |
596 | 21 | } |
// The rows are added as an RPC sidecar; the response records the index returned for it.
597 | 7.54M | result.response.set_rows_data_sidecar( |
598 | 7.54M | narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data))); |
599 | 7.54M | resp_->add_ql_batch()->Swap(&result.response); |
600 | 7.54M | } |
601 | 7.54M | return ReadHybridTime(); |
602 | 7.54M | } |
603 | | |
604 | 1.82M | if (!req_->pgsql_batch().empty()) { |
605 | 1.81M | ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_); |
606 | 1.81M | size_t total_num_rows_read = 0; |
607 | 3.47M | for (PgsqlReadRequestPB& pgsql_read_req : *mutable_req->mutable_pgsql_batch()) { |
608 | 3.47M | tablet::PgsqlReadRequestResult result; |
609 | 3.47M | TRACE("Start HandlePgsqlReadRequest"); |
610 | 3.47M | size_t num_rows_read; |
611 | 3.47M | RETURN_NOT_OK(abstract_tablet_->HandlePgsqlReadRequest( |
612 | 3.47M | context_.GetClientDeadline(), read_time, |
613 | 3.47M | !allow_retry_ /* is_explicit_request_read_time */, pgsql_read_req, req_->transaction(), |
614 | 3.47M | req_->subtransaction(), &result, &num_rows_read)); |
615 | | |
616 | 3.47M | total_num_rows_read += num_rows_read; |
617 | | |
618 | 3.47M | TRACE("Done HandlePgsqlReadRequest"); |
619 | 3.47M | if (result.restart_read_ht.is_valid()) { |
620 | 1.85k | return FormRestartReadHybridTime(result.restart_read_ht); |
621 | 1.85k | } |
622 | 3.47M | result.response.set_rows_data_sidecar( |
623 | 3.47M | narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data))); |
624 | 3.47M | resp_->add_pgsql_batch()->Swap(&result.response); |
625 | 3.47M | } |
626 | | |
// The consistent-prefix row metric is only updated when at least one row was read.
627 | 1.81M | if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX && |
628 | 1.81M | total_num_rows_read > 093 ) { |
629 | 84 | tablet()->metrics()->pgsql_consistent_prefix_read_rows->IncrementBy(total_num_rows_read); |
630 | 84 | } |
631 | 1.81M | return ReadHybridTime(); |
632 | 1.81M | } |
633 | | |
// The request carried no ops at all.
634 | 4.05k | if (abstract_tablet_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
635 | 0 | return STATUS(NotSupported, "Transaction status table does not support read"); |
636 | 0 | } |
637 | | |
638 | 4.05k | return ReadHybridTime(); |
639 | 4.05k | } |
640 | | |
641 | | } // namespace |
642 | | |
// Entry point for serving a read: creates a ReadQuery for the request and runs it.
// The query is held in a shared_ptr because ReadQuery::DoPerform calls shared_from_this() to keep
// the query alive from the write callback.
643 | | void PerformRead( |
644 | | TabletServerIf* server, ReadTabletProvider* read_tablet_provider, |
645 | 9.55M | const ReadRequestPB* req, ReadResponsePB* resp, rpc::RpcContext context) { |
646 | 9.55M | auto read_query = std::make_shared<ReadQuery>( |
647 | 9.55M | server, read_tablet_provider, req, resp, std::move(context)); |
648 | 9.55M | read_query->Perform(); |
649 | 9.55M | } |
650 | | |
651 | | } // namespace tserver |
652 | | } // namespace yb |