YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/read_query.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/read_query.h"
15
16
#include "yb/common/row_mark.h"
17
#include "yb/common/transaction.h"
18
19
#include "yb/gutil/bind.h"
20
21
#include "yb/tablet/operations/write_operation.h"
22
#include "yb/tablet/read_result.h"
23
#include "yb/tablet/tablet.h"
24
#include "yb/tablet/tablet_metadata.h"
25
#include "yb/tablet/tablet_metrics.h"
26
#include "yb/tablet/transaction_participant.h"
27
#include "yb/tablet/write_query.h"
28
29
#include "yb/tserver/service_util.h"
30
#include "yb/tserver/tablet_server_interface.h"
31
#include "yb/tserver/ts_tablet_manager.h"
32
#include "yb/tserver/tserver.pb.h"
33
34
#include "yb/util/countdown_latch.h"
35
#include "yb/util/debug/trace_event.h"
36
#include "yb/util/flag_tags.h"
37
#include "yb/util/metrics.h"
38
#include "yb/util/scope_exit.h"
39
#include "yb/util/trace.h"
40
41
using namespace std::literals;
42
43
DEFINE_test_flag(int32, transactional_read_delay_ms, 0,
44
                 "Amount of time to delay between transaction status check and reading start.");
45
46
DEFINE_test_flag(int32, simulate_time_out_failures_msecs, 0, "If greater than 0, we will randomly "
47
                 "mark read requests as timed out and sleep for the specificed amount of time by "
48
                 "this flag to simulate time out failures. The requester will mark the timed out "
49
                 "replica as failed, and its periodic refresh mechanism for the lookup cache will "
50
                 "mark them as available.");
51
52
DEFINE_test_flag(bool, assert_reads_served_by_follower, false, "If set, we verify that the "
53
                 "consistency level is CONSISTENT_PREFIX, and that this server is not the leader "
54
                 "for the tablet");
55
56
DEFINE_bool(parallelize_read_ops, true,
57
            "Controls whether multiple (Redis) read ops that are present in a operation "
58
            "should be executed in parallel.");
