YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/read_query.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/read_query.h"
15
16
#include "yb/common/row_mark.h"
17
#include "yb/common/transaction.h"
18
19
#include "yb/gutil/bind.h"
20
21
#include "yb/tablet/operations/write_operation.h"
22
#include "yb/tablet/read_result.h"
23
#include "yb/tablet/tablet.h"
24
#include "yb/tablet/tablet_metadata.h"
25
#include "yb/tablet/tablet_metrics.h"
26
#include "yb/tablet/transaction_participant.h"
27
#include "yb/tablet/write_query.h"
28
29
#include "yb/tserver/service_util.h"
30
#include "yb/tserver/tablet_server_interface.h"
31
#include "yb/tserver/ts_tablet_manager.h"
32
#include "yb/tserver/tserver.pb.h"
33
34
#include "yb/util/countdown_latch.h"
35
#include "yb/util/debug/trace_event.h"
36
#include "yb/util/flag_tags.h"
37
#include "yb/util/metrics.h"
38
#include "yb/util/scope_exit.h"
39
#include "yb/util/trace.h"
40
41
using namespace std::literals;
42
43
// Test-only flag. When positive, DoPerform() sleeps for this many milliseconds while handling a
// read request that carries a transaction, after the isolation level has been determined.
DEFINE_test_flag(int32, transactional_read_delay_ms, 0,
44
                 "Amount of time to delay between transaction status check and reading start.");
45
46
// Test-only flag. When positive, DoPerform() picks a random subset of requests
// (RandomUniformInt(0, 10) < 2), sleeps for this many milliseconds and fails them with TimedOut.
DEFINE_test_flag(int32, simulate_time_out_failures_msecs, 0, "If greater than 0, we will randomly "
47
                 "mark read requests as timed out and sleep for the specificed amount of time by "
48
                 "this flag to simulate time out failures. The requester will mark the timed out "
49
                 "replica as failed, and its periodic refresh mechanism for the lookup cache will "
50
                 "mark them as available.");
51
52
// Test-only flag. When set, DoPerform() crashes (LOG(FATAL)) if the request uses STRONG
// consistency or if the local peer passes CheckPeerIsLeader() for the requested tablet.
DEFINE_test_flag(bool, assert_reads_served_by_follower, false, "If set, we verify that the "
53
                 "consistency level is CONSISTENT_PREFIX, and that this server is not the leader "
54
                 "for the tablet");
55
56
// When true, DoReadImpl() submits every Redis read op of a batch except the last one to the
// tablet manager's read pool; the last op (and any op that fails to submit) runs inline.
DEFINE_bool(parallelize_read_ops, true,
57
            "Controls whether multiple (Redis) read ops that are present in a operation "
58
            "should be executed in parallel.");
