YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/yb_op.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/yb_op.h"
34
35
#include "yb/client/client.h"
36
#include "yb/client/client-internal.h"
37
#include "yb/client/meta_cache.h"
38
#include "yb/client/schema.h"
39
#include "yb/client/table.h"
40
41
#include "yb/common/ql_protocol.pb.h"
42
#include "yb/common/ql_rowblock.h"
43
#include "yb/common/ql_scanspec.h"
44
#include "yb/common/ql_type.h"
45
#include "yb/common/ql_value.h"
46
#include "yb/common/redis_protocol.pb.h"
47
#include "yb/common/row_mark.h"
48
#include "yb/common/schema.h"
49
#include "yb/common/wire_protocol.h"
50
#include "yb/common/wire_protocol.pb.h"
51
52
#include "yb/docdb/doc_key.h"
53
#include "yb/docdb/doc_scanspec_util.h"
54
#include "yb/docdb/primitive_value.h"
55
#include "yb/docdb/primitive_value_util.h"
56
#include "yb/rpc/rpc_controller.h"
57
58
#include "yb/tserver/tserver_service.proxy.h"
59
60
#include "yb/util/async_util.h"
61
#include "yb/util/flag_tags.h"
62
#include "yb/util/result.h"
63
#include "yb/util/status_format.h"
64
65
using namespace std::literals;
66
67
DEFINE_bool(redis_allow_reads_from_followers, false,
68
            "If true, the read will be served from the closest replica in the same AZ, which can "
69
            "be a follower.");
70
TAG_FLAG(redis_allow_reads_from_followers, evolving);
71
TAG_FLAG(redis_allow_reads_from_followers, runtime);
72
73
namespace yb {
74
namespace client {
75
76
using std::shared_ptr;
77
using std::unique_ptr;
78
79
namespace {
80
81
CHECKED_STATUS InitHashPartitionKey(
82
1.52M
    const Schema& schema, const PartitionSchema& partition_schema, PgsqlReadRequestPB* request) {
83
  // Read partition key from read request.
84
1.52M
  const auto &ybctid = request->ybctid_column_value().value();
85
86
  // Seek a specific partition_key from read_request.
87
  // 1. Not specified hash condition - Full scan.
88
  // 2. paging_state -- Set by server to continue current request.
89
  // 3. lower and upper bound -- Set by PgGate to query a specific set of hash values.
90
  // 4. hash column values -- Given to scan ONE SET of specfic hash values.
91
  // 5. range and regular condition - These are filter expression and will be processed by DocDB.
92
  //    Shouldn't we able to set RANGE boundary here?
93
94
  // If primary index lookup using ybctid requests are batched, there is a possibility that tablets
95
  // might get split after the batch of requests have been prepared. Hence, we need to execute the
96
  // prepared request in both tablet partitions. For this purpose, we use paging state to continue
97
  // executing the request in the second sub-partition after completing the first sub-partition.
98
  //
99
  // batched ybctids
100
  // In order to represent a single ybctid or a batch of ybctids, we leverage the lower bound and
101
  // upper bounds to set hash codes and max hash codes.
102
1.52M
  bool has_paging_state =
103
1.52M
      request->has_paging_state() && request->paging_state().has_next_partition_key();
104
1.52M
  if (has_paging_state) {
105
    // If this is a subsequent query, use the partition key from the paging state. This is only
106
    // supported for forward scan.
107
38.4k
    request->set_partition_key(request->paging_state().next_partition_key());
108
109
    // Check that the paging state hash_code is within [ hash_code, max_hash_code ] bounds.
110
38.4k
    if (schema.num_hash_key_columns() > 0 && !request->partition_key().empty()) {
111
38.4k
      uint16 paging_state_hash_code = PartitionSchema::DecodeMultiColumnHashValue(
112
38.4k
          request->partition_key());
113
38.4k
      if ((request->has_hash_code() && paging_state_hash_code < request->hash_code()) ||
114
38.4k
          (request->has_max_hash_code() && paging_state_hash_code > request->max_hash_code())) {
115
0
        return STATUS_SUBSTITUTE(
116
0
            InternalError,
117
0
            "Out of bounds partition key found in paging state:"
118
0
            "Query's partition bounds: [$0, $1], paging state partition: $2",
119
0
            request->has_hash_code() ? request->hash_code() : 0,
120
0
            request->has_max_hash_code() ? request->max_hash_code() : 0,
121
0
            paging_state_hash_code);
122
0
      }
123
38.4k
      request->set_hash_code(paging_state_hash_code);
124
38.4k
    }
125
1.48M
  } else if (!IsNull(ybctid)) {
126
10.3k
    const auto hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value()));
127
10.3k
    request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code));
128
1.47M
  } else if (request->has_lower_bound() || request->has_upper_bound()) {
129
    // If the read request does not provide a specific partition key, but it does provide scan
130
    // boundary, use the given boundary to setup the scan lower and upper bound.
131
1.45k
    if (request->has_lower_bound()) {
132
1.21k
      auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->lower_bound().key());
133
1.21k
      if (!request->lower_bound().is_inclusive()) {
134
0
        ++hash;
135
0
      }
136
1.21k
      request->set_hash_code(hash);
137
138
      // Set partition key to lower bound.
139
1.21k
      request->set_partition_key(request->lower_bound().key());
140
1.21k
    }
141
1.45k
    if (request->has_upper_bound()) {
142
1.21k
      auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->upper_bound().key());
143
1.21k
      if (!request->upper_bound().is_inclusive()) {
144
1.21k
        --hash;
145
1.21k
      }
146
1.21k
      request->set_max_hash_code(hash);
147
1.21k
    }