59
TAG_FLAG(parallelize_read_ops, advanced);
60
TAG_FLAG(parallelize_read_ops, runtime);
61
62
namespace yb {
63
namespace tserver {
64
65
namespace {
66
67
void HandleRedisReadRequestAsync(
68
    tablet::AbstractTablet* tablet,
69
    CoarseTimePoint deadline,
70
    const ReadHybridTime& read_time,
71
    const RedisReadRequestPB& redis_read_request,
72
    RedisResponsePB* response,
73
    const std::function<void(const Status& s)>& status_cb
74
84.6k
) {
75
84.6k
  status_cb(tablet->HandleRedisReadRequest(deadline, read_time, redis_read_request, response));
76
84.6k
}
77
78
class ReadQuery : public std::enable_shared_from_this<ReadQuery>, public rpc::ThreadPoolTask {
79
 public:
80
  ReadQuery(
81
      TabletServerIf* server, ReadTabletProvider* read_tablet_provider, const ReadRequestPB* req,
82
      ReadResponsePB* resp, rpc::RpcContext context)
83
      : server_(*server), read_tablet_provider_(*read_tablet_provider), req_(req), resp_(resp),
84
9.56M
        context_(std::move(context)) {}
85
86
9.56M
  void Perform() {
87
9.56M
    RespondIfFailed(DoPerform());
88
9.56M
  }
89
90
10.0M
  void RespondIfFailed(const Status& status) {
91
10.0M
    if (!status.ok()) {
92
23.4k
      RespondFailure(status);
93
23.4k
    }
94
10.0M
  }
95
96
115k
  void RespondFailure(const Status& status) {
97
115k
    SetupErrorAndRespond(resp_->mutable_error(), status, &context_);
98
115k
  }
99
100
9.54M
  virtual ~ReadQuery() = default;
101
102
 private:
103
  CHECKED_STATUS DoPerform();
104
105
  // Picks read based for specified read context.
106
  CHECKED_STATUS DoPickReadTime(server::Clock* clock);
107
108
  bool transactional() const;
109
110
  tablet::Tablet* tablet() const;
111
112
  ReadHybridTime FormRestartReadHybridTime(const HybridTime& restart_time) const;
113
114
  CHECKED_STATUS PickReadTime(server::Clock* clock);
115
116
  bool IsForBackfill() const;
117
118
  // Read implementation. If restart is required returns restart time, in case of success
119
  // returns invalid ReadHybridTime. Otherwise returns error status.
120
  Result<ReadHybridTime> DoRead();
121
  Result<ReadHybridTime> DoReadImpl();
122
123
  CHECKED_STATUS Complete();
124
125
  void UpdateConsistentPrefixMetrics();
126
127
  // Used when we write intents during read, i.e. for serializable isolation.
128
  // We cannot proceed with read from completion callback, to avoid holding
129
  // replica state lock for too long.
130
  // So ThreadPool is used to proceed with read.
131
222k
  void Run() override {
132
222k
    auto status = PickReadTime(server_.Clock());
133
222k
    if (status.ok()) {
134
222k
      status = Complete();
135
222k
    }
136
222k
    RespondIfFailed(status);
137
222k
  }
138
139
222k
  void Done(const Status& status) override {
140
222k
    RespondIfFailed(status);
141
222k
    retained_self_ = nullptr;
142
222k
  }
143
144
  TabletServerIf& server_;
145
  ReadTabletProvider& read_tablet_provider_;
146
  const ReadRequestPB* req_;
147
  ReadResponsePB* resp_;
148
  rpc::RpcContext context_;
149
150
  std::shared_ptr<tablet::AbstractTablet> abstract_tablet_;
151
152
  ReadHybridTime read_time_;
153
  HybridTime safe_ht_to_read_;
154
  ReadHybridTime used_read_time_;
155
  tablet::RequireLease require_lease_ = tablet::RequireLease::kFalse;
156
  HostPortPB host_port_pb_;
157
  bool allow_retry_ = false;
158
  RequestScope request_scope_;
159
  std::shared_ptr<ReadQuery> retained_self_;
160
};
161
162
17.4M
bool ReadQuery::transactional() const {
163
17.4M
  return abstract_tablet_->IsTransactionalRequest(req_->pgsql_batch_size() > 0);
164
17.4M
}
165
166
11.5M
// Returns abstract_tablet_ as a concrete tablet::Tablet. The conversion is a down_cast, so
// callers must only use this when the tablet really is a tablet::Tablet (DoPerform guards the
// hidden-tablet check with !system(), for example).
tablet::Tablet* ReadQuery::tablet() const {
  tablet::AbstractTablet* const base_tablet = abstract_tablet_.get();
  return down_cast<tablet::Tablet*>(base_tablet);
}
169
170
1.87k
// Builds the read time for a restarted read, starting from the current read_time_:
//  - read: restart_time raised to at least safe_ht_to_read_, capped at the original global limit;
//  - local_limit: safe_ht_to_read_ capped at the original global limit.
// All other fields (including global_limit) are carried over unchanged.
ReadHybridTime ReadQuery::FormRestartReadHybridTime(const HybridTime& restart_time) const {
  DCHECK_GT(restart_time, read_time_.read);
  VLOG(1) << "Restart read required at: " << restart_time << ", original: " << read_time_;
  const HybridTime& global_limit = read_time_.global_limit;
  const HybridTime not_below_safe = std::max(restart_time, safe_ht_to_read_);
  ReadHybridTime restarted = read_time_;
  restarted.read = std::min(not_below_safe, global_limit);
  restarted.local_limit = std::min(safe_ht_to_read_, global_limit);
  return restarted;
}
178
179
9.45M
// Picks the read time via DoPickReadTime and records any failure in the current trace.
CHECKED_STATUS ReadQuery::PickReadTime(server::Clock* clock) {
  auto result = DoPickReadTime(clock);
  if (!result.ok()) {
    // Pass the status text as an argument rather than as the format string. TRACE's first
    // argument is a substitution format (as in the other TRACE calls in this file), and a status
    // message containing "$" must not be interpreted as a placeholder.
    TRACE("$0", result.ToString());
  }
  return result;
}
186
187
9.45M
bool ReadQuery::IsForBackfill() const {
188
9.45M
  if (req_->pgsql_batch_size() > 0) {
189
1.81M
    if (req_->pgsql_batch(0).is_for_backfill()) {
190
      // Currently, read requests for backfill should only come by themselves, not in batches.
191
2.33k
      DCHECK_EQ(req_->pgsql_batch_size(), 1);
192
2.33k
      return true;
193
2.33k
    }
194
1.81M
  }
195
  // YCQL doesn't send read RPCs for scanning the indexed table and instead directly reads using
196
  // iterator, so there's no equivalent logic for YCQL here.
197
9.45M
  return false;
198
9.45M
}
199
200
9.55M
// Validates the read request, resolves the tablet to read from, picks the read time and either:
//  - for serializable isolation or row-locking (row mark) reads: writes read intents
//    asynchronously through the leader, and continues the read in Run() once the write succeeds
//    (returns OK immediately);
//  - otherwise: completes the read inline via Complete().
// Returns an error status when the request cannot be started; the caller responds with it.
CHECKED_STATUS ReadQuery::DoPerform() {
  TRACE("Start Read");
  TRACE_EVENT1("tserver", "TabletServiceImpl::Read", "tablet_id", req_->tablet_id());
  VLOG(2) << "Received Read RPC: " << req_->DebugString();
  // Unfortunately, determining the isolation level is not as straightforward as it seems. All but
  // the first request to a given tablet by a particular transaction assume that the tablet already
  // has the transaction metadata, including the isolation level, and those requests expect us to
  // retrieve the isolation level from that metadata. Failure to do so was the cause of a
  // serialization anomaly tested by TestOneOrTwoAdmins
  // (https://github.com/yugabyte/yugabyte-db/issues/1572).

  bool serializable_isolation = false;
  TabletPeerTablet peer_tablet;
  if (req_->has_transaction()) {
    IsolationLevel isolation_level;
    if (req_->transaction().has_isolation()) {
      // This must be the first request to this tablet by this particular transaction.
      isolation_level = req_->transaction().isolation();
    } else {
      peer_tablet = VERIFY_RESULT(LookupTabletPeer(
          server_.tablet_peer_lookup(), req_->tablet_id()));
      isolation_level = VERIFY_RESULT(peer_tablet.tablet->GetIsolationLevelFromPB(*req_));
    }
    serializable_isolation = isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION;

    if (PREDICT_FALSE(FLAGS_TEST_transactional_read_delay_ms > 0)) {
      LOG(INFO) << "Delaying transactional read for "
                << FLAGS_TEST_transactional_read_delay_ms << " ms.";
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_transactional_read_delay_ms));
    }

#if defined(DUMP_READ)
    // Debug-only dump; uses the req_ member (the previous `req` name does not exist here).
    if (req_->pgsql_batch().size() > 0) {
      LOG(INFO) << CHECK_RESULT(FullyDecodeTransactionId(req_->transaction().transaction_id()))
                << " READ: "
                << req_->pgsql_batch(0).partition_column_values(0).value().int32_value()
                << ", " << isolation_level;
    }
#endif
  }