59
TAG_FLAG(parallelize_read_ops, advanced);
60
TAG_FLAG(parallelize_read_ops, runtime);
61
62
namespace yb {
63
namespace tserver {
64
65
namespace {
66
67
void HandleRedisReadRequestAsync(
68
    tablet::AbstractTablet* tablet,
69
    CoarseTimePoint deadline,
70
    const ReadHybridTime& read_time,
71
    const RedisReadRequestPB& redis_read_request,
72
    RedisResponsePB* response,
73
    const std::function<void(const Status& s)>& status_cb
74
42.9k
) {
75
42.9k
  status_cb(tablet->HandleRedisReadRequest(deadline, read_time, redis_read_request, response));
76
42.9k
}
77
78
class ReadQuery : public std::enable_shared_from_this<ReadQuery>, public rpc::ThreadPoolTask {
79
 public:
80
  ReadQuery(
81
      TabletServerIf* server, ReadTabletProvider* read_tablet_provider, const ReadRequestPB* req,
82
      ReadResponsePB* resp, rpc::RpcContext context)
83
      : server_(*server), read_tablet_provider_(*read_tablet_provider), req_(req), resp_(resp),
84
4.69M
        context_(std::move(context)) {}
85
86
4.70M
  void Perform() {
87
4.70M
    RespondIfFailed(DoPerform());
88
4.70M
  }
89
90
4.89M
  void RespondIfFailed(const Status& status) {
91
4.89M
    if (!status.ok()) {
92
11.5k
      RespondFailure(status);
93
11.5k
    }
94
4.89M
  }
95
96
54.7k
  void RespondFailure(const Status& status) {
97
54.7k
    SetupErrorAndRespond(resp_->mutable_error(), status, &context_);
98
54.7k
  }
99
100
4.69M
  virtual ~ReadQuery() = default;
101
102
 private:
103
  CHECKED_STATUS DoPerform();
104
105
  // Picks read based for specified read context.
106
  CHECKED_STATUS DoPickReadTime(server::Clock* clock);
107
108
  bool transactional() const;
109
110
  tablet::Tablet* tablet() const;
111
112
  ReadHybridTime FormRestartReadHybridTime(const HybridTime& restart_time) const;
113
114
  CHECKED_STATUS PickReadTime(server::Clock* clock);
115
116
  bool IsForBackfill() const;
117
118
  // Read implementation. If restart is required returns restart time, in case of success
119
  // returns invalid ReadHybridTime. Otherwise returns error status.
120
  Result<ReadHybridTime> DoRead();
121
  Result<ReadHybridTime> DoReadImpl();
122
123
  CHECKED_STATUS Complete();
124
125
  void UpdateConsistentPrefixMetrics();
126
127
  // Used when we write intents during read, i.e. for serializable isolation.
128
  // We cannot proceed with read from completion callback, to avoid holding
129
  // replica state lock for too long.
130
  // So ThreadPool is used to proceed with read.
131
97.6k
  void Run() override {
132
97.6k
    auto status = PickReadTime(server_.Clock());
133
97.6k
    if (status.ok()) {
134
97.6k
      status = Complete();
135
97.6k
    }
136
97.6k
    RespondIfFailed(status);
137
97.6k
  }
138
139
97.6k
  void Done(const Status& status) override {
140
97.6k
    RespondIfFailed(status);
141
97.6k
    retained_self_ = nullptr;
142
97.6k
  }
143
144
  TabletServerIf& server_;
145
  ReadTabletProvider& read_tablet_provider_;
146
  const ReadRequestPB* req_;
147
  ReadResponsePB* resp_;
148
  rpc::RpcContext context_;
149
150
  std::shared_ptr<tablet::AbstractTablet> abstract_tablet_;
151
152
  ReadHybridTime read_time_;
153
  HybridTime safe_ht_to_read_;
154
  ReadHybridTime used_read_time_;
155
  tablet::RequireLease require_lease_ = tablet::RequireLease::kFalse;
156
  HostPortPB host_port_pb_;
157
  bool allow_retry_ = false;
158
  RequestScope request_scope_;
159
  std::shared_ptr<ReadQuery> retained_self_;
160
};
161
162
8.75M
bool ReadQuery::transactional() const {
163
8.75M
  return abstract_tablet_->IsTransactionalRequest(req_->pgsql_batch_size() > 0);
164
8.75M
}
165
166
5.28M
// Returns the tablet serving this read, down-cast from AbstractTablet to tablet::Tablet.
tablet::Tablet* ReadQuery::tablet() const {
  tablet::AbstractTablet* const base = abstract_tablet_.get();
  return down_cast<tablet::Tablet*>(base);
}
169
170
42
// Builds the read time to use after storage requested a read restart at `restart_time`.
//
// Starting from a copy of the current read time:
//   - `read` becomes max(restart_time, safe time), capped by the global limit;
//   - `local_limit` becomes min(safe time, global limit).
// `restart_time` is expected (DCHECK) to be greater than the current read point.
ReadHybridTime ReadQuery::FormRestartReadHybridTime(const HybridTime& restart_time) const {
  DCHECK_GT(restart_time, read_time_.read);
  VLOG(1) << "Restart read required at: " << restart_time << ", original: " << read_time_;

  const auto& global_limit = read_time_.global_limit;
  const auto& restart_floor = std::max(restart_time, safe_ht_to_read_);

  ReadHybridTime restarted = read_time_;
  restarted.read = std::min(restart_floor, global_limit);
  restarted.local_limit = std::min(safe_ht_to_read_, global_limit);
  return restarted;
}
178
179
4.65M
// Wrapper around DoPickReadTime() that records a failure in the current trace before returning
// the status unchanged.
CHECKED_STATUS ReadQuery::PickReadTime(server::Clock* clock) {
  Status pick_status = DoPickReadTime(clock);
  if (pick_status.ok()) {
    return pick_status;
  }
  TRACE(pick_status.ToString());
  return pick_status;
}
186
187
4.64M
bool ReadQuery::IsForBackfill() const {
188
4.64M
  if (req_->pgsql_batch_size() > 0) {
189
664k
    if (req_->pgsql_batch(0).is_for_backfill()) {
190
      // Currently, read requests for backfill should only come by themselves, not in batches.
191
289
      DCHECK_EQ(req_->pgsql_batch_size(), 1);
192
289
      return true;
193
289
    }
194
4.64M
  }
195
  // YCQL doesn't send read RPCs for scanning the indexed table and instead directly reads using
196
  // iterator, so there's no equivalent logic for YCQL here.
197
4.64M
  return false;
198
4.64M
}
199
200
4.69M
// Validates the request, selects the tablet and the read time, and then either
//   - completes the read inline via Complete(), or
//   - for serializable isolation or requests with a row mark, first submits a write of read
//     intents through the leader peer; on success the query is enqueued on the peer and the read
//     continues in Run().
//
// Returns a non-OK status on failure, which the caller turns into an error response. An OK
// status means either that the response has already been sent or that the asynchronous intent
// write has been submitted and will respond later.
CHECKED_STATUS ReadQuery::DoPerform() {
  TRACE("Start Read");
  TRACE_EVENT1("tserver", "TabletServiceImpl::Read", "tablet_id", req_->tablet_id());
  VLOG(2) << "Received Read RPC: " << req_->DebugString();
  // Unfortunately, determining the isolation level is not as straightforward as it seems. All but
  // the first request to a given tablet by a particular transaction assume that the tablet already
  // has the transaction metadata, including the isolation level, and those requests expect us to
  // retrieve the isolation level from that metadata. Failure to do so was the cause of a
  // serialization anomaly tested by TestOneOrTwoAdmins
  // (https://github.com/yugabyte/yugabyte-db/issues/1572).

  bool serializable_isolation = false;
  TabletPeerTablet peer_tablet;
  if (req_->has_transaction()) {
    IsolationLevel isolation_level;
    if (req_->transaction().has_isolation()) {
      // This must be the first request to this tablet by this particular transaction.
      isolation_level = req_->transaction().isolation();
    } else {
      peer_tablet = VERIFY_RESULT(LookupTabletPeer(
          server_.tablet_peer_lookup(), req_->tablet_id()));
      isolation_level = VERIFY_RESULT(peer_tablet.tablet->GetIsolationLevelFromPB(*req_));
    }
    serializable_isolation = isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION;

    if (PREDICT_FALSE(FLAGS_TEST_transactional_read_delay_ms > 0)) {
      LOG(INFO) << "Delaying transactional read for "
                << FLAGS_TEST_transactional_read_delay_ms << " ms.";
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_transactional_read_delay_ms));
    }