148
149
1.46M
  } else if (!request->partition_column_values().empty()) {
150
    // If hashed columns are set, use them to compute the exact key and set the bounds
151
1.45M
    RETURN_NOT_OK(partition_schema.EncodeKey(
152
1.45M
        request->partition_column_values(), request->mutable_partition_key()));
153
154
    // Make sure given key is not smaller than lower bound (if any)
155
1.45M
    if (request->has_hash_code()) {
156
0
      auto hash_code = static_cast<uint16>(request->hash_code());
157
0
      auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
158
0
      if (request->partition_key() < lower_bound) {
159
0
        request->set_partition_key(std::move(lower_bound));
160
0
      }
161
0
    }
162
163
    // Make sure given key is not bigger than upper bound (if any)
164
1.45M
    if (request->has_max_hash_code()) {
165
0
      auto hash_code = static_cast<uint16>(request->max_hash_code());
166
0
      auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
167
0
      if (request->partition_key() > upper_bound) {
168
0
        request->set_partition_key(std::move(upper_bound));
169
0
      }
170
0
    }
171
172
1.45M
    if (!request->partition_key().empty()) {
173
      // If one specifc partition_key is found, set both bounds to equal partition key now because
174
      // this is a point get.
175
1.45M
      auto hash_code = PartitionSchema::DecodeMultiColumnHashValue(request->partition_key());
176
1.45M
      request->set_hash_code(hash_code);
177
1.45M
      request->set_max_hash_code(hash_code);
178
1.45M
    }
179
180
17.2k
  } else if (!has_paging_state) {
181
    // Full scan. Default to empty key.
182
17.0k
    request->clear_partition_key();
183
17.0k
  }
184
185
1.52M
  return Status::OK();
