YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/yb_op.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/yb_op.h"
34
35
#include "yb/client/client.h"
36
#include "yb/client/client-internal.h"
37
#include "yb/client/meta_cache.h"
38
#include "yb/client/schema.h"
39
#include "yb/client/table.h"
40
41
#include "yb/common/ql_protocol.pb.h"
42
#include "yb/common/ql_rowblock.h"
43
#include "yb/common/ql_scanspec.h"
44
#include "yb/common/ql_type.h"
45
#include "yb/common/ql_value.h"
46
#include "yb/common/redis_protocol.pb.h"
47
#include "yb/common/row_mark.h"
48
#include "yb/common/schema.h"
49
#include "yb/common/wire_protocol.h"
50
#include "yb/common/wire_protocol.pb.h"
51
52
#include "yb/docdb/doc_key.h"
53
#include "yb/docdb/doc_scanspec_util.h"
54
#include "yb/docdb/primitive_value.h"
55
#include "yb/docdb/primitive_value_util.h"
56
#include "yb/rpc/rpc_controller.h"
57
58
#include "yb/tserver/tserver_service.proxy.h"
59
60
#include "yb/util/async_util.h"
61
#include "yb/util/flag_tags.h"
62
#include "yb/util/result.h"
63
#include "yb/util/status_format.h"
64
65
using namespace std::literals;
66
67
// Runtime flag consulted by YBRedisReadOp::group(): when true, Redis reads are grouped as
// consistent-prefix reads instead of leader reads.
DEFINE_bool(
    redis_allow_reads_from_followers,
    false,
    "If true, the read will be served from the closest replica in the same AZ, "
    "which can be a follower.");