    // Debug aid, compiled only with -DDUMP_READ. It must use the req_ member: there is no
    // variable named `req` in this scope.
#if defined(DUMP_READ)
    if (req_->pgsql_batch().size() > 0) {
      LOG(INFO) << CHECK_RESULT(FullyDecodeTransactionId(req_->transaction().transaction_id()))
                << " READ: "
                << req_->pgsql_batch(0).partition_column_values(0).value().int32_value()
                << ", " << isolation_level;
    }
#endif
  }

  // Get the most restrictive row mark present in the batch of PostgreSQL requests.
  // TODO: rather handle individual row marks once we start batching read requests (issue #2495)
  RowMarkType batch_row_mark = RowMarkType::ROW_MARK_ABSENT;
  if (!req_->pgsql_batch().empty()) {
    uint64_t last_breaking_catalog_version = 0; // unset.
    for (const auto& pg_req : req_->pgsql_batch()) {
      // For postgres requests check that the syscatalog version matches.
      if (pg_req.has_ysql_catalog_version()) {
        if (last_breaking_catalog_version == 0) {
          // Initialize last breaking version if not yet set.
          server_.get_ysql_catalog_version(
              nullptr /* current_version */, &last_breaking_catalog_version);
        }
        if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) {
          return STATUS(
              QLError, "The catalog snapshot used for this transaction has been invalidated",
              TabletServerError(TabletServerErrorPB::MISMATCHED_SCHEMA));
        }
      }
      RowMarkType current_row_mark = GetRowMarkTypeFromPB(pg_req);
      if (IsValidRowMarkType(current_row_mark)) {
        if (!req_->has_transaction()) {
          return STATUS(
              NotSupported, "Read request with row mark types must be part of a transaction",
              TabletServerError(TabletServerErrorPB::OPERATION_NOT_SUPPORTED));
        }
        batch_row_mark = GetStrongestRowMarkType({current_row_mark, batch_row_mark});
      }
    }
  }
  const bool has_row_mark = IsValidRowMarkType(batch_row_mark);