186
1.52M
}
187
188
CHECKED_STATUS SetRangePartitionBounds(const Schema& schema,
189
                                       const std::string& last_partition,
190
                                       PgsqlReadRequestPB* request,
191
362k
                                       std::string* key_upper_bound) {
192
362k
  vector<docdb::PrimitiveValue> range_components, range_components_end;
193
362k
  RETURN_NOT_OK(GetRangePartitionBounds(
194
362k
      schema, *request, &range_components, &range_components_end));
195
362k
  if (range_components.empty() && range_components_end.empty()) {
196
169k
    if (request->is_forward_scan()) {
197
169k
      request->clear_partition_key();
198
18.4E
    } else {
199
      // In case of backward scan process must be start from the last partition.
200
18.4E
      request->set_partition_key(last_partition);
201
18.4E
    }
202
169k
    key_upper_bound->clear();
203
169k
    return Status::OK();
204
169k
  }
205
192k
  auto upper_bound_key = docdb::DocKey(std::move(range_components_end)).Encode().ToStringBuffer();
206
192k
  if (request->is_forward_scan()) {
207
192k
    request->set_partition_key(
208
192k
         docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer());
209
192k
    *key_upper_bound = std::move(upper_bound_key);
210
18.4E
  } else {
211
    // Backward scan should go from upper bound to lower. But because DocDB can check upper bound
212
    // only it is not set here. Lower bound will be checked on client side in the
213
    // ReviewResponsePagingState function.
214
18.4E
    request->set_partition_key(std::move(upper_bound_key));
215
18.4E
    key_upper_bound->clear();
216
18.4E
  }
217
192k
  return Status::OK();
218
192k
}
219
220
CHECKED_STATUS InitRangePartitionKey(
221
382k
    const Schema& schema, const std::string& last_partition, PgsqlReadRequestPB* request) {
222
  // Set the range partition key.
223
382k
  const auto &ybctid = request->ybctid_column_value().value();
224
225
  // Seek a specific partition_key from read_request.
226
  // 1. Not specified range condition - Full scan.
227
  // 2. ybctid -- Given to fetch one specific row.
228
  // 3. paging_state -- Set by server to continue the same request.
229
  // 4. upper and lower bound -- Set by PgGate to fetch rows within a boundary.
230
  // 5. range column values -- Given to fetch rows for one set of specific range values.
231
  // 6. condition expr -- Given to fetch rows that satisfy specific conditions.
232
382k
  if (!IsNull(ybctid)) {
233
2.78k
    request->set_partition_key(ybctid.binary_value());
234
235
379k
  } else if (request->has_paging_state() &&
236
17.4k
             request->paging_state().has_next_partition_key()) {
237
    // If this is a subsequent query, use the partition key from the paging state.
238
17.4k
    request->set_partition_key(request->paging_state().next_partition_key());
239
240
362k
  } else if (request->has_lower_bound()) {
241
    // When PgGate optimizes RANGE expressions, it will set lower_bound and upper_bound by itself.
242
    // In that case, we use them without recompute them here.
243
    //
244
    // NOTE: Currently, PgGate uses this optimization ONLY for COUNT operator and backfill request.
245
    // It has not done any optimization on RANGE values yet.
246
0
    request->set_partition_key(request->lower_bound().key());
247
248
362k
  } else {
249
    // Evaluate condition to return partition_key and set the upper bound.
250
362k
    string max_key;
251
362k
    RETURN_NOT_OK(SetRangePartitionBounds(schema, last_partition, request, &max_key));
252
362k
    if (!max_key.empty()) {
253
192k
      request->mutable_upper_bound()->set_key(max_key);
254
192k
      request->mutable_upper_bound()->set_is_inclusive(true);
255
192k
    }
256
362k
  }
257
258
382k
  return Status::OK();
259
382k
}
260
261
Result<std::string> GetRangePartitionKey(
262
204k
    const Schema& schema, const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols) {
263
204k
  RSTATUS_DCHECK(!schema.num_hash_key_columns(), IllegalState,
264
204k
      "Cannot get range partition key for hash partitioned table");
265
266
204k
  auto range_components = VERIFY_RESULT(client::GetRangeComponents(schema, range_cols));
267
204k
  return docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer();
268
204k
}
269
270
} // namespace
271
272
//--------------------------------------------------------------------------------------------------
273
// YBOperation
274
//--------------------------------------------------------------------------------------------------
275
276
YBOperation::YBOperation(const shared_ptr<YBTable>& table)
277
11.0M
  : table_(table) {
278
11.0M
}
279
280
11.0M
YBOperation::~YBOperation() {}
281
282
104k
void YBOperation::SetTablet(const scoped_refptr<internal::RemoteTablet>& tablet) {
283
104k
  tablet_ = tablet;
284
104k
}
285
286
6.01k
void YBOperation::ResetTablet() {
287
6.01k
  tablet_.reset();
288
6.01k
}
289
290
0
void YBOperation::ResetTable(std::shared_ptr<YBTable> new_table) {
291
0
  table_.reset();
292
0
  table_ = new_table;
293
  // tablet_ can no longer be valid.
294
0
  tablet_.reset();
295
0
}
296
297
0
bool YBOperation::IsTransactional() const {
298
0
  return table_->schema().table_properties().is_transactional();
299
0
}
300
301
0
bool YBOperation::IsYsqlCatalogOp() const {
302
0
  return table_->schema().table_properties().is_ysql_catalog_table();
303
0
}
304
305
533
void YBOperation::MarkTablePartitionListAsStale() {
306
533
  table_->MarkPartitionsAsStale();
307
533
}
308
309
//--------------------------------------------------------------------------------------------------
310
// YBRedisOp
311
//--------------------------------------------------------------------------------------------------
312
313
YBRedisOp::YBRedisOp(const shared_ptr<YBTable>& table)
314
104k
    : YBOperation(table) {
315
104k
}
316
317
209k
RedisResponsePB* YBRedisOp::mutable_response() {
318
209k
  if (!redis_response_) {
319
104k
    redis_response_.reset(new RedisResponsePB());
320
104k
  }
321
209k
  return redis_response_.get();
322
209k
}
323
324
0
const RedisResponsePB& YBRedisOp::response() const {
325
0
  return *DCHECK_NOTNULL(redis_response_.get());
326
0
}
327
328
129k
OpGroup YBRedisReadOp::group() {
329
33.2k
  return FLAGS_redis_allow_reads_from_followers ? OpGroup::kConsistentPrefixRead
330
95.8k
                                                : OpGroup::kLeaderRead;
331
129k
}
332
333
// YBRedisWriteOp -----------------------------------------------------------------
334
335
YBRedisWriteOp::YBRedisWriteOp(const shared_ptr<YBTable>& table)
336
61.5k
    : YBRedisOp(table), redis_write_request_(new RedisWriteRequestPB()) {
337
61.5k
}
338
339
61.5k
size_t YBRedisWriteOp::space_used_by_request() const {
340
61.5k
  return redis_write_request_->ByteSizeLong();
341
61.5k
}
342
343
0
std::string YBRedisWriteOp::ToString() const {
344
0
  return "REDIS_WRITE " + redis_write_request_->key_value().key();
345
0
}
346
347
61.5k
void YBRedisWriteOp::SetHashCode(uint16_t hash_code) {
348
61.5k
  hash_code_ = hash_code;
349
61.5k
  redis_write_request_->mutable_key_value()->set_hash_code(hash_code);
350
61.5k
}
351
352
61.5k
const std::string& YBRedisWriteOp::GetKey() const {
353
61.5k
  return redis_write_request_->key_value().key();
354
61.5k
}
355
356
123k
Status YBRedisWriteOp::GetPartitionKey(std::string *partition_key) const {
357
123k
  const Slice& slice(redis_write_request_->key_value().key());
358
123k
  return table_->partition_schema().EncodeRedisKey(slice, partition_key);
359
123k
}
360
361
// YBRedisReadOp -----------------------------------------------------------------
362
363
YBRedisReadOp::YBRedisReadOp(const shared_ptr<YBTable>& table)
364
43.0k
    : YBRedisOp(table), redis_read_request_(new RedisReadRequestPB()) {
365
43.0k
}
366
367
42.7k
size_t YBRedisReadOp::space_used_by_request() const {
368
42.7k
  return redis_read_request_->SpaceUsedLong();
369
42.7k
}
370
371
0
std::string YBRedisReadOp::ToString() const {
372
0
  return "REDIS_READ " + redis_read_request_->key_value().key();
373
0
}
374
375
43.0k
void YBRedisReadOp::SetHashCode(uint16_t hash_code) {
376
43.0k
  hash_code_ = hash_code;
377
43.0k
  redis_read_request_->mutable_key_value()->set_hash_code(hash_code);
378
43.0k
}
379
380
42.7k
const std::string& YBRedisReadOp::GetKey() const {
381
42.7k
  return redis_read_request_->key_value().key();
382
42.7k
}
383
384
85.7k
Status YBRedisReadOp::GetPartitionKey(std::string *partition_key) const {
385
85.7k
  if (!redis_read_request_->key_value().has_key()) {
386
297
    *partition_key =
387
297
        PartitionSchema::EncodeMultiColumnHashValue(redis_read_request_->key_value().hash_code());
388
297
    return Status::OK();
389
297
  }
390
85.4k
  const Slice& slice(redis_read_request_->key_value().key());
391
85.4k
  return table_->partition_schema().EncodeRedisKey(slice, partition_key);
392
85.4k
}
393
394
//--------------------------------------------------------------------------------------------------
395
// YBCql Operators
396
// - These ops should be prefixed with YBCql instead of YBql.
397
// - The prefixes "ql" or "QL" are used for common entities of all languages and not just CQL.
398
// - The name will be clean up later.
399
//--------------------------------------------------------------------------------------------------
400
401
YBqlOp::YBqlOp(const shared_ptr<YBTable>& table)
402
6.90M
      : YBOperation(table) , ql_response_(new QLResponsePB()) {
403
6.90M
}
404
405
6.92M
YBqlOp::~YBqlOp() {
406
6.92M
}
407
408
175k
bool YBqlOp::succeeded() const {
409
175k
  return response().status() == QLResponsePB::YQL_STATUS_OK;
410
175k
}
411
412
// YBqlWriteOp -----------------------------------------------------------------
413
414
YBqlWriteOp::YBqlWriteOp(const shared_ptr<YBTable>& table)
415
3.02M
    : YBqlOp(table), ql_write_request_(new QLWriteRequestPB()) {
416
3.02M
}
417
418
3.02M
YBqlWriteOp::~YBqlWriteOp() {}
419
420
static std::unique_ptr<YBqlWriteOp> NewYBqlWriteOp(const shared_ptr<YBTable>& table,
421
1.29M
                                                   QLWriteRequestPB::QLStmtType stmt_type) {
422
1.29M
  auto op = std::unique_ptr<YBqlWriteOp>(new YBqlWriteOp(table));
423
1.29M
  QLWriteRequestPB* req = op->mutable_request();
424
1.29M
  req->set_type(stmt_type);
425
1.29M
  req->set_client(YQL_CLIENT_CQL);
426
  // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too.
427
1.29M
  req->set_request_id(reinterpret_cast<uint64_t>(op.get()));
428
1.29M
  req->set_query_id(op->GetQueryId());
429
430
1.29M
  req->set_schema_version(table->schema().version());
431
1.29M
  req->set_is_compatible_with_previous_version(
432
1.29M
      table->schema().is_compatible_with_previous_version());
433
434
1.29M
  return op;
435
1.29M
}
436
437
1.29M
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) {
438
1.29M
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_INSERT);
439
1.29M
}
440
441
3.44k
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) {
442
3.44k
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_UPDATE);
443
3.44k
}
444
445
1.11k
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) {
446
1.11k
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_DELETE);
447
1.11k
}
448
449
5
std::string YBqlWriteOp::ToString() const {
450
5
  return "QL_WRITE " + ql_write_request_->ShortDebugString();
451
5
}
452
453
3.02M
Status YBqlWriteOp::GetPartitionKey(string* partition_key) const {
454
3.02M
  return table_->partition_schema().EncodeKey(ql_write_request_->hashed_column_values(),
455
3.02M
                                              partition_key);
456
3.02M
}
457
458
3.02M
void YBqlWriteOp::SetHashCode(const uint16_t hash_code) {
459
3.02M
  ql_write_request_->set_hash_code(hash_code);
460
3.02M
}
461
462
22.5k
uint16_t YBqlWriteOp::GetHashCode() const {
463
22.5k
  return ql_write_request_->hash_code();
464
22.5k
}
465
466
1.14M
bool YBqlWriteOp::ReadsStaticRow() const {
467
  // A QL write op reads the static row if it reads a static column, or it writes to the static row
468
  // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp).
469
1.14M
  return !ql_write_request_->column_refs().static_ids().empty() ||
470
1.14M
         (writes_static_row_ && ql_write_request_->has_user_timestamp_usec());
471
1.14M
}
472
473
1.14M
bool YBqlWriteOp::ReadsPrimaryRow() const {
474
  // A QL write op reads the primary row reads a non-static column, it writes to the primary row
475
  // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp),
476
  // or if there is an IF clause.
477
1.14M
  return !ql_write_request_->column_refs().ids().empty() ||
478
1.14M
         (writes_primary_row_ && ql_write_request_->has_user_timestamp_usec()) ||
479
1.14M
         ql_write_request_->has_if_expr();
480
1.14M
}
481
482
1.14M
bool YBqlWriteOp::WritesStaticRow() const {
483
1.14M
  return writes_static_row_;
484
1.14M
}
485
486
1.14M
bool YBqlWriteOp::WritesPrimaryRow() const {
487
1.14M
  return writes_primary_row_;
488
1.14M
}
489
490
0
bool YBqlWriteOp::returns_sidecar() {
491
0
  return ql_write_request_->has_if_expr() || ql_write_request_->returns_status();
492
0
}
493
494
// YBqlWriteOp::HashHash/Equal ---------------------------------------------------------------
495
1.28M
size_t YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op) const {
496
1.28M
  size_t hash = 0;
497
498
  // Hash the table id.
499
1.28M
  boost::hash_combine(hash, op->table()->id());
500
501
  // Hash the hash key.
502
1.28M
  string key;
503
1.35M
  for (const auto& value : op->request().hashed_column_values()) {
504
1.35M
    AppendToKey(value.value(), &key);
505
1.35M
  }
506
1.28M
  boost::hash_combine(hash, key);
507
508
1.28M
  return hash;
509
1.28M
}
510
511
bool YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op1,
512
180k
                                              const YBqlWriteOpPtr& op2) const {
513
  // Check if two write ops overlap that they apply to the same hash key in the same table.
514
180k
  if (op1->table() != op2->table() && op1->table()->id() != op2->table()->id()) {
515
272
    return false;
516
272
  }
517
180k
  const QLWriteRequestPB& req1 = op1->request();
518
180k
  const QLWriteRequestPB& req2 = op2->request();
519
180k
  if (req1.hashed_column_values_size() != req2.hashed_column_values_size()) {
520
0
    return false;
521
0
  }
522
350k
  for (int i = 0; i < req1.hashed_column_values().size(); i++) {
523
180k
    DCHECK(req1.hashed_column_values()[i].has_value());
524
180k
    DCHECK(req2.hashed_column_values()[i].has_value());
525
180k
    if (req1.hashed_column_values()[i].value() != req2.hashed_column_values()[i].value())
526
10.8k
      return false;
527
180k
  }
528
169k
  return true;
529
180k
}
530
531
// YBqlWriteOp::PrimaryHash/Equal ---------------------------------------------------------------
532
1.28M
size_t YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op) const {
533
1.28M
  size_t hash = YBqlWriteHashKeyComparator()(op);
534
535
  // Hash the range key also.
536
1.28M
  string key;
537
1.18M
  for (const auto& value : op->request().range_column_values()) {
538
1.18M
    AppendToKey(value.value(), &key);
539
1.18M
  }
540
1.28M
  boost::hash_combine(hash, key);
541
542
1.28M
  return hash;
543
1.28M
}
544
545
bool YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op1,
546
180k
                                                 const YBqlWriteOpPtr& op2) const {
547
180k
  if (!YBqlWriteHashKeyComparator()(op1, op2)) {
548
11.1k
    return false;
549
11.1k
  }
550
551
  // Check if two write ops overlap that they apply to the range key also.
552
169k
  const QLWriteRequestPB& req1 = op1->request();
553
169k
  const QLWriteRequestPB& req2 = op2->request();
554
169k
  if (req1.range_column_values_size() != req2.range_column_values_size()) {
555
0
    return false;
556
0
  }
557
169k
  for (int i = 0; i < req1.range_column_values().size(); i++) {
558
169k
    DCHECK(req1.range_column_values()[i].has_value());
559
169k
    DCHECK(req2.range_column_values()[i].has_value());
560
169k
    if (req1.range_column_values()[i].value() != req2.range_column_values()[i].value())
561
169k
      return false;
562
169k
  }
563
49
  return true;
564
169k
}
565
566
// YBqlReadOp -----------------------------------------------------------------
567
568
YBqlReadOp::YBqlReadOp(const shared_ptr<YBTable>& table)
569
    : YBqlOp(table),