TAG_FLAG(redis_allow_reads_from_followers, runtime);
TAG_FLAG(redis_allow_reads_from_followers, evolving);
72
73
namespace yb {
74
namespace client {
75
76
using std::shared_ptr;
77
using std::unique_ptr;
78
79
namespace {
80
81
// Fills in the hash-partition routing fields of a YSQL read request on a hash-partitioned table.
//
// On return, request->partition_key() holds the key used to locate the first tablet to read and,
// where applicable, hash_code / max_hash_code hold the inclusive range of hash values to scan.
// The first matching source below decides the partition key:
//   1. paging_state.next_partition_key -- set by the server to continue the current request
//      (forward scans only). Batched ybctid requests rely on this too: tablets may split after a
//      batch has been prepared, so the paging state continues executing the batch in the second
//      sub-partition after the first one is done.
//   2. ybctid -- the partition key is the hash of that single row id.
//   3. lower_bound / upper_bound -- set by PgGate to query a specific set of hash values; also used
//      to carry the hash code / max hash code that cover a batch of ybctids.
//   4. partition_column_values -- ONE set of specific hash column values (a point get).
//   5. none of the above -- a full scan starting from an empty partition key.
// Range and regular conditions are filter expressions processed by DocDB, not here.
//
// Returns InternalError if the paging state's hash code lies outside [hash_code, max_hash_code];
// also propagates errors from decoding the ybctid and from encoding the hash column values.
CHECKED_STATUS InitHashPartitionKey(
    const Schema& schema, const PartitionSchema& partition_schema, PgsqlReadRequestPB* request) {
  const auto& ybctid = request->ybctid_column_value().value();

  const bool has_paging_state =
      request->has_paging_state() && request->paging_state().has_next_partition_key();

  if (has_paging_state) {
    // Subsequent page: continue from the partition key saved in the paging state.
    request->set_partition_key(request->paging_state().next_partition_key());

    // Check that the paging state hash code is within the [hash_code, max_hash_code] bounds.
    if (schema.num_hash_key_columns() > 0 && !request->partition_key().empty()) {
      uint16 paging_state_hash_code =
          PartitionSchema::DecodeMultiColumnHashValue(request->partition_key());
      if ((request->has_hash_code() && paging_state_hash_code < request->hash_code()) ||
          (request->has_max_hash_code() && paging_state_hash_code > request->max_hash_code())) {
        return STATUS_SUBSTITUTE(
            InternalError,
            "Out of bounds partition key found in paging state: "
            "Query's partition bounds: [$0, $1], paging state partition: $2",
            request->has_hash_code() ? request->hash_code() : 0,
            request->has_max_hash_code() ? request->max_hash_code() : 0,
            paging_state_hash_code);
      }
      request->set_hash_code(paging_state_hash_code);
    }
  } else if (!IsNull(ybctid)) {
    // Single row: route by the hash encoded in its ybctid.
    const auto hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value()));
    request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code));
  } else if (request->has_lower_bound() || request->has_upper_bound()) {
    // No specific partition key, but explicit scan boundaries: turn them into an inclusive
    // [hash_code, max_hash_code] range.
    if (request->has_lower_bound()) {
      auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->lower_bound().key());
      if (!request->lower_bound().is_inclusive()) {
        // NOTE(review): if the decoded hash is an unsigned 16-bit value (it is stored into uint16
        // elsewhere in this file), this wraps at the top of the hash space -- confirm callers never
        // send an exclusive lower bound there.
        ++hash;
      }
      request->set_hash_code(hash);

      // Start reading at the lower bound.
      request->set_partition_key(request->lower_bound().key());
    }
    if (request->has_upper_bound()) {
      auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->upper_bound().key());
      if (!request->upper_bound().is_inclusive()) {
        // NOTE(review): same wrap-around concern for an exclusive upper bound at hash 0.
        --hash;
      }
      request->set_max_hash_code(hash);
    }
  } else if (!request->partition_column_values().empty()) {
    // Hash column values are set: compute the exact key, then clamp it into the bounds.
    RETURN_NOT_OK(partition_schema.EncodeKey(
        request->partition_column_values(), request->mutable_partition_key()));

    // Make sure the given key is not smaller than the lower bound (if any).
    if (request->has_hash_code()) {
      auto hash_code = static_cast<uint16>(request->hash_code());
      auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
      if (request->partition_key() < lower_bound) {
        request->set_partition_key(std::move(lower_bound));
      }
    }

    // Make sure the given key is not bigger than the upper bound (if any).
    if (request->has_max_hash_code()) {
      auto hash_code = static_cast<uint16>(request->max_hash_code());
      auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
      if (request->partition_key() > upper_bound) {
        request->set_partition_key(std::move(upper_bound));
      }
    }

    if (!request->partition_key().empty()) {
      // One specific partition key was found: this is a point get, so pin both bounds to it.
      auto hash_code = PartitionSchema::DecodeMultiColumnHashValue(request->partition_key());
      request->set_hash_code(hash_code);
      request->set_max_hash_code(hash_code);
    }
  } else {
    // Full scan: default to an empty key. Reaching this branch implies !has_paging_state (the
    // first branch handles every request with a paging state), so no extra guard is needed.
    request->clear_partition_key();
  }

  return Status::OK();
}
187
188
// Derives where a range-partitioned read starts from the request's range conditions.
//
// GetRangePartitionBounds() extracts the lower and upper range components of `request`.
//  * Both empty (unbounded): forward scans start from an empty partition key, backward scans start
//    from `last_partition`; *key_upper_bound is cleared.
//  * Otherwise, forward scans start at the encoded lower bound and hand the encoded upper bound
//    back through *key_upper_bound. Backward scans start at the encoded upper bound and clear
//    *key_upper_bound.
CHECKED_STATUS SetRangePartitionBounds(const Schema& schema,
                                       const std::string& last_partition,
                                       PgsqlReadRequestPB* request,
                                       std::string* key_upper_bound) {
  std::vector<docdb::PrimitiveValue> lower_components;
  std::vector<docdb::PrimitiveValue> upper_components;
  RETURN_NOT_OK(GetRangePartitionBounds(schema, *request, &lower_components, &upper_components));

  const bool forward = request->is_forward_scan();

  if (lower_components.empty() && upper_components.empty()) {
    if (!forward) {
      // A backward scan must begin from the last partition.
      request->set_partition_key(last_partition);
    } else {
      request->clear_partition_key();
    }
    key_upper_bound->clear();
    return Status::OK();
  }

  std::string encoded_upper =
      docdb::DocKey(std::move(upper_components)).Encode().ToStringBuffer();

  if (!forward) {
    // Backward scans go from the upper bound down. DocDB can only check an upper bound, so none is
    // returned here; the lower bound is checked on the client side in ReviewResponsePagingState.
    request->set_partition_key(std::move(encoded_upper));
    key_upper_bound->clear();
    return Status::OK();
  }

  request->set_partition_key(
      docdb::DocKey(std::move(lower_components)).Encode().ToStringBuffer());
  *key_upper_bound = std::move(encoded_upper);
  return Status::OK();
}
219
220
// Chooses the range partition key at which a YSQL read on a range-partitioned table starts.
// The first applicable source wins:
//   - ybctid: fetch one specific row; the ybctid itself is the key.
//   - paging_state.next_partition_key: continue the same request where the server stopped.
//   - lower_bound: PgGate already computed the bounds (currently only for COUNT and backfill
//     requests); use them as-is.
//   - otherwise: evaluate the range conditions via SetRangePartitionBounds(), which also yields an
//     upper bound that is stored as an inclusive upper_bound when non-empty. With no range
//     condition at all this is a full scan.
CHECKED_STATUS InitRangePartitionKey(
    const Schema& schema, const std::string& last_partition, PgsqlReadRequestPB* request) {
  const auto& ybctid = request->ybctid_column_value().value();

  if (!IsNull(ybctid)) {
    request->set_partition_key(ybctid.binary_value());
    return Status::OK();
  }

  if (request->has_paging_state() && request->paging_state().has_next_partition_key()) {
    request->set_partition_key(request->paging_state().next_partition_key());
    return Status::OK();
  }

  if (request->has_lower_bound()) {
    // PgGate has not optimized RANGE values yet; bounds come from its own computation.
    request->set_partition_key(request->lower_bound().key());
    return Status::OK();
  }

  std::string upper_key;
  RETURN_NOT_OK(SetRangePartitionBounds(schema, last_partition, request, &upper_key));
  if (!upper_key.empty()) {
    auto* upper_bound = request->mutable_upper_bound();
    upper_bound->set_key(upper_key);
    upper_bound->set_is_inclusive(true);
  }
  return Status::OK();
}
260
261
// Encodes the range column values `range_cols` into a DocKey string for a range-partitioned table.
// Tables with hash key columns are rejected through RSTATUS_DCHECK (IllegalState).
Result<std::string> GetRangePartitionKey(
    const Schema& schema, const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols) {
  RSTATUS_DCHECK(!schema.num_hash_key_columns(), IllegalState,
      "Cannot get range partition key for hash partitioned table");

  auto components = VERIFY_RESULT(client::GetRangeComponents(schema, range_cols, true));
  docdb::DocKey range_key(std::move(components));
  return range_key.Encode().ToStringBuffer();
}
269
270
} // namespace
271
272
//--------------------------------------------------------------------------------------------------
273
// YBOperation
274
//--------------------------------------------------------------------------------------------------
275
276
YBOperation::YBOperation(const shared_ptr<YBTable>& table)
277
23.2M
  : table_(table) {
278
23.2M
}
279
280
23.2M
YBOperation::~YBOperation() {}
281
282
207k
void YBOperation::SetTablet(const scoped_refptr<internal::RemoteTablet>& tablet) {
283
207k
  tablet_ = tablet;
284
207k
}
285
286
9.03k
void YBOperation::ResetTablet() {
287
9.03k
  tablet_.reset();
288
9.03k
}
289
290
0
void YBOperation::ResetTable(std::shared_ptr<YBTable> new_table) {
291
0
  table_.reset();
292
0
  table_ = new_table;
293
  // tablet_ can no longer be valid.
294
0
  tablet_.reset();
295
0
}
296
297
0
bool YBOperation::IsTransactional() const {
298
0
  return table_->schema().table_properties().is_transactional();
299
0
}
300
301
0
bool YBOperation::IsYsqlCatalogOp() const {
302
0
  return table_->schema().table_properties().is_ysql_catalog_table();
303
0
}
304
305
2.05k
void YBOperation::MarkTablePartitionListAsStale() {
306
2.05k
  table_->MarkPartitionsAsStale();
307
2.05k
}
308
309
//--------------------------------------------------------------------------------------------------
310
// YBRedisOp
311
//--------------------------------------------------------------------------------------------------
312
313
YBRedisOp::YBRedisOp(const shared_ptr<YBTable>& table)
314
208k
    : YBOperation(table) {
315
208k
}
316
317
416k
RedisResponsePB* YBRedisOp::mutable_response() {
318
416k
  if (!redis_response_) {
319
208k
    redis_response_.reset(new RedisResponsePB());
320
208k
  }
321
416k
  return redis_response_.get();
322
416k
}
323
324
6
const RedisResponsePB& YBRedisOp::response() const {
325
6
  return *DCHECK_NOTNULL(redis_response_.get());
326
6
}
327
328
254k
// Redis reads go to the leader unless follower reads are enabled by flag.
OpGroup YBRedisReadOp::group() {
  if (FLAGS_redis_allow_reads_from_followers) {
    return OpGroup::kConsistentPrefixRead;
  }
  return OpGroup::kLeaderRead;
}
332
333
// YBRedisWriteOp -----------------------------------------------------------------
334
335
YBRedisWriteOp::YBRedisWriteOp(const shared_ptr<YBTable>& table)
336
123k
    : YBRedisOp(table), redis_write_request_(new RedisWriteRequestPB()) {
337
123k
}
338
339
123k
size_t YBRedisWriteOp::space_used_by_request() const {
340
123k
  return redis_write_request_->ByteSizeLong();
341
123k
}
342
343
0
std::string YBRedisWriteOp::ToString() const {
344
0
  return "REDIS_WRITE " + redis_write_request_->key_value().key();
345
0
}
346
347
123k
void YBRedisWriteOp::SetHashCode(uint16_t hash_code) {
348
123k
  hash_code_ = hash_code;
349
123k
  redis_write_request_->mutable_key_value()->set_hash_code(hash_code);
350
123k
}
351
352
123k
const std::string& YBRedisWriteOp::GetKey() const {
353
123k
  return redis_write_request_->key_value().key();
354
123k
}
355
356
246k
Status YBRedisWriteOp::GetPartitionKey(std::string *partition_key) const {
357
246k
  const Slice& slice(redis_write_request_->key_value().key());
358
246k
  return table_->partition_schema().EncodeRedisKey(slice, partition_key);
359
246k
}
360
361
// YBRedisReadOp -----------------------------------------------------------------
362
363
YBRedisReadOp::YBRedisReadOp(const shared_ptr<YBTable>& table)
364
84.8k
    : YBRedisOp(table), redis_read_request_(new RedisReadRequestPB()) {
365
84.8k
}
366
367
84.1k
size_t YBRedisReadOp::space_used_by_request() const {
368
84.1k
  return redis_read_request_->SpaceUsedLong();
369
84.1k
}
370
371
0
std::string YBRedisReadOp::ToString() const {
372
0
  return "REDIS_READ " + redis_read_request_->key_value().key();
373
0
}
374
375
84.8k
void YBRedisReadOp::SetHashCode(uint16_t hash_code) {
376
84.8k
  hash_code_ = hash_code;
377
84.8k
  redis_read_request_->mutable_key_value()->set_hash_code(hash_code);
378
84.8k
}
379
380
84.1k
const std::string& YBRedisReadOp::GetKey() const {
381
84.1k
  return redis_read_request_->key_value().key();
382
84.1k
}
383
384
169k
Status YBRedisReadOp::GetPartitionKey(std::string *partition_key) const {
385
169k
  if (!redis_read_request_->key_value().has_key()) {
386
630
    *partition_key =
387
630
        PartitionSchema::EncodeMultiColumnHashValue(redis_read_request_->key_value().hash_code());
388
630
    return Status::OK();
389
630
  }
390
168k
  const Slice& slice(redis_read_request_->key_value().key());
391
168k
  return table_->partition_schema().EncodeRedisKey(slice, partition_key);
392
169k
}
393
394
//--------------------------------------------------------------------------------------------------
395
// YBCql Operators
396
// - These ops should be prefixed with YBCql instead of YBql.
397
// - The prefixes "ql" or "QL" are used for common entities of all languages and not just CQL.
398
// - The names will be cleaned up later.
399
//--------------------------------------------------------------------------------------------------
400
401
YBqlOp::YBqlOp(const shared_ptr<YBTable>& table)
402
11.4M
      : YBOperation(table) , ql_response_(new QLResponsePB()) {
403
11.4M
}
404
405
11.4M
YBqlOp::~YBqlOp() {
406
11.4M
}
407
408
142k
bool YBqlOp::succeeded() const {
409
142k
  return response().status() == QLResponsePB::YQL_STATUS_OK;
410
142k
}
411
412
// YBqlWriteOp -----------------------------------------------------------------
413
414
YBqlWriteOp::YBqlWriteOp(const shared_ptr<YBTable>& table)
415
3.98M
    : YBqlOp(table), ql_write_request_(new QLWriteRequestPB()) {
416
3.98M
}
417
418
3.98M
YBqlWriteOp::~YBqlWriteOp() {}
419
420
static std::unique_ptr<YBqlWriteOp> NewYBqlWriteOp(const shared_ptr<YBTable>& table,
421
2.03M
                                                   QLWriteRequestPB::QLStmtType stmt_type) {
422
2.03M
  auto op = std::unique_ptr<YBqlWriteOp>(new YBqlWriteOp(table));
423
2.03M
  QLWriteRequestPB* req = op->mutable_request();
424
2.03M
  req->set_type(stmt_type);
425
2.03M
  req->set_client(YQL_CLIENT_CQL);
426
  // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too.
427
2.03M
  req->set_request_id(reinterpret_cast<uint64_t>(op.get()));
428
2.03M
  req->set_query_id(op->GetQueryId());
429
430
2.03M
  req->set_schema_version(table->schema().version());
431
2.03M
  req->set_is_compatible_with_previous_version(
432
2.03M
      table->schema().is_compatible_with_previous_version());
433
434
2.03M
  return op;
435
2.03M
}
436
437
2.02M
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) {
438
2.02M
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_INSERT);
439
2.02M
}
440
441
3.46k
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) {
442
3.46k
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_UPDATE);
443
3.46k
}
444
445
1.16k
std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) {
446
1.16k
  return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_DELETE);