  // Get the most restrictive row mark present in the batch of PostgreSQL requests.
  // TODO: rather handle individual row marks once we start batching read requests (issue #2495)
  RowMarkType batch_row_mark = RowMarkType::ROW_MARK_ABSENT;
  if (!req_->pgsql_batch().empty()) {
    uint64_t last_breaking_catalog_version = 0; // unset.
    for (const auto& pg_req : req_->pgsql_batch()) {
      // For postgres requests check that the syscatalog version matches.
      if (pg_req.has_ysql_catalog_version()) {
        if (last_breaking_catalog_version == 0) {
          // Initialize last breaking version if not yet set.
          server_.get_ysql_catalog_version(
              nullptr /* current_version */, &last_breaking_catalog_version);
        }
        if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) {
          return STATUS(
              QLError, "The catalog snapshot used for this transaction has been invalidated",
              TabletServerError(TabletServerErrorPB::MISMATCHED_SCHEMA));
        }
      }
      RowMarkType current_row_mark = GetRowMarkTypeFromPB(pg_req);
      if (IsValidRowMarkType(current_row_mark)) {
        if (!req_->has_transaction()) {
          return STATUS(
              NotSupported, "Read request with row mark types must be part of a transaction",
              TabletServerError(TabletServerErrorPB::OPERATION_NOT_SUPPORTED));
        }
        batch_row_mark = GetStrongestRowMarkType({current_row_mark, batch_row_mark});
      }
    }
  }
  const bool has_row_mark = IsValidRowMarkType(batch_row_mark);