570
      ql_read_request_(new QLReadRequestPB()),
571
3.87M
      yb_consistency_level_(YBConsistencyLevel::STRONG) {
572
3.87M
}
573
574
3.90M
YBqlReadOp::~YBqlReadOp() {}
575
576
11.8M
OpGroup YBqlReadOp::group() {
577
11.8M
  return yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX
578
11.8M
      ? OpGroup::kConsistentPrefixRead : OpGroup::kLeaderRead;
579
11.8M
}
580
581
3.88M
std::unique_ptr<YBqlReadOp> YBqlReadOp::NewSelect(const shared_ptr<YBTable>& table) {
582
3.88M
  std::unique_ptr<YBqlReadOp> op(new YBqlReadOp(table));
583
3.88M
  QLReadRequestPB *req = op->mutable_request();
584
3.88M
  req->set_client(YQL_CLIENT_CQL);
585
  // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too.
586
3.88M
  req->set_request_id(reinterpret_cast<uint64_t>(op.get()));
587
3.88M
  req->set_query_id(op->GetQueryId());
588
589
3.88M
  req->set_schema_version(table->schema().version());
590
3.88M
  req->set_is_compatible_with_previous_version(
591
3.88M
      table->schema().is_compatible_with_previous_version());
592
593
3.88M
  return op;
594
3.88M
}
595
596
0
std::string YBqlReadOp::ToString() const {
597
0
  return "QL_READ " + ql_read_request_->DebugString();
598
0
}
599
600
3.75M
void YBqlReadOp::SetHashCode(const uint16_t hash_code) {
601
3.75M
  ql_read_request_->set_hash_code(hash_code);
602
3.75M
}
603
604
3.90M
Status YBqlReadOp::GetPartitionKey(string* partition_key) const {
605
3.90M
  if (!ql_read_request_->hashed_column_values().empty()) {
606
    // If hashed columns are set, use them to compute the exact key and set the bounds
607
3.71M
    RETURN_NOT_OK(table_->partition_schema().EncodeKey(ql_read_request_->hashed_column_values(),
608
3.71M
        partition_key));
609
610
    // TODO: If user specified token range doesn't contain the hash columns specified then the query
611
    // will have no effect. We need to implement an exit path rather than requesting the tablets.
612
    // For now, we set point query some value that is not equal to the hash to the hash columns
613
    // Which will return no result.
614
615
    // Make sure given key is not smaller than lower bound (if any)
616
3.71M
    if (ql_read_request_->has_hash_code()) {
617
21
      uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code());
618
21
      auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
619
21
      if (*partition_key < lower_bound) *partition_key = std::move(lower_bound);
620
21
    }
621
622
    // Make sure given key is not bigger than upper bound (if any)
623
3.71M
    if (ql_read_request_->has_max_hash_code()) {
624
11
      uint16 hash_code = static_cast<uint16>(ql_read_request_->max_hash_code());
625
11
      auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
626
11
      if (*partition_key > upper_bound) *partition_key = std::move(upper_bound);
627
11
    }
628
629
    // Set both bounds to equal partition key now, because this is a point get
630
3.71M
    ql_read_request_->set_hash_code(
631
3.71M
          PartitionSchema::DecodeMultiColumnHashValue(*partition_key));
632
3.71M
    ql_read_request_->set_max_hash_code(
633
3.71M
          PartitionSchema::DecodeMultiColumnHashValue(*partition_key));
634
191k
  } else {
635
    // Otherwise, set the partition key to the hash_code (lower bound of the token range).
636
191k
    if (ql_read_request_->has_hash_code()) {
637
26.9k
      uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code());
638
26.9k
      *partition_key = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
639
164k
    } else {
640
      // Default to empty key, this will start a scan from the beginning.
641
164k
      partition_key->clear();
642
164k
    }