447
1.16k
}
448
449
5
std::string YBqlWriteOp::ToString() const {
450
5
  return "QL_WRITE " + ql_write_request_->ShortDebugString();
451
5
}
452
453
3.99M
Status YBqlWriteOp::GetPartitionKey(string* partition_key) const {
454
3.99M
  return table_->partition_schema().EncodeKey(ql_write_request_->hashed_column_values(),
455
3.99M
                                              partition_key);
456
3.99M
}
457
458
3.99M
void YBqlWriteOp::SetHashCode(const uint16_t hash_code) {
459
3.99M
  ql_write_request_->set_hash_code(hash_code);
460
3.99M
}
461
462
18.5k
uint16_t YBqlWriteOp::GetHashCode() const {
463
18.5k
  return ql_write_request_->hash_code();
464
18.5k
}
465
466
1.89M
bool YBqlWriteOp::ReadsStaticRow() const {
467
  // A QL write op reads the static row if it reads a static column, or it writes to the static row
468
  // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp).
469
1.89M
  return !ql_write_request_->column_refs().static_ids().empty() ||
470
1.89M
         (writes_static_row_ && 
ql_write_request_->has_user_timestamp_usec()226
);
471
1.89M
}
472
473
1.89M
bool YBqlWriteOp::ReadsPrimaryRow() const {
474
  // A QL write op reads the primary row reads a non-static column, it writes to the primary row
475
  // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp),
476
  // or if there is an IF clause.
477
1.89M
  return !ql_write_request_->column_refs().ids().empty() ||
478
1.89M
         
(1.89M
writes_primary_row_1.89M
&&
ql_write_request_->has_user_timestamp_usec()1.87M
) ||
479
1.89M
         
ql_write_request_->has_if_expr()1.89M
;
480
1.89M
}
481
482
1.89M
bool YBqlWriteOp::WritesStaticRow() const {
483
1.89M
  return writes_static_row_;
484
1.89M
}
485
486
1.89M
bool YBqlWriteOp::WritesPrimaryRow() const {
487
1.89M
  return writes_primary_row_;
488
1.89M
}
489
490
0
bool YBqlWriteOp::returns_sidecar() {
491
0
  return ql_write_request_->has_if_expr() || ql_write_request_->returns_status();
492
0
}
493
494
// YBqlWriteHashKeyComparator: Hash/Equal on the hash key ----------------------------------------
495
2.01M
size_t YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op) const {
496
2.01M
  size_t hash = 0;
497
498
  // Hash the table id.
499
2.01M
  boost::hash_combine(hash, op->table()->id());
500
501
  // Hash the hash key.
502
2.01M
  string key;
503
2.08M
  for (const auto& value : op->request().hashed_column_values()) {
504
2.08M
    AppendToKey(value.value(), &key);
505
2.08M
  }
506
2.01M
  boost::hash_combine(hash, key);
507
508
2.01M
  return hash;
509
2.01M
}
510
511
bool YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op1,
512
126k
                                              const YBqlWriteOpPtr& op2) const {
513
  // Check if two write ops overlap that they apply to the same hash key in the same table.
514
126k
  if (op1->table() != op2->table() && 
op1->table()->id() != op2->table()->id()216
) {
515
216
    return false;
516
216
  }
517
126k
  const QLWriteRequestPB& req1 = op1->request();
518
126k
  const QLWriteRequestPB& req2 = op2->request();
519
126k
  if (req1.hashed_column_values_size() != req2.hashed_column_values_size()) {
520
0
    return false;
521
0
  }
522
243k
  
for (int i = 0; 126k
i < req1.hashed_column_values().size();
i++116k
) {
523
127k
    DCHECK(req1.hashed_column_values()[i].has_value());
524
127k
    DCHECK(req2.hashed_column_values()[i].has_value());
525
127k
    if (req1.hashed_column_values()[i].value() != req2.hashed_column_values()[i].value())
526
10.7k
      return false;
527
127k
  }
528
115k
  return true;
529
126k
}
530
531
// YBqlWritePrimaryKeyComparator: Hash/Equal on the primary key ----------------------------------
532
2.01M
size_t YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op) const {
533
2.01M
  size_t hash = YBqlWriteHashKeyComparator()(op);
534
535
  // Hash the range key also.
536
2.01M
  string key;
537
2.01M
  for (const auto& value : op->request().range_column_values()) {
538
1.93M
    AppendToKey(value.value(), &key);
539
1.93M
  }
540
2.01M
  boost::hash_combine(hash, key);
541
542
2.01M
  return hash;
543
2.01M
}
544
545
bool YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op1,
546
126k
                                                 const YBqlWriteOpPtr& op2) const {
547
126k
  if (!YBqlWriteHashKeyComparator()(op1, op2)) {
548
10.9k
    return false;
549
10.9k
  }
550
551
  // Check if two write ops overlap that they apply to the range key also.
552
115k
  const QLWriteRequestPB& req1 = op1->request();
553
115k
  const QLWriteRequestPB& req2 = op2->request();
554
115k
  if (req1.range_column_values_size() != req2.range_column_values_size()) {
555
0
    return false;
556
0
  }
557
115k
  
for (int i = 0; 115k
i < req1.range_column_values().size();
i++132
) {
558
115k
    DCHECK(req1.range_column_values()[i].has_value());
559
115k
    DCHECK(req2.range_column_values()[i].has_value());
560
115k
    if (req1.range_column_values()[i].value() != req2.range_column_values()[i].value())
561
115k
      return false;
562
115k
  }
563
45
  return true;
564
115k
}
565
566
// YBqlReadOp -----------------------------------------------------------------
567
568
YBqlReadOp::YBqlReadOp(const shared_ptr<YBTable>& table)
569
    : YBqlOp(table),