  LeaderTabletPeer leader_peer;

  if (serializable_isolation || has_row_mark) {
    // At this point we expect that we don't have pure read serializable transactions, and
    // always write read intents to detect conflicts with other writes.
    leader_peer = VERIFY_RESULT(LookupLeaderTablet(
        server_.tablet_peer_lookup(), req_->tablet_id(), std::move(peer_tablet)));
    // Serializable read adds intents, i.e. writes data.
    // We should check for memory pressure in this case.
    RETURN_NOT_OK(CheckWriteThrottling(req_->rejection_score(), leader_peer.peer.get()));
    abstract_tablet_ = leader_peer.peer->shared_tablet();
  } else {
    abstract_tablet_ = VERIFY_RESULT(read_tablet_provider_.GetTabletForRead(
        req_->tablet_id(), std::move(peer_tablet.tablet_peer),
        req_->consistency_level(), AllowSplitTablet::kFalse));
    leader_peer.leader_term = OpId::kUnknownTerm;
  }

  if (PREDICT_FALSE(FLAGS_TEST_assert_reads_served_by_follower)) {
    if (req_->consistency_level() == YBConsistencyLevel::STRONG) {
      LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but consistency level is "
                    "invalid: YBConsistencyLevel::STRONG";
    }
    tablet::TabletPeerPtr tablet_peer;
    RETURN_NOT_OK(server_.tablet_peer_lookup()->GetTabletPeer(req_->tablet_id(), &tablet_peer));
    if (CheckPeerIsLeader(*tablet_peer).ok()) {
      LOG(FATAL) << "--TEST_assert_reads_served_by_follower is true but read is being served by "
                 << " peer " << tablet_peer->permanent_uuid()
                 << " which is the leader for tablet " << req_->tablet_id();
    }
  }

  // Hidden (non-system) tablets are reported as missing.
  if (!abstract_tablet_->system() && tablet()->metadata()->hidden()) {
    return STATUS(NotFound, "Tablet not found", req_->tablet_id());
  }