  LeaderTabletPeer leader_peer;

  if (serializable_isolation || has_row_mark) {
    // At this point we expect that we don't have pure read serializable transactions, and
    // always write read intents to detect conflicts with other writes.
    leader_peer = VERIFY_RESULT(LookupLeaderTablet(
        server_.tablet_peer_lookup(), req_->tablet_id(), std::move(peer_tablet)));
    // Serializable read adds intents, i.e. writes data.
    // We should check for memory pressure in this case.
    RETURN_NOT_OK(CheckWriteThrottling(req_->rejection_score(), leader_peer.peer.get()));
    abstract_tablet_ = leader_peer.peer->shared_tablet();
  } else {
    abstract_tablet_ = VERIFY_RESULT(read_tablet_provider_.GetTabletForRead(
        req_->tablet_id(), std::move(peer_tablet.tablet_peer),
        req_->consistency_level(), AllowSplitTablet::kFalse));
    leader_peer.leader_term = OpId::kUnknownTerm;
  }

  if (PREDICT_FALSE(FLAGS_TEST_assert_reads_served_by_follower)) {
    if (req_->consistency_level() == YBConsistencyLevel::STRONG) {
      LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but consistency level is "
                    "invalid: YBConsistencyLevel::STRONG";
    }
    tablet::TabletPeerPtr tablet_peer;
    RETURN_NOT_OK(server_.tablet_peer_lookup()->GetTabletPeer(req_->tablet_id(), &tablet_peer));
    if (CheckPeerIsLeader(*tablet_peer).ok()) {
      LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but read is being served by "
                 << " peer " << tablet_peer->permanent_uuid()
                 << " which is the leader for tablet " << req_->tablet_id();
    }
  }

  // Only non-system tablets are tablet::Tablet instances whose metadata can be inspected.
  if (!abstract_tablet_->system() && tablet()->metadata()->hidden()) {
    return STATUS(NotFound, "Tablet not found", req_->tablet_id());
  }