570
      ql_read_request_(new QLReadRequestPB()),
571
7.45M
      yb_consistency_level_(YBConsistencyLevel::STRONG) {
572
7.45M
}
573
574
7.50M
YBqlReadOp::~YBqlReadOp() {}
575
576
22.6M
OpGroup YBqlReadOp::group() {
577
22.6M
  return yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX
578
22.6M
      ? 
OpGroup::kConsistentPrefixRead3.89k
:
OpGroup::kLeaderRead22.6M
;
579
22.6M
}
580
581
7.47M
std::unique_ptr<YBqlReadOp> YBqlReadOp::NewSelect(const shared_ptr<YBTable>& table) {
582
7.47M
  std::unique_ptr<YBqlReadOp> op(new YBqlReadOp(table));
583
7.47M
  QLReadRequestPB *req = op->mutable_request();
584
7.47M
  req->set_client(YQL_CLIENT_CQL);
585
  // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too.
586
7.47M
  req->set_request_id(reinterpret_cast<uint64_t>(op.get()));
587
7.47M
  req->set_query_id(op->GetQueryId());
588
589
7.47M
  req->set_schema_version(table->schema().version());
590
7.47M
  req->set_is_compatible_with_previous_version(
591
7.47M
      table->schema().is_compatible_with_previous_version());
592
593
7.47M
  return op;
594
7.47M
}
595
596
0
std::string YBqlReadOp::ToString() const {
597
0
  return "QL_READ " + ql_read_request_->DebugString();
598
0
}
599
600
7.35M
void YBqlReadOp::SetHashCode(const uint16_t hash_code) {
601
7.35M
  ql_read_request_->set_hash_code(hash_code);
602
7.35M
}
603
604
7.49M
// Computes the partition key that routes this CQL read to its first tablet, and updates the
// request's hash_code / max_hash_code to match.
//
//  * Hash column values present: the key is their encoded hash, clamped into
//    [hash_code, max_hash_code] when those are set. The read then becomes a point get: both bounds
//    are set to the (clamped) key's hash.
//  * No hash column values: the key is the lower bound of the token range (hash_code) when set,
//    or empty to scan from the beginning.
//  * A paging state with a non-empty next_partition_key overrides the key (forward scans only).
//    Its hash must lie within the request's bounds and becomes the new hash_code.
//
// Returns the EncodeKey error, or InternalError if the paging state is out of bounds.
Status YBqlReadOp::GetPartitionKey(string* partition_key) const {
  if (!ql_read_request_->hashed_column_values().empty()) {
    // Hashed columns are set: use them to compute the exact key and set the bounds.
    RETURN_NOT_OK(table_->partition_schema().EncodeKey(ql_read_request_->hashed_column_values(),
                                                       partition_key));

    // TODO: If user specified token range doesn't contain the hash columns specified then the query
    // will have no effect. We need to implement an exit path rather than requesting the tablets.
    // For now, we set point query some value that is not equal to the hash to the hash columns
    // Which will return no result.

    // Make sure the given key is not smaller than the lower bound (if any).
    if (ql_read_request_->has_hash_code()) {
      uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code());
      auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
      if (*partition_key < lower_bound) {
        *partition_key = std::move(lower_bound);
      }
    }

    // Make sure the given key is not bigger than the upper bound (if any).
    if (ql_read_request_->has_max_hash_code()) {
      uint16 hash_code = static_cast<uint16>(ql_read_request_->max_hash_code());
      auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
      if (*partition_key > upper_bound) {
        *partition_key = std::move(upper_bound);
      }
    }

    // This is a point get: pin both bounds to the key's hash, decoded once and reused.
    const auto point_hash_code = PartitionSchema::DecodeMultiColumnHashValue(*partition_key);
    ql_read_request_->set_hash_code(point_hash_code);
    ql_read_request_->set_max_hash_code(point_hash_code);
  } else if (ql_read_request_->has_hash_code()) {
    // Start at the lower bound of the token range.
    uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code());
    *partition_key = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
  } else {
    // Default to an empty key: the scan starts from the beginning.
    partition_key->clear();
  }

  // A continued query resumes from the partition key in the paging state, which also becomes the
  // new hash_code. This is only supported for forward scans.
  if (ql_read_request_->has_paging_state() &&
      ql_read_request_->paging_state().has_next_partition_key() &&
      !ql_read_request_->paging_state().next_partition_key().empty()) {
    *partition_key = ql_read_request_->paging_state().next_partition_key();

    // Check that the partition key from the paging state is within bounds.
    uint16 paging_state_hash_code = PartitionSchema::DecodeMultiColumnHashValue(*partition_key);
    if ((ql_read_request_->has_hash_code() &&
            paging_state_hash_code < ql_read_request_->hash_code()) ||
        (ql_read_request_->has_max_hash_code() &&
            paging_state_hash_code > ql_read_request_->max_hash_code())) {
      return STATUS_SUBSTITUTE(InternalError,
                               "Out of bounds partition key found in paging state: "
                               "Query's partition bounds: [$0, $1], paging state partition: $2",
                               ql_read_request_->hash_code(),
                               ql_read_request_->max_hash_code(),
                               paging_state_hash_code);
    }

    ql_read_request_->set_hash_code(paging_state_hash_code);
  }

  return Status::OK();
}
672
673
std::vector<ColumnSchema> MakeColumnSchemasFromColDesc(
674
7.54M
  const google::protobuf::RepeatedPtrField<QLRSColDescPB>& rscol_descs) {
675
7.54M
  std::vector<ColumnSchema> column_schemas;
676
7.54M
  column_schemas.reserve(rscol_descs.size());
677
24.0M
  for (const auto& rscol_desc : rscol_descs) {
678
24.0M
    column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type()));