643
191k
  }
644
645
  // If this is a continued query use the partition key from the paging state
646
  // If paging state is there, set hash_code = paging state. This is only supported for forward
647
  // scans.
648
3.90M
  if (ql_read_request_->has_paging_state() &&
649
34.5k
      ql_read_request_->paging_state().has_next_partition_key() &&
650
34.5k
      !ql_read_request_->paging_state().next_partition_key().empty()) {
651
34.1k
    *partition_key = ql_read_request_->paging_state().next_partition_key();
652
653
    // Check that the partition key we got from the paging state is within bounds.
654
34.1k
    uint16 paging_state_hash_code = PartitionSchema::DecodeMultiColumnHashValue(*partition_key);
655
34.1k
    if ((ql_read_request_->has_hash_code() &&
656
27.0k
            paging_state_hash_code < ql_read_request_->hash_code()) ||
657
34.1k
        (ql_read_request_->has_max_hash_code() &&
658
185
            paging_state_hash_code > ql_read_request_->max_hash_code())) {
659
0
      return STATUS_SUBSTITUTE(InternalError,
660
0
                               "Out of bounds partition key found in paging state:"
661
0
                               "Query's partition bounds: [$0, $1], paging state partition: $2",
662
0
                               ql_read_request_->hash_code(),
663
0
                               ql_read_request_->max_hash_code() ,
664
0
                               paging_state_hash_code);
665
0
    }
666
667
34.1k
    ql_read_request_->set_hash_code(paging_state_hash_code);
668
34.1k
  }
669
670
3.90M
  return Status::OK();
671
3.90M
}
672
673
std::vector<ColumnSchema> MakeColumnSchemasFromColDesc(
674
3.94M
  const google::protobuf::RepeatedPtrField<QLRSColDescPB>& rscol_descs) {
675
3.94M
  std::vector<ColumnSchema> column_schemas;
676
3.94M
  column_schemas.reserve(rscol_descs.size());
677
13.1M
  for (const auto& rscol_desc : rscol_descs) {
678
13.1M
    column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type()));
679
13.1M
  }
680
3.94M
  return column_schemas;
681
3.94M
}
682
683
3.94M
std::vector<ColumnSchema> YBqlReadOp::MakeColumnSchemasFromRequest() const {
684
  // Tests don't have access to the QL internal statement object, so they have to use rsrow
685
  // descriptor from the read request.
686
3.94M
  return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs());