  if (FLAGS_TEST_simulate_time_out_failures_msecs > 0 && RandomUniformInt(0, 10) < 2) {
    LOG(INFO) << "Marking request as timed out for test: " << req_->ShortDebugString();
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_simulate_time_out_failures_msecs));
    return STATUS(TimedOut, "timed out for test");
  }

  if (server_.Clock()) {
    server::UpdateClock(*req_, server_.Clock());
  }

  // safe_ht_to_read is used only for read restart, so if read_time is valid, then we would respond
  // with "restart required".
  read_time_ = ReadHybridTime::FromReadTimePB(*req_);

  allow_retry_ = !read_time_;
  require_lease_ = tablet::RequireLease(req_->consistency_level() == YBConsistencyLevel::STRONG);
  // TODO: should check all the tables referenced by the requests to decide if it is transactional.
  const bool transactional = this->transactional();
  // Should not pick read time for serializable isolation, since it is picked after read intents
  // are added. Also conflict resolution for serializable isolation should be done without read time
  // specified. So we use max hybrid time for conflict resolution in such case.
  // It was implemented as part of #655.
  if (!serializable_isolation) {
    RETURN_NOT_OK(PickReadTime(server_.Clock()));
  }

  if (transactional) {
    // Serial number is used to check whether this operation was initiated before
    // transaction status request. So we should initialize it as soon as possible.
    request_scope_ = RequestScope(tablet()->transaction_participant());
    read_time_.serial_no = request_scope_.request_id();
  }

  const auto& remote_address = context_.remote_address();
  host_port_pb_.set_host(remote_address.address().to_string());
  host_port_pb_.set_port(remote_address.port());

  if (serializable_isolation || has_row_mark) {
    auto deadline = context_.GetClientDeadline();
    auto query = std::make_unique<tablet::WriteQuery>(
        leader_peer.leader_term, deadline, leader_peer.peer.get(),
        leader_peer.peer->tablet(), nullptr /* response */,
        docdb::OperationKind::kRead);

    auto& write = *query->operation().AllocateRequest();
    auto& write_batch = *write.mutable_write_batch();
    *write_batch.mutable_transaction() = req_->transaction();
    if (has_row_mark) {
      write_batch.set_row_mark_type(batch_row_mark);
      query->set_read_time(read_time_);
    }
    write.set_unused_tablet_id(""); // For backward compatibility.
    write_batch.set_deprecated_may_have_metadata(true);
    write.set_batch_idx(req_->batch_idx());
    // TODO(dtxn) write request id

    RETURN_NOT_OK(leader_peer.peer->tablet()->CreateReadIntents(
        req_->transaction(), req_->subtransaction(), req_->ql_batch(), req_->pgsql_batch(),
        &write_batch));

    query->AdjustYsqlQueryTransactionality(req_->pgsql_batch_size());

    // The callback holds `self`, keeping this query alive until the intent write finishes. On
    // success, the read continues on the peer's thread pool (Run()); retained_self_ keeps the
    // query alive while enqueued and is cleared in Done().
    query->set_callback([peer = leader_peer.peer, self = shared_from_this()](const Status& status) {
      if (!status.ok()) {
        self->RespondFailure(status);
      } else {
        self->retained_self_ = self;
        peer->Enqueue(self.get());
      }
    });
    leader_peer.peer->WriteAsync(std::move(query));
    return Status::OK();
  }

  // abstract_tablet_ is guaranteed non-null here: it was already dereferenced above.
  if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX) {
    tablet()->metrics()->consistent_prefix_read_requests->Increment();
  }

  return Complete();
}
390
391
9.45M
// Establishes read_time_ and safe_ht_to_read_ for this request.
//  - Read time supplied by the client: only the tablet safe time for that read point is fetched
//    (bounded by the client deadline); read_time_ is left as is.
//  - No read time supplied: the tablet safe time becomes the read point. Transactional reads take
//    their global limit from `clock` and cap local_limit by it. Non-transactional reads use the
//    read point for both limits.
CHECKED_STATUS ReadQuery::DoPickReadTime(server::Clock* clock) {
  if (read_time_) {
    safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime(
        require_lease_, read_time_.read, context_.GetClientDeadline()));
    return Status::OK();
  }

  // If the read time is not specified, then it is a single-shard read.
  // So we should restart it in server in case of failure.
  safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime(require_lease_));
  read_time_.read = safe_ht_to_read_;

  if (!transactional()) {
    read_time_.local_limit = read_time_.read;
    read_time_.global_limit = read_time_.read;
    return Status::OK();
  }

  read_time_.global_limit = clock->MaxGlobalNow();
  read_time_.local_limit = std::min(safe_ht_to_read_, read_time_.global_limit);
  VLOG(1) << "Read time: " << read_time_.ToString();
  return Status::OK();
}
412
413
9.41M
// Runs the read and sends the successful response.
//
// When the client did not specify a read time (allow_retry_), read restarts are retried here at
// the restart time until the read succeeds or the client deadline passes. Otherwise the restart
// time is reported back in resp_->restart_read_time() so the caller can restart the whole
// transaction.
CHECKED_STATUS ReadQuery::Complete() {
  for (;;) {
    resp_->Clear();
    context_.ResetRpcSidecars();
    VLOG(1) << "Read time: " << read_time_ << ", safe: " << safe_ht_to_read_;
    const auto result = VERIFY_RESULT(DoRead());
    if (allow_retry_ && read_time_ && read_time_ == result) {
      // Retrying at an unchanged read time would loop forever; fall back to reporting the
      // restart to the client instead.
      YB_LOG_EVERY_N_SECS(DFATAL, 5)
          << __func__ << ", restarting read with the same read time: " << result << THROTTLE_MSG;
      allow_retry_ = false;
    }
    read_time_ = result;
    // If read was successful, then restart time is invalid. Finishing.
    // (If a read restart was requested, then read_time would be set to the time at which we have
    // to restart.)
    if (!read_time_) {
      // allow_retry means that the read time was not set in the request and therefore we can
      // retry read restarts on the tablet server.
      if (!allow_retry_) {
        auto local_limit = std::min(safe_ht_to_read_, used_read_time_.global_limit);
        resp_->set_local_limit_ht(local_limit.ToUint64());
      }
      break;
    }
    if (!allow_retry_) {
      // If the read time is specified, then we read as part of a transaction. So we should restart
      // whole transaction. In this case we report restart time and abort reading.
      resp_->Clear();
      auto restart_read_time = resp_->mutable_restart_read_time();
      restart_read_time->set_read_ht(read_time_.read.ToUint64());
      restart_read_time->set_deprecated_max_of_read_time_and_local_limit_ht(
          read_time_.local_limit.ToUint64());
      restart_read_time->set_local_limit_ht(read_time_.local_limit.ToUint64());
      // Global limit is ignored by caller, so we don't set it.
      tablet()->metrics()->restart_read_requests->Increment();
      break;
    }

    if (CoarseMonoClock::now() > context_.GetClientDeadline()) {
      TRACE("Read timed out");
      return STATUS(TimedOut, "Read timed out");
    }
  }
  if (req_->include_trace() && Trace::CurrentTrace() != nullptr) {
    resp_->set_trace_buffer(Trace::CurrentTrace()->DumpToString(true));
  }

  // In case read time was not specified (i.e. allow_retry is true)
  // we just picked a read time and we should communicate it back to the caller.
  if (allow_retry_) {
    used_read_time_.ToPB(resp_->mutable_used_read_time());
  }

  // Useful when debugging transactions.
  // Uses this class's req_/resp_/context_ members; the old read_context->... names no longer
  // exist. NOTE(review): RpcSidecar(0) on rpc::RpcContext is carried over from the old code and
  // is unverified here — confirm before enabling DUMP_READ.
#if defined(DUMP_READ)
  if (req_->has_transaction() && req_->pgsql_batch().size() == 1 &&
      req_->pgsql_batch()[0].partition_column_values().size() == 1 &&
      resp_->pgsql_batch().size() == 1 &&
      resp_->pgsql_batch()[0].rows_data_sidecar() == 0) {
    auto txn_id = CHECK_RESULT(FullyDecodeTransactionId(req_->transaction().transaction_id()));
    auto value_slice = context_.RpcSidecar(0).as_slice();
    auto num = BigEndian::Load64(value_slice.data());
    std::string result;
    if (num == 0) {
      result = "<NONE>";
    } else if (num == 1) {
      auto len = BigEndian::Load64(value_slice.data() + 14) - 1;
      result = Slice(value_slice.data() + 22, len).ToBuffer();
    } else {
      result = value_slice.ToDebugHexString();
    }
    auto key = req_->pgsql_batch(0).partition_column_values(0).value().int32_value();
    LOG(INFO) << txn_id << " READ DONE: " << key << " = " << result;
  }
#endif

  MakeRpcOperationCompletionCallback<ReadResponsePB>(
      std::move(context_), resp_, server_.Clock())(Status::OK());
  TRACE("Done Read");

  return Status::OK();
}
496
497
9.44M
// Runs DoReadImpl under a LongOperationTracker with a 1s threshold.
//
// After a successful read, if the request belongs to a transaction whose isolation field is
// NON_TRANSACTIONAL, the transaction participant is asked whether that transaction has been
// aborted. NOTE(review): NON_TRANSACTIONAL here presumably means the isolation field was not
// set, i.e. not the transaction's first request to this tablet — confirm.
Result<ReadHybridTime> ReadQuery::DoRead() {
  Result<ReadHybridTime> result{ReadHybridTime()};
  {
    LongOperationTracker long_operation_tracker("Read", 1s);
    result = DoReadImpl();
  }

  if (!result.ok() || !req_->has_transaction()) {
    return result;
  }

  // Check transaction is still alive in case read was successful
  // and data has been written earlier into current tablet in context of current transaction.
  const auto& txn_metadata = req_->transaction();
  if (txn_metadata.isolation() != NON_TRANSACTIONAL) {
    return result;
  }
  const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(txn_metadata.transaction_id()));
  auto& participant = *tablet()->transaction_participant();
  RETURN_NOT_OK(participant.CheckAborted(txn_id));
  return result;
}
514
515
9.45M
// Performs the read at the chosen read time and fills resp_. Serves the first non-empty batch in
// this order: Redis, QL (YCQL), pgsql (YSQL). Returns:
//  - an invalid ReadHybridTime on success,
//  - a restart read time when a QL/pgsql handler requests a read restart,
//  - an error status otherwise.
Result<ReadHybridTime> ReadQuery::DoReadImpl() {
  ReadHybridTime read_time;
  tablet::ScopedReadOperation read_tx;
  if (IsForBackfill()) {
    // Backfill reads use read_time_ directly, without a ScopedReadOperation.
    read_time = read_time_;
  } else {
    read_tx = VERIFY_RESULT(
        tablet::ScopedReadOperation::Create(abstract_tablet_.get(), require_lease_, read_time_));
    read_time = read_tx.read_time();
  }
  used_read_time_ = read_time;
  if (!req_->redis_batch().empty()) {
    // Assert the primary table is a redis table.
    DCHECK_EQ(abstract_tablet_->table_type(), TableType::REDIS_TABLE_TYPE);
    auto count = req_->redis_batch_size();
    std::vector<Status> rets(count);
    CountDownLatch latch(count);
    for (int idx = 0; idx < count; idx++) {
      const RedisReadRequestPB& redis_read_req = req_->redis_batch(idx);
      Status& failed_status = rets[idx];
      auto cb = [&latch, &failed_status](const Status& status) -> void {
        if (!status.ok()) {
          failed_status = status;
        }
        latch.CountDown(1);
      };
      auto func = Bind(
          &HandleRedisReadRequestAsync,
          Unretained(abstract_tablet_.get()),
          context_.GetClientDeadline(),
          read_time,
          redis_read_req,
          Unretained(resp_->add_redis_batch()),
          cb);

      // The last op always runs inline on this thread. Earlier ops go to the read pool when
      // parallelization is enabled, falling back to inline execution if submission fails.
      Status s;
      bool run_async = FLAGS_parallelize_read_ops && (idx != count - 1);
      if (run_async) {
        s = server_.tablet_manager()->read_pool()->SubmitClosure(func);
      }

      if (!s.ok() || !run_async) {
        func.Run();
      }
    }
    // The callbacks reference `latch` and `rets`, so wait for all of them before reading `rets`
    // or leaving this scope.
    latch.Wait();
    std::vector<Status> failed;
    for (const auto& status : rets) {
      if (!status.ok()) {
        failed.push_back(status);
      }
    }
    if (failed.empty()) {
      // TODO(dtxn) implement read restart for Redis.
      return ReadHybridTime();
    } else if (failed.size() == 1) {
      return failed[0];
    } else {
      return STATUS(Combined, VectorToString(failed));
    }
  }

  if (!req_->ql_batch().empty()) {
    // Assert the primary table is a YQL table.
    DCHECK_EQ(abstract_tablet_->table_type(), TableType::YQL_TABLE_TYPE);
    ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_);
    for (QLReadRequestPB& ql_read_req : *mutable_req->mutable_ql_batch()) {
      // Update the remote endpoint.
      // The sub-request temporarily borrows host_port_pb_ and the request's proxy_uuid via
      // set_allocated_*. The ScopeExit releases them so the sub-request never frees memory it
      // does not own.
      ql_read_req.set_allocated_remote_endpoint(&host_port_pb_);
      ql_read_req.set_allocated_proxy_uuid(mutable_req->mutable_proxy_uuid());
      auto se = ScopeExit([&ql_read_req] {
        ql_read_req.release_remote_endpoint();
        ql_read_req.release_proxy_uuid();
      });

      tablet::QLReadRequestResult result;
      TRACE("Start HandleQLReadRequest");
      RETURN_NOT_OK(abstract_tablet_->HandleQLReadRequest(
          context_.GetClientDeadline(), read_time, ql_read_req, req_->transaction(), &result));
      TRACE("Done HandleQLReadRequest");
      if (result.restart_read_ht.is_valid()) {
        return FormRestartReadHybridTime(result.restart_read_ht);
      }
      result.response.set_rows_data_sidecar(
          narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data)));
      resp_->add_ql_batch()->Swap(&result.response);
    }
    return ReadHybridTime();
  }

  if (!req_->pgsql_batch().empty()) {
    ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_);
    size_t total_num_rows_read = 0;
    for (PgsqlReadRequestPB& pgsql_read_req : *mutable_req->mutable_pgsql_batch()) {
      tablet::PgsqlReadRequestResult result;
      TRACE("Start HandlePgsqlReadRequest");
      // Initialized so the total below never reads an indeterminate value, even if the handler
      // succeeds without writing the count.
      size_t num_rows_read = 0;
      RETURN_NOT_OK(abstract_tablet_->HandlePgsqlReadRequest(
          context_.GetClientDeadline(), read_time,
          !allow_retry_ /* is_explicit_request_read_time */, pgsql_read_req, req_->transaction(),
          req_->subtransaction(), &result, &num_rows_read));

      total_num_rows_read += num_rows_read;

      TRACE("Done HandlePgsqlReadRequest");
      if (result.restart_read_ht.is_valid()) {
        return FormRestartReadHybridTime(result.restart_read_ht);
      }
      result.response.set_rows_data_sidecar(
          narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data)));
      resp_->add_pgsql_batch()->Swap(&result.response);
    }

    if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX &&
        total_num_rows_read > 0) {
      tablet()->metrics()->pgsql_consistent_prefix_read_rows->IncrementBy(total_num_rows_read);
    }
    return ReadHybridTime();
  }

  if (abstract_tablet_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    return STATUS(NotSupported, "Transaction status table does not support read");
  }

  return ReadHybridTime();
}
640
641
} // namespace
642
643
void PerformRead(
644
    TabletServerIf* server, ReadTabletProvider* read_tablet_provider,
645
9.55M
    const ReadRequestPB* req, ReadResponsePB* resp, rpc::RpcContext context) {
646
9.55M
  auto read_query = std::make_shared<ReadQuery>(
647
9.55M
      server, read_tablet_provider, req, resp, std::move(context));
648
9.55M
  read_query->Perform();
649
9.55M
}
650
651
}  // namespace tserver
652
}  // namespace yb