679
24.0M
  }
680
7.54M
  return column_schemas;
681
7.54M
}
682
683
7.54M
std::vector<ColumnSchema> YBqlReadOp::MakeColumnSchemasFromRequest() const {
684
  // Tests don't have access to the QL internal statement object, so they have to use rsrow
685
  // descriptor from the read request.
686
7.54M
  return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs());
687
7.54M
}
688
689
796
Result<QLRowBlock> YBqlReadOp::MakeRowBlock() const {
690
796
  Schema schema(MakeColumnSchemasFromRequest(), 0);
691
796
  QLRowBlock result(schema);
692
796
  Slice data(rows_data_);
693
796
  if (!data.empty()) {
694
796
    RETURN_NOT_OK(result.Deserialize(request().client(), &data));
695
796
  }
696
796
  return result;
697
796
}
698
699
//--------------------------------------------------------------------------------------------------
700
// YBPgsql Operators
701
//--------------------------------------------------------------------------------------------------
702
703
// Constructs a PGSQL op on `table` with a freshly allocated, empty response.
// If `partition_key` is non-null, its contents are moved into the op, leaving the caller's string
// in a moved-from state. Otherwise the op's partition key starts out empty.
YBPgsqlOp::YBPgsqlOp(const shared_ptr<YBTable>& table, std::string* partition_key)
    : YBOperation(table),
      response_(new PgsqlResponsePB()),
      partition_key_(partition_key != nullptr ? std::move(*partition_key) : std::string()) {}