  if (FLAGS_TEST_simulate_time_out_failures_msecs > 0 && RandomUniformInt(0, 10) < 2) {
    LOG(INFO) << "Marking request as timed out for test: " << req_->ShortDebugString();
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_simulate_time_out_failures_msecs));
    return STATUS(TimedOut, "timed out for test");
  }

  if (server_.Clock()) {
    server::UpdateClock(*req_, server_.Clock());
  }

  // safe_ht_to_read is used only for read restart, so if read_time is valid, then we would respond
  // with "restart required".
  read_time_ = ReadHybridTime::FromReadTimePB(*req_);

  // Local retries of read restarts are only possible when the caller did not fix the read time.
  allow_retry_ = !read_time_;
  require_lease_ = tablet::RequireLease(req_->consistency_level() == YBConsistencyLevel::STRONG);
  // TODO: should check all the tables referenced by the requests to decide if it is transactional.
  const bool transactional = this->transactional();
  // Should not pick read time for serializable isolation, since it is picked after read intents
  // are added. Also conflict resolution for serializable isolation should be done without read time
  // specified. So we use max hybrid time for conflict resolution in such case.
  // It was implemented as part of #655.
  if (!serializable_isolation) {
    RETURN_NOT_OK(PickReadTime(server_.Clock()));
  }

  if (transactional) {
    // Serial number is used to check whether this operation was initiated before
    // transaction status request. So we should initialize it as soon as possible.
    request_scope_ = RequestScope(tablet()->transaction_participant());
    read_time_.serial_no = request_scope_.request_id();
  }

  // Remember the caller's address; DoReadImpl() attaches it to QL read requests.
  const auto& remote_address = context_.remote_address();
  host_port_pb_.set_host(remote_address.address().to_string());
  host_port_pb_.set_port(remote_address.port());

  if (serializable_isolation || has_row_mark) {
    auto deadline = context_.GetClientDeadline();
    auto query = std::make_unique<tablet::WriteQuery>(
        leader_peer.leader_term, deadline, leader_peer.peer.get(),
        leader_peer.peer->tablet(), nullptr /* response */,
        docdb::OperationKind::kRead);

    auto& write = *query->operation().AllocateRequest();
    auto& write_batch = *write.mutable_write_batch();
    *write_batch.mutable_transaction() = req_->transaction();
    if (has_row_mark) {
      write_batch.set_row_mark_type(batch_row_mark);
      query->set_read_time(read_time_);
    }
    write.set_unused_tablet_id(""); // For backward compatibility.
    write_batch.set_deprecated_may_have_metadata(true);
    write.set_batch_idx(req_->batch_idx());
    // TODO(dtxn) write request id

    RETURN_NOT_OK(leader_peer.peer->tablet()->CreateReadIntents(
        req_->transaction(), req_->subtransaction(), req_->ql_batch(), req_->pgsql_batch(),
        &write_batch));

    query->AdjustYsqlQueryTransactionality(req_->pgsql_batch_size());

    // The callback captures a shared_ptr to this query, keeping it alive until the write
    // completes. On success the query stores a reference to itself before being enqueued as a
    // thread pool task; Done() releases that reference.
    query->set_callback([peer = leader_peer.peer, self = shared_from_this()](const Status& status) {
      if (!status.ok()) {
        self->RespondFailure(status);
      } else {
        self->retained_self_ = self;
        peer->Enqueue(self.get());
      }
    });
    leader_peer.peer->WriteAsync(std::move(query));
    return Status::OK();
  }

  if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX) {
    // abstract_tablet_ has already been dereferenced above; the check is kept as a safeguard.
    if (abstract_tablet_) {
      tablet()->metrics()->consistent_prefix_read_requests->Increment();
    }
  }

  return Complete();
}
390
391
4.65M
// Determines the safe time to read at and, when the request did not specify one, the read time.
//
// With a read time already present, only the safe time is fetched from the tablet, passing the
// requested read point and the client deadline. Otherwise the safe time becomes the read point;
// for transactional requests the global limit comes from `clock`, and for the rest both limits
// collapse to the read point.
CHECKED_STATUS ReadQuery::DoPickReadTime(server::Clock* clock) {
  if (read_time_) {
    safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime(
        require_lease_, read_time_.read, context_.GetClientDeadline()));
    return Status::OK();
  }

  safe_ht_to_read_ = VERIFY_RESULT(abstract_tablet_->SafeTime(require_lease_));
  // If the read time is not specified, then it is a single-shard read.
  // So we should restart it in server in case of failure.
  read_time_.read = safe_ht_to_read_;

  if (!transactional()) {
    read_time_.local_limit = read_time_.read;
    read_time_.global_limit = read_time_.read;
    return Status::OK();
  }

  read_time_.global_limit = clock->MaxGlobalNow();
  read_time_.local_limit = std::min(safe_ht_to_read_, read_time_.global_limit);

  VLOG(1) << "Read time: " << read_time_.ToString();
  return Status::OK();
}
412
413
4.63M
// Executes the read, retrying locally on read restarts when allowed, and sends the response.
//
// Every iteration clears the response and the RPC sidecars and calls DoRead(). DoRead() returns
// an invalid read time on success and a valid one when a restart is required. On a restart:
//   - if the request specified its own read time (allow_retry_ is false), the restart time is
//     returned to the caller in the response and the loop ends;
//   - otherwise the read is repeated at the new read time until the client deadline passes.
//
// Returns a non-OK status if DoRead() fails or the deadline is exceeded. On an OK return the
// response has already been sent and context_ has been moved from.
CHECKED_STATUS ReadQuery::Complete() {
  for (;;) {
    resp_->Clear();
    context_.ResetRpcSidecars();
    VLOG(1) << "Read time: " << read_time_ << ", safe: " << safe_ht_to_read_;
    const auto result = VERIFY_RESULT(DoRead());
    // A restart at exactly the same read time would loop forever, so stop retrying locally.
    if (allow_retry_ && read_time_ && read_time_ == result) {
      YB_LOG_EVERY_N_SECS(DFATAL, 5)
          << __func__ << ", restarting read with the same read time: " << result << THROTTLE_MSG;
      allow_retry_ = false;
    }
    read_time_ = result;
    // If read was successful, then restart time is invalid. Finishing.
    // (If a read restart was requested, then read_time would be set to the time at which we have
    // to restart.)
    if (!read_time_) {
      // allow_retry means that that the read time was not set in the request and therefore we can
      // retry read restarts on the tablet server.
      if (!allow_retry_) {
        auto local_limit = std::min(safe_ht_to_read_, used_read_time_.global_limit);
        resp_->set_local_limit_ht(local_limit.ToUint64());
      }
      break;
    }
    if (!allow_retry_) {
      // If the read time is specified, then we read as part of a transaction. So we should restart
      // whole transaction. In this case we report restart time and abort reading.
      resp_->Clear();
      auto restart_read_time = resp_->mutable_restart_read_time();
      restart_read_time->set_read_ht(read_time_.read.ToUint64());
      restart_read_time->set_deprecated_max_of_read_time_and_local_limit_ht(
          read_time_.local_limit.ToUint64());
      restart_read_time->set_local_limit_ht(read_time_.local_limit.ToUint64());
      // Global limit is ignored by caller, so we don't set it.
      tablet()->metrics()->restart_read_requests->Increment();
      break;
    }

    if (CoarseMonoClock::now() > context_.GetClientDeadline()) {
      TRACE("Read timed out");
      return STATUS(TimedOut, "Read timed out");
    }
  }
  if (req_->include_trace() && Trace::CurrentTrace() != nullptr) {
    resp_->set_trace_buffer(Trace::CurrentTrace()->DumpToString(true));
  }

  // In case read time was not specified (i.e. allow_retry is true)
  // we just picked a read time and we should communicate it back to the caller.
  if (allow_retry_) {
    used_read_time_.ToPB(resp_->mutable_used_read_time());
  }

  // Useful when debugging transactions
  // Compiled only with -DDUMP_READ. Uses the members of this class (req_, resp_, context_); the
  // former `read_context` object no longer exists.
  // NOTE(review): assumes rpc::RpcContext still exposes RpcSidecar(idx) - confirm before enabling.
#if defined(DUMP_READ)
  if (req_->has_transaction() && req_->pgsql_batch().size() == 1 &&
      req_->pgsql_batch()[0].partition_column_values().size() == 1 &&
      resp_->pgsql_batch().size() == 1 &&
      resp_->pgsql_batch()[0].rows_data_sidecar() == 0) {
    auto txn_id = CHECK_RESULT(FullyDecodeTransactionId(
        req_->transaction().transaction_id()));
    auto value_slice = context_.RpcSidecar(0).as_slice();
    auto num = BigEndian::Load64(value_slice.data());
    std::string result;
    if (num == 0) {
      result = "<NONE>";
    } else if (num == 1) {
      auto len = BigEndian::Load64(value_slice.data() + 14) - 1;
      result = Slice(value_slice.data() + 22, len).ToBuffer();
    } else {
      result = value_slice.ToDebugHexString();
    }
    auto key = req_->pgsql_batch(0).partition_column_values(0).value().int32_value();
    LOG(INFO) << txn_id << " READ DONE: " << key << " = " << result;
  }
#endif

  // Sends the response. context_ is moved from here and must not be used afterwards.
  MakeRpcOperationCompletionCallback<ReadResponsePB>(
      std::move(context_), resp_, server_.Clock())(Status::OK());
  TRACE("Done Read");

  return Status::OK();
}
496
497
4.63M
// Runs DoReadImpl() under a long-operation tracker and, for a successful read in a transaction
// whose request metadata carries NON_TRANSACTIONAL isolation, verifies through the transaction
// participant that the transaction has not been aborted.
//
// Returns the result of DoReadImpl(), or the error produced by the abort check.
Result<ReadHybridTime> ReadQuery::DoRead() {
  Result<ReadHybridTime> read_result{ReadHybridTime()};
  {
    LongOperationTracker long_operation_tracker("Read", 1s);
    read_result = DoReadImpl();
  }

  if (!read_result.ok() || !req_->has_transaction()) {
    return read_result;
  }

  // Check transaction is still alive in case read was successful
  // and data has been written earlier into current tablet in context of current transaction.
  const auto& txn_metadata = req_->transaction();
  if (txn_metadata.isolation() != NON_TRANSACTIONAL) {
    return read_result;
  }

  const auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(txn_metadata.transaction_id()));
  auto& participant = *tablet()->transaction_participant();
  RETURN_NOT_OK(participant.CheckAborted(txn_id));
  return read_result;
}
514
515
4.64M
// Performs one read attempt at the current read time and fills resp_ and the RPC sidecars.
//
// Exactly one batch type of the request is served, checked in this order: Redis, QL, PostgreSQL.
// Returns:
//   - an invalid (default constructed) ReadHybridTime on success;
//   - the read time to restart at when the tablet reports a valid restart_read_ht;
//   - an error status on failure.
Result<ReadHybridTime> ReadQuery::DoReadImpl() {
  ReadHybridTime read_time;
  tablet::ScopedReadOperation read_tx;
  if (IsForBackfill()) {
    // Backfill reads use the read time as is, without creating a scoped read operation.
    read_time = read_time_;
  } else {
    read_tx = VERIFY_RESULT(
        tablet::ScopedReadOperation::Create(abstract_tablet_.get(), require_lease_, read_time_));
    read_time = read_tx.read_time();
  }
  used_read_time_ = read_time;
  if (!req_->redis_batch().empty()) {
    // Assert the primary table is a redis table.
    DCHECK_EQ(abstract_tablet_->table_type(), TableType::REDIS_TABLE_TYPE);
    auto count = req_->redis_batch_size();
    // One status slot per operation; each callback writes only its own slot.
    std::vector<Status> rets(count);
    CountDownLatch latch(count);
    for (int idx = 0; idx < count; idx++) {
      const RedisReadRequestPB& redis_read_req = req_->redis_batch(idx);
      Status& op_status = rets[idx];
      auto cb = [&latch, &op_status](const Status& status) -> void {
        if (!status.ok()) {
          op_status = status;
        }
        latch.CountDown(1);
      };
      auto func = Bind(
          &HandleRedisReadRequestAsync,
          Unretained(abstract_tablet_.get()),
          context_.GetClientDeadline(),
          read_time,
          redis_read_req,
          Unretained(resp_->add_redis_batch()),
          cb);

      // All operations but the last one go to the read pool (when parallelism is enabled). The
      // last one, and any operation that could not be submitted, runs on this thread.
      Status s;
      bool run_async = FLAGS_parallelize_read_ops && (idx != count - 1);
      if (run_async) {
        s = server_.tablet_manager()->read_pool()->SubmitClosure(func);
      }

      if (!s.ok() || !run_async) {
        func.Run();
      }
    }
    // The callbacks reference `latch` and `rets`, so wait for all of them before leaving scope.
    latch.Wait();
    std::vector<Status> failed;
    for (auto& status : rets) {
      if (!status.ok()) {
        failed.push_back(status);
      }
    }
    if (failed.empty()) {
      // TODO(dtxn) implement read restart for Redis.
      return ReadHybridTime();
    } else if (failed.size() == 1) {
      return failed[0];
    } else {
      return STATUS(Combined, VectorToString(failed));
    }
  }

  if (!req_->ql_batch().empty()) {
    // Assert the primary table is a YQL table.
    DCHECK_EQ(abstract_tablet_->table_type(), TableType::YQL_TABLE_TYPE);
    ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_);
    for (QLReadRequestPB& ql_read_req : *mutable_req->mutable_ql_batch()) {
      // Update the remote endpoint.
      // The sub-messages are only lent to ql_read_req for the duration of the call: they are
      // attached with set_allocated_*() and detached again with release_*() on scope exit, so
      // ql_read_req never ends up owning host_port_pb_ or the request's proxy_uuid.
      ql_read_req.set_allocated_remote_endpoint(&host_port_pb_);
      ql_read_req.set_allocated_proxy_uuid(mutable_req->mutable_proxy_uuid());
      auto se = ScopeExit([&ql_read_req] {
        ql_read_req.release_remote_endpoint();
        ql_read_req.release_proxy_uuid();
      });

      tablet::QLReadRequestResult result;
      TRACE("Start HandleQLReadRequest");
      RETURN_NOT_OK(abstract_tablet_->HandleQLReadRequest(
          context_.GetClientDeadline(), read_time, ql_read_req, req_->transaction(), &result));
      TRACE("Done HandleQLReadRequest");
      if (result.restart_read_ht.is_valid()) {
        return FormRestartReadHybridTime(result.restart_read_ht);
      }
      result.response.set_rows_data_sidecar(
          narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data)));
      resp_->add_ql_batch()->Swap(&result.response);
    }
    return ReadHybridTime();
  }

  if (!req_->pgsql_batch().empty()) {
    ReadRequestPB* mutable_req = const_cast<ReadRequestPB*>(req_);
    size_t total_num_rows_read = 0;
    for (PgsqlReadRequestPB& pgsql_read_req : *mutable_req->mutable_pgsql_batch()) {
      tablet::PgsqlReadRequestResult result;
      TRACE("Start HandlePgsqlReadRequest");
      // Initialized so that the accumulation below is well defined even if the handler returns
      // OK without writing the out-parameter.
      size_t num_rows_read = 0;
      RETURN_NOT_OK(abstract_tablet_->HandlePgsqlReadRequest(
          context_.GetClientDeadline(), read_time,
          !allow_retry_ /* is_explicit_request_read_time */, pgsql_read_req, req_->transaction(),
          req_->subtransaction(), &result, &num_rows_read));

      total_num_rows_read += num_rows_read;

      TRACE("Done HandlePgsqlReadRequest");
      if (result.restart_read_ht.is_valid()) {
        return FormRestartReadHybridTime(result.restart_read_ht);
      }
      result.response.set_rows_data_sidecar(
          narrow_cast<int32_t>(context_.AddRpcSidecar(result.rows_data)));
      resp_->add_pgsql_batch()->Swap(&result.response);
    }

    if (req_->consistency_level() == YBConsistencyLevel::CONSISTENT_PREFIX &&
        total_num_rows_read > 0) {
      tablet()->metrics()->pgsql_consistent_prefix_read_rows->IncrementBy(total_num_rows_read);
    }
    return ReadHybridTime();
  }

  if (abstract_tablet_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    return STATUS(NotSupported, "Transaction status table does not support read");
  }

  // The request contains no operations: nothing to read.
  return ReadHybridTime();
}
640
641
} // namespace
642
643
void PerformRead(
644
    TabletServerIf* server, ReadTabletProvider* read_tablet_provider,
645
4.69M
    const ReadRequestPB* req, ReadResponsePB* resp, rpc::RpcContext context) {
646
4.69M
  auto read_query = std::make_shared<ReadQuery>(
647
4.69M
      server, read_tablet_provider, req, resp, std::move(context));
648
4.69M
  read_query->Perform();
649
4.69M
}
650
651
}  // namespace tserver
652
}  // namespace yb