687
3.94M
}
688
689
1.19k
Result<QLRowBlock> YBqlReadOp::MakeRowBlock() const {
690
1.19k
  Schema schema(MakeColumnSchemasFromRequest(), 0);
691
1.19k
  QLRowBlock result(schema);
692
1.19k
  Slice data(rows_data_);
693
1.19k
  if (!data.empty()) {
694
1.19k
    RETURN_NOT_OK(result.Deserialize(request().client(), &data));
695
1.19k
  }
696
1.19k
  return result;
697
1.19k
}
698
699
//--------------------------------------------------------------------------------------------------
700
// YBPgsql Operators
701
//--------------------------------------------------------------------------------------------------
702
703
// Creates a PGSQL op on `table` with a freshly allocated response.
// If `partition_key` is non-null, its contents are moved into the op, leaving the source string in
// a moved-from state. Otherwise the op starts with an empty partition key.
YBPgsqlOp::YBPgsqlOp(const shared_ptr<YBTable>& table, std::string* partition_key)
    : YBOperation(table),
      response_(new PgsqlResponsePB()),
      partition_key_(partition_key != nullptr ? std::move(*partition_key) : std::string()) {
}
707
708
4.02M
// Out-of-line destructor; member cleanup is handled by the members' own destructors.
YBPgsqlOp::~YBPgsqlOp() = default;
710
711
3.10M
bool YBPgsqlOp::succeeded() const {
712
3.10M
  return response().status() == PgsqlResponsePB::PGSQL_STATUS_OK;
713
3.10M
}
714
715
3.10M
bool YBPgsqlOp::applied() {
716
3.10M
  return succeeded() && !response_->skipped();
717
3.10M
}
718
719
namespace {
720
721
0
std::string ResponseSuffix(const PgsqlResponsePB& response) {
722
0
  const auto str = response.ShortDebugString();
723
0
  return str.empty() ? std::string() : (", response: " + str);
724
0
}
725
726
} // namespace
727
728
//--------------------------------------------------------------------------------------------------
729
// YBPgsqlWriteOp
730
731
// Creates a PGSQL write op on `table`.
//
// When `request` is non-null, the op points at it without taking ownership (presumably the caller
// keeps it alive — confirm). Its partition key is moved into the op by the base constructor.
// When `request` is null, the op allocates and owns a fresh PgsqlWriteRequestPB.
YBPgsqlWriteOp::YBPgsqlWriteOp(const shared_ptr<YBTable>& table, PgsqlWriteRequestPB* request)
    : YBPgsqlOp(table, request != nullptr ? request->mutable_partition_key() : nullptr),
      request_(request) {
  if (request != nullptr) {
    return;
  }
  request_holder_ = std::make_unique<PgsqlWriteRequestPB>();
  request_ = request_holder_.get();
}
738
739
2.12M
// Out-of-line destructor; the owned request (if any) is released by request_holder_.
YBPgsqlWriteOp::~YBPgsqlWriteOp() = default;
740
741
static std::unique_ptr<YBPgsqlWriteOp> NewYBPgsqlWriteOp(
742
    const shared_ptr<YBTable>& table,
743
135
    PgsqlWriteRequestPB::PgsqlStmtType stmt_type) {
744
135
  auto op = std::make_unique<YBPgsqlWriteOp>(table);
745
135
  PgsqlWriteRequestPB *req = op->mutable_request();
746
135
  req->set_stmt_type(stmt_type);
747
135
  req->set_client(YQL_CLIENT_PGSQL);
748
135
  req->set_table_id(table->id());
749
135
  req->set_schema_version(table->schema().version());
750
135
  req->set_stmt_id(op->GetQueryId());
751
752
135
  return op;
753
135
}
754
755
54
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) {
756
54
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_INSERT);
757
54
}
758
759
33
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) {
760
33
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_UPDATE);
761
33
}
762
763
48
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) {
764
48
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_DELETE);
765
48
}
766
767
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewTruncateColocated(
768
0
    const std::shared_ptr<YBTable>& table) {
769
0
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED);
770
0
}
771
772
0
std::string YBPgsqlWriteOp::ToString() const {
773
0
  return Format(
774
0
      "PGSQL_WRITE $0$1$2", request_->ShortDebugString(),
775
0
      (write_time_ ? " write_time: " + write_time_.ToString() : ""), ResponseSuffix(response()));
776
0
}
777
778
1.39M
void YBPgsqlWriteOp::SetHashCode(const uint16_t hash_code) {
779
1.39M
  request_->set_hash_code(hash_code);
780
1.39M
}
781
782
0
bool YBPgsqlWriteOp::IsTransactional() const {
783
0
  return !is_single_row_txn_ && table_->schema().table_properties().is_transactional();
784
0
}
785
786
2.12M
// Produces the partition key for this write.
//
// When the op owns its request (it was created without an external one), the key is computed now
// via InitPartitionKey and then moved out of the request into `*partition_key`. Otherwise the key
// captured at construction is handled by the base class.
CHECKED_STATUS YBPgsqlWriteOp::GetPartitionKey(std::string* partition_key) const {
  if (request_holder_) {
    RETURN_NOT_OK(InitPartitionKey(table_->InternalSchema(), table_->partition_schema(), request_));
    *partition_key = std::move(*request_->mutable_partition_key());
    return Status::OK();
  }
  return YBPgsqlOp::GetPartitionKey(partition_key);
}
794
795
//--------------------------------------------------------------------------------------------------
796
// YBPgsqlReadOp
797
798
// Creates a PGSQL read op on `table` with STRONG consistency.
//
// When `request` is non-null, the op points at it without taking ownership (presumably the caller
// keeps it alive — confirm). Its partition key is moved into the op by the base constructor.
// When `request` is null, the op allocates and owns a fresh PgsqlReadRequestPB.
YBPgsqlReadOp::YBPgsqlReadOp(const shared_ptr<YBTable>& table, PgsqlReadRequestPB* request)
    : YBPgsqlOp(table, request != nullptr ? request->mutable_partition_key() : nullptr),
      request_(request),
      yb_consistency_level_(YBConsistencyLevel::STRONG) {
  if (request != nullptr) {
    return;
  }
  request_holder_ = std::make_unique<PgsqlReadRequestPB>();
  request_ = request_holder_.get();
}
807
808
69
std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSelect(const shared_ptr<YBTable>& table) {
809
69
  std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table));
810
69
  PgsqlReadRequestPB *req = op->mutable_request();
811
69
  req->set_client(YQL_CLIENT_PGSQL);
812
69
  req->set_table_id(table->id());
813
69
  req->set_schema_version(table->schema().version());
814
69
  req->set_stmt_id(op->GetQueryId());
815
816
69
  return op;
817
69
}
818
819
0
std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSample(const shared_ptr<YBTable>& table) {
820
0
  std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table));
821
0
  PgsqlReadRequestPB *req = op->mutable_request();
822
0
  req->set_client(YQL_CLIENT_PGSQL);
823
0
  req->set_table_id(table->id());
824
0
  req->set_schema_version(table->schema().version());
825
0
  req->set_stmt_id(op->GetQueryId());
826
827
0
  return op;