707
708
11.5M
// Out-of-line destructor; no work beyond destroying members.
YBPgsqlOp::~YBPgsqlOp() = default;
710
711
9.03M
bool YBPgsqlOp::succeeded() const {
712
9.03M
  return response().status() == PgsqlResponsePB::PGSQL_STATUS_OK;
713
9.03M
}
714
715
9.03M
bool YBPgsqlOp::applied() {
716
9.03M
  return succeeded() && 
!response_->skipped()9.02M
;
717
9.03M
}
718
719
namespace {
720
721
0
std::string ResponseSuffix(const PgsqlResponsePB& response) {
722
0
  const auto str = response.ShortDebugString();
723
0
  return str.empty() ? std::string() : (", response: " + str);
724
0
}
725
726
} // namespace
727
728
//--------------------------------------------------------------------------------------------------
729
// YBPgsqlWriteOp
730
731
// Creates a write op on `table`.
// When `request` is supplied, the op points at it without owning it, and the request's partition
// key is moved into the op by the YBPgsqlOp constructor. When `request` is null, the op allocates
// and owns its own empty PgsqlWriteRequestPB through request_holder_.
YBPgsqlWriteOp::YBPgsqlWriteOp(const shared_ptr<YBTable>& table, PgsqlWriteRequestPB* request)
    : YBPgsqlOp(table, request != nullptr ? request->mutable_partition_key() : nullptr),
      request_(request) {
  if (request != nullptr) {
    return;
  }
  request_holder_ = std::make_unique<PgsqlWriteRequestPB>();
  request_ = request_holder_.get();
}
738
739
7.21M
YBPgsqlWriteOp::~YBPgsqlWriteOp() = default;
740
741
static std::unique_ptr<YBPgsqlWriteOp> NewYBPgsqlWriteOp(
742
    const shared_ptr<YBTable>& table,
743
3.56k
    PgsqlWriteRequestPB::PgsqlStmtType stmt_type) {
744
3.56k
  auto op = std::make_unique<YBPgsqlWriteOp>(table);
745
3.56k
  PgsqlWriteRequestPB *req = op->mutable_request();
746
3.56k
  req->set_stmt_type(stmt_type);
747
3.56k
  req->set_client(YQL_CLIENT_PGSQL);
748
3.56k
  req->set_table_id(table->id());
749
3.56k
  req->set_schema_version(table->schema().version());
750
3.56k
  req->set_stmt_id(op->GetQueryId());
751
752
3.56k
  return op;
753
3.56k
}
754
755
295
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) {
756
295
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_INSERT);
757
295
}
758
759
2.97k
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) {
760
2.97k
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_UPDATE);
761
2.97k
}
762
763
295
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) {
764
295
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_DELETE);
765
295
}
766
767
std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewTruncateColocated(
768
0
    const std::shared_ptr<YBTable>& table) {
769
0
  return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED);
770
0
}
771
772
0
std::string YBPgsqlWriteOp::ToString() const {
773
0
  return Format(
774
0
      "PGSQL_WRITE $0$1$2", request_->ShortDebugString(),
775
0
      (write_time_ ? " write_time: " + write_time_.ToString() : ""), ResponseSuffix(response()));
776
0
}
777
778
5.61M
void YBPgsqlWriteOp::SetHashCode(const uint16_t hash_code) {
779
5.61M
  request_->set_hash_code(hash_code);
780
5.61M
}
781
782
0
bool YBPgsqlWriteOp::IsTransactional() const {
783
0
  return !is_single_row_txn_ && table_->schema().table_properties().is_transactional();
784
0
}
785
786
7.21M
// Produces the partition key for this write.
// If the op owns its request (it was created without an external one), the key is computed into
// the request by InitPartitionKey() and then moved out into *partition_key. Otherwise the
// base-class implementation is used.
CHECKED_STATUS YBPgsqlWriteOp::GetPartitionKey(std::string* partition_key) const {
  if (request_holder_) {
    RETURN_NOT_OK(
        InitPartitionKey(table_->InternalSchema(), table_->partition_schema(), request_));
    *partition_key = std::move(*request_->mutable_partition_key());
    return Status::OK();
  }
  return YBPgsqlOp::GetPartitionKey(partition_key);
}
794
795
//--------------------------------------------------------------------------------------------------
796
// YBPgsqlReadOp
797
798
// Creates a read op on `table` with STRONG consistency.
// When `request` is supplied, the op points at it without owning it, and the request's partition
// key is moved into the op by the YBPgsqlOp constructor. When `request` is null, the op allocates
// and owns its own empty PgsqlReadRequestPB through request_holder_.
YBPgsqlReadOp::YBPgsqlReadOp(const shared_ptr<YBTable>& table, PgsqlReadRequestPB* request)
    : YBPgsqlOp(table, request != nullptr ? request->mutable_partition_key() : nullptr),
      request_(request),
      yb_consistency_level_(YBConsistencyLevel::STRONG) {
  if (request != nullptr) {
    return;
  }
  request_holder_ = std::make_unique<PgsqlReadRequestPB>();
  request_ = request_holder_.get();
}
807
808
3.23k
std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSelect(const shared_ptr<YBTable>& table) {
809
3.23k
  std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table));
810
3.23k
  PgsqlReadRequestPB *req = op->mutable_request();
811
3.23k
  req->set_client(YQL_CLIENT_PGSQL);
812
3.23k
  req->set_table_id(table->id());
813
3.23k
  req->set_schema_version(table->schema().version());
814
3.23k
  req->set_stmt_id(op->GetQueryId());
815
816
3.23k
  return op;
817
3.23k
}
818
819
0
std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSample(const shared_ptr<YBTable>& table) {
820
0
  std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table));
821
0
  PgsqlReadRequestPB *req = op->mutable_request();
822
0
  req->set_client(YQL_CLIENT_PGSQL);
823
0
  req->set_table_id(table->id());
824
0
  req->set_schema_version(table->schema().version());
825
0
  req->set_stmt_id(op->GetQueryId());
826
827
0
  return op;
828
0
}
829
830
0
std::string YBPgsqlReadOp::ToString() const {
831
0
  return "PGSQL_READ " + request_->ShortDebugString() + ResponseSuffix(response());
832
0
}
833
834
3.03M
void YBPgsqlReadOp::SetHashCode(const uint16_t hash_code) {
835
3.03M
  request_->set_hash_code(hash_code);
836
3.03M
}
837
838
std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromColDesc(
839
0
  const google::protobuf::RepeatedPtrField<PgsqlRSColDescPB>& rscol_descs) {
840
0
  std::vector<ColumnSchema> column_schemas;
841
0
  column_schemas.reserve(rscol_descs.size());
842
0
  for (const auto& rscol_desc : rscol_descs) {
843
0
    column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type()));
844
0
  }
845
0
  return column_schemas;
846
0
}
847
848
0
std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromRequest() const {
849
  // Tests don't have access to the QL internal statement object, so they have to use rsrow
850
  // descriptor from the read request.
851
0
  return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs());
852
0
}
853
854
25.1M
// CONSISTENT_PREFIX reads are grouped as kConsistentPrefixRead; every other consistency level is
// grouped as kLeaderRead.
OpGroup YBPgsqlReadOp::group() {
  if (yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX) {
    return OpGroup::kConsistentPrefixRead;
  }
  return OpGroup::kLeaderRead;
}
858
859
1.95M
void YBPgsqlReadOp::SetUsedReadTime(const ReadHybridTime& used_time) {
860
1.95M
  used_read_time_ = used_time;
861
1.95M
}
862
863
4.36M
// Produces the partition key for this read.
// If the op owns its request (it was created without an external one), the key is computed into
// the request by InitPartitionKey() and then moved out into *partition_key. InitPartitionKey()
// receives the table's last partition key. Otherwise the base-class implementation is used.
//
// Returns IllegalState if the op owns its request and the table reports no partitions. Calling
// back() on an empty list would be undefined behavior.
CHECKED_STATUS YBPgsqlReadOp::GetPartitionKey(std::string* partition_key) const {
  if (!request_holder_) {
    return YBPgsqlOp::GetPartitionKey(partition_key);
  }
  // Hold the shared partition list in a named local. InitPartitionKey() gets a reference to its
  // last element, so the list must stay alive for the whole call.
  const auto partitions = table_->GetPartitionsShared();
  SCHECK(!partitions->empty(), IllegalState, "Table has no partitions");
  RETURN_NOT_OK(InitPartitionKey(
      table_->InternalSchema(), table_->partition_schema(), partitions->back(), request_));
  *partition_key = std::move(*request_->mutable_partition_key());
  return Status::OK();
}
873
874
////////////////////////////////////////////////////////////
875
// YBNoOp
876
////////////////////////////////////////////////////////////
877
878
// Creates a NoOp helper bound to `table`, keeping a shared reference to it.
YBNoOp::YBNoOp(const std::shared_ptr<YBTable>& table) : table_(table) {}
881
882
0
// Sends a NoOp RPC to the leader of the tablet that owns `key`.
//
// The key is encoded with the table's partition schema. The owning tablet is looked up through
// the client's meta cache, and the leader tablet server is resolved. Up to kMaxAttempts attempts
// are made within an overall 5 second deadline. If leader resolution returns ServiceUnavailable
// before the deadline, the loop sleeps for attempt * 100 ms and retries.
//
// Returns:
//   - any error from key encoding or from the tablet lookup;
//   - the leader-resolution status if it fails in any other way;
//   - TimedOut if the overall deadline has already passed before the RPC is sent;
//   - Status::OK() otherwise.
// NOTE(review): per the TODO at the end of the loop, OK is also returned when every attempt
// failed. RPC and server-side failures are only logged.
Status YBNoOp::Execute(YBClient* client, const YBPartialRow& key) {
  constexpr int kMaxAttempts = 10;

  std::string encoded_key;
  RETURN_NOT_OK(table_->partition_schema().EncodeKey(key, &encoded_key));
  const CoarseTimePoint deadline = CoarseMonoClock::Now() + 5s;

  tserver::NoOpRequestPB noop_req;
  tserver::NoOpResponsePB noop_resp;

  for (int attempt = 1; attempt <= kMaxAttempts; ++attempt) {
    // Drop any error left over from a previous attempt. If this attempt's RPC fails before a
    // response is parsed, a stale error would otherwise be logged again.
    noop_resp.Clear();

    auto remote_tablet = VERIFY_RESULT(client->data_->meta_cache_->LookupTabletByKeyFuture(
        table_, encoded_key, deadline).get());

    internal::RemoteTabletServer* ts = nullptr;
    std::vector<internal::RemoteTabletServer*> candidates;
    std::set<std::string> blacklist;  // TODO: empty set for now.
    Status lookup_status = client->data_->GetTabletServer(
        client, remote_tablet, YBClient::ReplicaSelection::LEADER_ONLY, blacklist, &candidates,
        &ts);

    // If we get ServiceUnavailable, this indicates that the tablet doesn't
    // currently have any known leader. We should sleep and retry, since
    // it's likely that the tablet is undergoing a leader election and will
    // soon have one.
    if (lookup_status.IsServiceUnavailable() && CoarseMonoClock::Now() < deadline) {
      const int sleep_ms = attempt * 100;
      VLOG(1) << "Tablet " << remote_tablet->tablet_id() << " current unavailable: "
              << lookup_status.ToString() << ". Sleeping for " << sleep_ms << "ms "
              << "and retrying...";
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      continue;
    }
    RETURN_NOT_OK(lookup_status);

    const auto now = CoarseMonoClock::Now();
    if (deadline < now) {
      return STATUS(TimedOut, "Op timed out, deadline expired");
    }

    // Recalculate the deadlines.
    // If other replicas beyond this one could be tried, cap this RPC at the default RPC timeout.
    // That leaves time to try other replicas later. Otherwise, use the full remaining deadline
    // for the user's call. The sizes are compared directly; subtracting a size_t from an int
    // would be done in unsigned arithmetic and could wrap around.
    CoarseTimePoint rpc_deadline = deadline;
    if (candidates.size() > blacklist.size() + 1) {
      const CoarseTimePoint per_rpc_deadline = now + client->default_rpc_timeout();
      rpc_deadline = std::min(deadline, per_rpc_deadline);
    }

    rpc::RpcController controller;
    controller.set_deadline(rpc_deadline);

    CHECK(ts->proxy());
    const Status rpc_status = ts->proxy()->NoOp(noop_req, &noop_resp, &controller);
    if (rpc_status.ok() && !noop_resp.has_error()) {
      break;
    }

    LOG(INFO) << rpc_status.CodeAsString();
    if (noop_resp.has_error()) {
      // Log the error reported by the tablet server in the response.
      const Status server_status = StatusFromPB(noop_resp.error().status());
      LOG(INFO) << server_status.ToString();
    }
    /*
     * TODO: For now, we just try a few attempts and exit. Ideally, we should check for
     * errors that are retriable, and retry if so.
     * RETURN_NOT_OK(CanBeRetried(true, rpc_status, server_status, rpc_deadline, deadline,
     *                         candidates, blacklist));
     */
  }

  return Status::OK();
}
961
962
3.36M
// Intents are added for every read under SERIALIZABLE isolation. Under any other isolation level,
// they are added only when the request carries a valid row mark type.
bool YBPgsqlReadOp::should_add_intents(IsolationLevel isolation_level) {
  if (isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION) {
    return true;
  }
  return IsValidRowMarkType(GetRowMarkTypeFromPB(*request_));
}
966
967
// Initializes the partition key of a PGSQL read request.
// For tables without hash key columns, this delegates to InitRangePartitionKey(), which receives
// the table's last partition key. Tables with hash key columns use InitHashPartitionKey().
CHECKED_STATUS InitPartitionKey(
    const Schema& schema, const PartitionSchema& partition_schema,
    const std::string& last_partition, PgsqlReadRequestPB* request) {
  if (schema.num_hash_key_columns() == 0) {
    return InitRangePartitionKey(schema, last_partition, request);
  }
  return InitHashPartitionKey(schema, partition_schema, request);
}
976
977
CHECKED_STATUS InitPartitionKey(
978
7.21M
    const Schema& schema, const PartitionSchema& partition_schema, PgsqlWriteRequestPB* request) {
979
7.21M
  const auto& ybctid = request->ybctid_column_value().value();
980
7.21M
  if (schema.num_hash_key_columns() > 0) {
981
5.61M
    if (!IsNull(ybctid)) {
982
5.09M
      const uint16 hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value()));
983
0
      request->set_hash_code(hash_code);
984
5.09M
      request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code));