828
0
}
829
830
0
std::string YBPgsqlReadOp::ToString() const {
831
0
  return "PGSQL_READ " + request_->ShortDebugString() + ResponseSuffix(response());
832
0
}
833
834
1.49M
void YBPgsqlReadOp::SetHashCode(const uint16_t hash_code) {
835
1.49M
  request_->set_hash_code(hash_code);
836
1.49M
}
837
838
std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromColDesc(
839
0
  const google::protobuf::RepeatedPtrField<PgsqlRSColDescPB>& rscol_descs) {
840
0
  std::vector<ColumnSchema> column_schemas;
841
0
  column_schemas.reserve(rscol_descs.size());
842
0
  for (const auto& rscol_desc : rscol_descs) {
843
0
    column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type()));
844
0
  }
845
0
  return column_schemas;
846
0
}
847
848
0
std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromRequest() const {
849
  // Tests don't have access to the QL internal statement object, so they have to use rsrow
850
  // descriptor from the read request.
851
0
  return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs());
852
0
}
853
854
11.5M
// Reads at CONSISTENT_PREFIX consistency belong to the consistent-prefix read group. Every other
// consistency level is grouped with leader reads.
OpGroup YBPgsqlReadOp::group() {
  if (yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX) {
    return OpGroup::kConsistentPrefixRead;
  }
  return OpGroup::kLeaderRead;
}
858
859
949k
void YBPgsqlReadOp::SetUsedReadTime(const ReadHybridTime& used_time) {
860
949k
  used_read_time_ = used_time;
861
949k
}
862
863
1.89M
// Produces the partition key for this read.
//
// When the op owns its request (it was created without an external one), the key is computed now
// via InitPartitionKey, using the table's last partition from GetPartitionsShared(). The key is
// then moved out of the request into `*partition_key`. Otherwise the key captured at construction
// is handled by the base class.
CHECKED_STATUS YBPgsqlReadOp::GetPartitionKey(std::string* partition_key) const {
  if (request_holder_) {
    RETURN_NOT_OK(InitPartitionKey(
        table_->InternalSchema(), table_->partition_schema(), table_->GetPartitionsShared()->back(),
        request_));
    *partition_key = std::move(*request_->mutable_partition_key());
    return Status::OK();
  }
  return YBPgsqlOp::GetPartitionKey(partition_key);
}
873
874
////////////////////////////////////////////////////////////
875
// YBNoOp
876
////////////////////////////////////////////////////////////
877
878
// Binds the helper to `table`. Execute() uses the table's partition schema to encode keys and
// looks up the table's tablets.
YBNoOp::YBNoOp(const std::shared_ptr<YBTable>& table) : table_(table) {}
881
882
0
// Sends a NoOp RPC to the leader of the tablet that owns `key`.
//
// Makes up to 10 attempts under a 5 second overall deadline. On each attempt:
//  - The tablet for the encoded key is looked up through the meta cache; lookup errors are
//    returned.
//  - If the tablet-server lookup returns ServiceUnavailable before the deadline, the attempt sleeps
//    100ms * attempt and retries. Any other lookup error is returned.
//  - If the deadline has passed after a successful lookup, TimedOut is returned.
//  - An RPC failure or server-side error is logged, and the next attempt is made.
//
// NOTE(review): if every attempt fails at the RPC/server level, the loop simply ends and
// Status::OK() is returned, so callers cannot tell this apart from success (see the TODO below).
// Kept as-is for compatibility with existing callers.
Status YBNoOp::Execute(YBClient* client, const YBPartialRow& key) {
  string encoded_key;
  RETURN_NOT_OK(table_->partition_schema().EncodeKey(key, &encoded_key));
  const CoarseTimePoint deadline = CoarseMonoClock::Now() + 5s;

  tserver::NoOpRequestPB noop_req;
  tserver::NoOpResponsePB noop_resp;

  for (int attempt = 1; attempt < 11; attempt++) {
    auto remote = VERIFY_RESULT(client->data_->meta_cache_->LookupTabletByKeyFuture(
        table_, encoded_key, deadline).get());

    internal::RemoteTabletServer* ts = nullptr;
    std::vector<internal::RemoteTabletServer*> candidates;
    std::set<string> blacklist;  // TODO: empty set for now.
    Status lookup_status = client->data_->GetTabletServer(
        client,
        remote,
        YBClient::ReplicaSelection::LEADER_ONLY,
        blacklist,
        &candidates,
        &ts);

    // If we get ServiceUnavailable, this indicates that the tablet doesn't
    // currently have any known leader. We should sleep and retry, since
    // it's likely that the tablet is undergoing a leader election and will
    // soon have one.
    if (lookup_status.IsServiceUnavailable() && CoarseMonoClock::Now() < deadline) {
      const int sleep_ms = attempt * 100;
      VLOG(1) << "Tablet " << remote->tablet_id() << " current unavailable: "
              << lookup_status.ToString() << ". Sleeping for " << sleep_ms << "ms "
              << "and retrying...";
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      continue;
    }
    RETURN_NOT_OK(lookup_status);

    const auto now = CoarseMonoClock::Now();
    if (deadline < now) {
      return STATUS(TimedOut, "Op timed out, deadline expired");
    }

    // Recalculate the deadlines.
    // If we have other replicas beyond this one to try, then we'll use the default RPC timeout.
    // That gives us time to try other replicas later. Otherwise, use the full remaining deadline
    // for the user's call.
    // The counts are compared without subtracting unsigned sizes, which would wrap around to a
    // huge value if the blacklist ever outnumbered the candidates.
    CoarseTimePoint rpc_deadline;
    if (candidates.size() > blacklist.size() + 1) {
      rpc_deadline = now + client->default_rpc_timeout();
      rpc_deadline = std::min(deadline, rpc_deadline);
    } else {
      rpc_deadline = deadline;
    }

    rpc::RpcController controller;
    controller.set_deadline(rpc_deadline);

    CHECK(ts->proxy());
    // Reset the response so an error left over from a previous attempt cannot be mistaken for
    // this attempt's result.
    noop_resp.Clear();
    const Status rpc_status = ts->proxy()->NoOp(noop_req, &noop_resp, &controller);
    if (rpc_status.ok() && !noop_resp.has_error()) {
      break;
    }

    LOG(INFO) << rpc_status.CodeAsString();
    if (noop_resp.has_error()) {
      // Report the server-side error itself, not the (possibly OK) RPC status again.
      const Status server_status = StatusFromPB(noop_resp.error().status());
      LOG(INFO) << server_status.ToString();
    }
    /*
     * TODO: For now, we just try a few attempts and exit. Ideally, we should check for
     * errors that are retriable, and retry if so.
     * RETURN_NOT_OK(CanBeRetried(true, rpc_status, server_status, rpc_deadline, deadline,
     *                         candidates, blacklist));
     */
  }

  return Status::OK();
}
961
962
1.52M
// Returns true when the read must write intents, which happens in either case:
// - the isolation level is SERIALIZABLE_ISOLATION;
// - IsValidRowMarkType() accepts the request's row mark type.
bool YBPgsqlReadOp::should_add_intents(IsolationLevel isolation_level) {
  if (isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION) {
    return true;
  }
  return IsValidRowMarkType(GetRowMarkTypeFromPB(*request_));
}
966
967
// Sets up the partition key of a read request.
// - Schemas with hash key columns are delegated to InitHashPartitionKey.
// - All other schemas are delegated to InitRangePartitionKey, which also receives `last_partition`.
CHECKED_STATUS InitPartitionKey(
    const Schema& schema, const PartitionSchema& partition_schema,
    const std::string& last_partition, PgsqlReadRequestPB* request) {
  const bool is_hash_partitioned = schema.num_hash_key_columns() > 0;
  if (!is_hash_partitioned) {
    return InitRangePartitionKey(schema, last_partition, request);
  }
  return InitHashPartitionKey(schema, partition_schema, request);
}
976
977
// Sets up the partition key (and, for hash tables with a ybctid, the hash code) of a write request.
//
// Hash-partitioned schemas (num_hash_key_columns() > 0):
//   - With a non-null ybctid, the hash is decoded from it; both the hash code and the encoded
//     multi-column hash value (as the partition key) are stored on the request.
//   - Otherwise the partition key is encoded from the request's partition column values.
// Range-partitioned schemas:
//   - With a non-null ybctid, its binary value becomes the partition key.
//   - Otherwise the key is built from the request's range column values.
CHECKED_STATUS InitPartitionKey(
    const Schema& schema, const PartitionSchema& partition_schema, PgsqlWriteRequestPB* request) {
  const auto& ybctid = request->ybctid_column_value().value();
  const bool has_ybctid = !IsNull(ybctid);
  const bool is_hash_partitioned = schema.num_hash_key_columns() > 0;

  if (!is_hash_partitioned) {
    if (has_ybctid) {
      request->set_partition_key(ybctid.binary_value());
    } else {
      request->set_partition_key(VERIFY_RESULT(GetRangePartitionKey(
          schema, request->range_column_values())));
    }
    return Status::OK();
  }

  if (!has_ybctid) {
    // Computing the partition_key.
    return partition_schema.EncodeKey(
        request->partition_column_values(), request->mutable_partition_key());
  }

  const uint16 hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value()));
  request->set_hash_code(hash_code);
  request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code));
  return Status::OK();
}
1004
1005
// Converts the request's range column values into PrimitiveValues, one per range key column.
//
// Range columns are visited in schema column order, and the i-th range column is paired with
// range_cols[i]. When range_cols[i] is missing (past the end) or has no value set, that component
// is kLowest. Otherwise the value is converted using the column's sorting type. Iteration stops
// once num_range_key_columns() components have been produced.
Result<std::vector<docdb::PrimitiveValue>> GetRangeComponents(
    const Schema& schema, const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols) {
  const auto num_range_key_columns = narrow_cast<int>(schema.num_range_key_columns());
  std::vector<docdb::PrimitiveValue> components;
  int range_idx = 0;
  for (const auto& col_id : schema.column_ids()) {
    if (schema.is_range_column(col_id)) {
      const ColumnSchema& column_schema = VERIFY_RESULT(schema.column_by_id(col_id));
      const bool has_value = range_idx < range_cols.size() &&
          range_cols[range_idx].value().value_case() != QLValuePB::VALUE_NOT_SET;
      if (has_value) {
        components.push_back(docdb::PrimitiveValue::FromQLValuePB(
            range_cols[range_idx].value(), column_schema.sorting_type()));
      } else {
        components.emplace_back(docdb::ValueType::kLowest);
      }

      ++range_idx;
      if (range_idx == num_range_key_columns) {
        break;
      }
    }
  }
  return components;
}
1029
1030
// Computes the range-key bounds of a read request against a range-partitioned table.
//
// - If the request has a condition and supplies fewer range values than the schema has range key
//   columns, both bounds come from docdb::GetRangeKeyScanSpec over the supplied prefix and a
//   QLScanRange built from the condition.
// - Otherwise, if range values are supplied, the lower bound is GetRangeComponents() of those
//   values. The upper bound is the same components with kHighest appended.
// - Otherwise the output vectors are left untouched.
// Returns IllegalState if the schema has hash key columns.
Status GetRangePartitionBounds(const Schema& schema,
                               const PgsqlReadRequestPB& request,
                               vector<docdb::PrimitiveValue>* lower_bound,
                               vector<docdb::PrimitiveValue>* upper_bound) {
  SCHECK(!schema.num_hash_key_columns(), IllegalState,
         "Cannot set range partition key for hash partitioned table");
  const auto& range_cols = request.range_column_values();
  const auto& condition_expr = request.condition_expr();
  const bool use_scan_spec =
      condition_expr.has_condition() &&
      implicit_cast<size_t>(range_cols.size()) < schema.num_range_key_columns();

  if (use_scan_spec) {
    auto prefixed_range_components = VERIFY_RESULT(docdb::InitKeyColumnPrimitiveValues(
        range_cols, schema, schema.num_hash_key_columns()));
    QLScanRange scan_range(schema, condition_expr.condition());
    *lower_bound = docdb::GetRangeKeyScanSpec(
        schema, &prefixed_range_components, &scan_range, true /* lower_bound */);
    *upper_bound = docdb::GetRangeKeyScanSpec(
        schema, &prefixed_range_components, &scan_range, false /* lower_bound */);
    return Status::OK();
  }

  if (range_cols.empty()) {
    return Status::OK();
  }

  *lower_bound = VERIFY_RESULT(GetRangeComponents(schema, range_cols));
  *upper_bound = *lower_bound;
  upper_bound->emplace_back(docdb::ValueType::kHighest);
  return Status::OK();
}
1054
1055
}  // namespace client
1056
}  // namespace yb