985
5.09M
      return Status::OK();
986
5.09M
    }
987
988
    // Computing the partition_key.
989
515k
    return partition_schema.EncodeKey(
990
515k
        request->partition_column_values(), request->mutable_partition_key());
991
5.61M
  } else {
992
    // Range partitioned table
993
1.59M
    if (!IsNull(ybctid)) {
994
764k
      request->set_partition_key(ybctid.binary_value());
995
764k
      return Status::OK();
996
764k
    }
997
998
    // Computing the range key.
999
833k
    request->set_partition_key(VERIFY_RESULT(GetRangePartitionKey(
1000
833k
        schema, request->range_column_values())));
1001
0
    return Status::OK();
1002
833k
  }
1003
7.21M
}
1004
1005
// Builds the range-key components of a scan bound from the request's range column values.
//
// Range columns are visited in schema order and matched positionally with `range_cols`. A column
// with a supplied value contributes that value, converted using the column's sorting type. A
// column without one (past the end of `range_cols`, or value not set) contributes kLowest for a
// lower bound or kHighest for an upper bound. For an upper bound, an extra kHighest is appended
// after every range column except the last.
Result<std::vector<docdb::PrimitiveValue>> GetRangeComponents(
    const Schema& schema,
    const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols,
    bool lower_bound) {
  const int num_range_key_columns = narrow_cast<int>(schema.num_range_key_columns());
  // Placeholder for a range column that the request does not constrain.
  const auto unbounded = lower_bound ? docdb::ValueType::kLowest : docdb::ValueType::kHighest;
  std::vector<docdb::PrimitiveValue> components;
  int range_idx = 0;
  for (const auto& col_id : schema.column_ids()) {
    if (!schema.is_range_column(col_id)) {
      continue;
    }

    const ColumnSchema& column_schema = VERIFY_RESULT(schema.column_by_id(col_id));
    const bool has_value = range_idx < range_cols.size() &&
        range_cols[range_idx].value().value_case() != QLValuePB::VALUE_NOT_SET;
    if (has_value) {
      components.push_back(docdb::PrimitiveValue::FromQLValuePB(
          range_cols[range_idx].value(), column_schema.sorting_type()));
    } else {
      components.emplace_back(unbounded);
    }

    ++range_idx;
    if (range_idx == num_range_key_columns) {
      break;
    }

    if (!lower_bound) {
      components.emplace_back(docdb::ValueType::kHighest);
    }
  }
  return components;
}
1039
1040
// Computes the lower and upper range-key bounds for a read on a range-partitioned table.
//
// Returns IllegalState if the schema has hash key columns. The bounds are chosen as follows:
//   - The request has a condition and supplies fewer range column values than the schema has
//     range key columns: the supplied values are used as a prefix, and the bounds come from
//     docdb::GetRangeKeyScanSpec over a QLScanRange built from the condition.
//   - Otherwise, if any range column values are supplied: the bounds come from
//     GetRangeComponents().
//   - Otherwise, *lower_bound and *upper_bound are left untouched.
Status GetRangePartitionBounds(const Schema& schema,
                               const PgsqlReadRequestPB& request,
                               std::vector<docdb::PrimitiveValue>* lower_bound,
                               std::vector<docdb::PrimitiveValue>* upper_bound) {
  SCHECK(!schema.num_hash_key_columns(), IllegalState,
         "Cannot set range partition key for hash partitioned table");
  const auto& range_cols = request.range_column_values();
  const auto& condition_expr = request.condition_expr();
  const bool use_scan_spec = condition_expr.has_condition() &&
      implicit_cast<size_t>(range_cols.size()) < schema.num_range_key_columns();

  if (!use_scan_spec) {
    if (range_cols.empty()) {
      return Status::OK();
    }
    *lower_bound = VERIFY_RESULT(GetRangeComponents(schema, range_cols, true));
    *upper_bound = VERIFY_RESULT(GetRangeComponents(schema, range_cols, false));
    return Status::OK();
  }

  auto prefixed_range_components = VERIFY_RESULT(docdb::InitKeyColumnPrimitiveValues(
      range_cols, schema, schema.num_hash_key_columns()));
  QLScanRange scan_range(schema, condition_expr.condition());
  *lower_bound = docdb::GetRangeKeyScanSpec(
      schema, &prefixed_range_components, &scan_range, true /* lower_bound */);
  *upper_bound = docdb::GetRangeKeyScanSpec(
      schema, &prefixed_range_components, &scan_range, false /* lower_bound */);
  return Status::OK();
}
1063
1064
}  // namespace client
1065
}  // namespace yb