/Users/deen/code/yugabyte-db/src/yb/client/yb_op.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/client/yb_op.h" |
34 | | |
35 | | #include "yb/client/client.h" |
36 | | #include "yb/client/client-internal.h" |
37 | | #include "yb/client/meta_cache.h" |
38 | | #include "yb/client/schema.h" |
39 | | #include "yb/client/table.h" |
40 | | |
41 | | #include "yb/common/ql_protocol.pb.h" |
42 | | #include "yb/common/ql_rowblock.h" |
43 | | #include "yb/common/ql_scanspec.h" |
44 | | #include "yb/common/ql_type.h" |
45 | | #include "yb/common/ql_value.h" |
46 | | #include "yb/common/redis_protocol.pb.h" |
47 | | #include "yb/common/row_mark.h" |
48 | | #include "yb/common/schema.h" |
49 | | #include "yb/common/wire_protocol.h" |
50 | | #include "yb/common/wire_protocol.pb.h" |
51 | | |
52 | | #include "yb/docdb/doc_key.h" |
53 | | #include "yb/docdb/doc_scanspec_util.h" |
54 | | #include "yb/docdb/primitive_value.h" |
55 | | #include "yb/docdb/primitive_value_util.h" |
56 | | #include "yb/rpc/rpc_controller.h" |
57 | | |
58 | | #include "yb/tserver/tserver_service.proxy.h" |
59 | | |
60 | | #include "yb/util/async_util.h" |
61 | | #include "yb/util/flag_tags.h" |
62 | | #include "yb/util/result.h" |
63 | | #include "yb/util/status_format.h" |
64 | | |
65 | | using namespace std::literals; |
66 | | |
67 | | DEFINE_bool(redis_allow_reads_from_followers, false, |
68 | | "If true, the read will be served from the closest replica in the same AZ, which can " |
69 | | "be a follower."); |
70 | | TAG_FLAG(redis_allow_reads_from_followers, evolving); |
71 | | TAG_FLAG(redis_allow_reads_from_followers, runtime); |
72 | | |
73 | | namespace yb { |
74 | | namespace client { |
75 | | |
76 | | using std::shared_ptr; |
77 | | using std::unique_ptr; |
78 | | |
79 | | namespace { |
80 | | |
81 | | CHECKED_STATUS InitHashPartitionKey( |
82 | 3.11M | const Schema& schema, const PartitionSchema& partition_schema, PgsqlReadRequestPB* request) { |
83 | | // Read partition key from read request. |
84 | 3.11M | const auto &ybctid = request->ybctid_column_value().value(); |
85 | | |
86 | | // Seek a specific partition_key from read_request. |
87 | | // 1. Not specified hash condition - Full scan. |
88 | | // 2. paging_state -- Set by server to continue current request. |
89 | | // 3. lower and upper bound -- Set by PgGate to query a specific set of hash values. |
90 | | // 4. hash column values -- Given to scan ONE SET of specfic hash values. |
91 | | // 5. range and regular condition - These are filter expression and will be processed by DocDB. |
92 | | // Shouldn't we able to set RANGE boundary here? |
93 | | |
94 | | // If primary index lookup using ybctid requests are batched, there is a possibility that tablets |
95 | | // might get split after the batch of requests have been prepared. Hence, we need to execute the |
96 | | // prepared request in both tablet partitions. For this purpose, we use paging state to continue |
97 | | // executing the request in the second sub-partition after completing the first sub-partition. |
98 | | // |
99 | | // batched ybctids |
100 | | // In order to represent a single ybctid or a batch of ybctids, we leverage the lower bound and |
101 | | // upper bounds to set hash codes and max hash codes. |
102 | 3.11M | bool has_paging_state = |
103 | 3.11M | request->has_paging_state() && request->paging_state().has_next_partition_key()89.3k ; |
104 | 3.11M | if (has_paging_state) { |
105 | | // If this is a subsequent query, use the partition key from the paging state. This is only |
106 | | // supported for forward scan. |
107 | 89.4k | request->set_partition_key(request->paging_state().next_partition_key()); |
108 | | |
109 | | // Check that the paging state hash_code is within [ hash_code, max_hash_code ] bounds. |
110 | 89.4k | if (schema.num_hash_key_columns() > 0 && !request->partition_key().empty()89.4k ) { |
111 | 89.3k | uint16 paging_state_hash_code = PartitionSchema::DecodeMultiColumnHashValue( |
112 | 89.3k | request->partition_key()); |
113 | 89.3k | if ((request->has_hash_code() && paging_state_hash_code < request->hash_code()46.6k ) || |
114 | 89.3k | (89.3k request->has_max_hash_code()89.3k && paging_state_hash_code > request->max_hash_code()1.26k )) { |
115 | 0 | return STATUS_SUBSTITUTE( |
116 | 0 | InternalError, |
117 | 0 | "Out of bounds partition key found in paging state:" |
118 | 0 | "Query's partition bounds: [$0, $1], paging state partition: $2", |
119 | 0 | request->has_hash_code() ? request->hash_code() : 0, |
120 | 0 | request->has_max_hash_code() ? request->max_hash_code() : 0, |
121 | 0 | paging_state_hash_code); |
122 | 0 | } |
123 | 89.3k | request->set_hash_code(paging_state_hash_code); |
124 | 89.3k | } |
125 | 3.02M | } else if (!IsNull(ybctid)) { |
126 | 17.8k | const auto hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value())); |
127 | 0 | request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code)); |
128 | 3.00M | } else if (request->has_lower_bound() || request->has_upper_bound()2.99M ) { |
129 | | // If the read request does not provide a specific partition key, but it does provide scan |
130 | | // boundary, use the given boundary to setup the scan lower and upper bound. |
131 | 21.5k | if (request->has_lower_bound()) { |
132 | 14.8k | auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->lower_bound().key()); |
133 | 14.8k | if (!request->lower_bound().is_inclusive()) { |
134 | 3 | ++hash; |
135 | 3 | } |
136 | 14.8k | request->set_hash_code(hash); |
137 | | |
138 | | // Set partition key to lower bound. |
139 | 14.8k | request->set_partition_key(request->lower_bound().key()); |
140 | 14.8k | } |
141 | 21.5k | if (request->has_upper_bound()) { |
142 | 14.7k | auto hash = PartitionSchema::DecodeMultiColumnHashValue(request->upper_bound().key()); |
143 | 14.7k | if (!request->upper_bound().is_inclusive()) { |
144 | 14.7k | --hash; |
145 | 14.7k | } |
146 | 14.7k | request->set_max_hash_code(hash); |
147 | 14.7k | } |
148 | | |
149 | 2.98M | } else if (!request->partition_column_values().empty()) { |
150 | | // If hashed columns are set, use them to compute the exact key and set the bounds |
151 | 2.92M | RETURN_NOT_OK(partition_schema.EncodeKey( |
152 | 2.92M | request->partition_column_values(), request->mutable_partition_key())); |
153 | | |
154 | | // Make sure given key is not smaller than lower bound (if any) |
155 | 2.92M | if (request->has_hash_code()) { |
156 | 0 | auto hash_code = static_cast<uint16>(request->hash_code()); |
157 | 0 | auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
158 | 0 | if (request->partition_key() < lower_bound) { |
159 | 0 | request->set_partition_key(std::move(lower_bound)); |
160 | 0 | } |
161 | 0 | } |
162 | | |
163 | | // Make sure given key is not bigger than upper bound (if any) |
164 | 2.92M | if (request->has_max_hash_code()) { |
165 | 0 | auto hash_code = static_cast<uint16>(request->max_hash_code()); |
166 | 0 | auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
167 | 0 | if (request->partition_key() > upper_bound) { |
168 | 0 | request->set_partition_key(std::move(upper_bound)); |
169 | 0 | } |
170 | 0 | } |
171 | | |
172 | 2.92M | if (!request->partition_key().empty()2.92M ) { |
173 | | // If one specifc partition_key is found, set both bounds to equal partition key now because |
174 | | // this is a point get. |
175 | 2.92M | auto hash_code = PartitionSchema::DecodeMultiColumnHashValue(request->partition_key()); |
176 | 2.92M | request->set_hash_code(hash_code); |
177 | 2.92M | request->set_max_hash_code(hash_code); |
178 | 2.92M | } |
179 | | |
180 | 2.92M | } else if (61.8k !has_paging_state61.8k ) { |
181 | | // Full scan. Default to empty key. |
182 | 61.1k | request->clear_partition_key(); |
183 | 61.1k | } |
184 | | |
185 | 3.11M | return Status::OK(); |
186 | 3.11M | } |
187 | | |
188 | | CHECKED_STATUS SetRangePartitionBounds(const Schema& schema, |
189 | | const std::string& last_partition, |
190 | | PgsqlReadRequestPB* request, |
191 | 1.21M | std::string* key_upper_bound) { |
192 | 1.21M | vector<docdb::PrimitiveValue> range_components, range_components_end; |
193 | 1.21M | RETURN_NOT_OK(GetRangePartitionBounds( |
194 | 1.21M | schema, *request, &range_components, &range_components_end)); |
195 | 1.21M | if (range_components.empty() && range_components_end.empty()530k ) { |
196 | 530k | if (request->is_forward_scan()) { |
197 | 530k | request->clear_partition_key(); |
198 | 530k | } else { |
199 | | // In case of backward scan process must be start from the last partition. |
200 | 53 | request->set_partition_key(last_partition); |
201 | 53 | } |
202 | 530k | key_upper_bound->clear(); |
203 | 530k | return Status::OK(); |
204 | 530k | } |
205 | 682k | auto upper_bound_key = docdb::DocKey(std::move(range_components_end)).Encode().ToStringBuffer(); |
206 | 682k | if (request->is_forward_scan()) { |
207 | 682k | request->set_partition_key( |
208 | 682k | docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer()); |
209 | 682k | *key_upper_bound = std::move(upper_bound_key); |
210 | 682k | } else { |
211 | | // Backward scan should go from upper bound to lower. But because DocDB can check upper bound |
212 | | // only it is not set here. Lower bound will be checked on client side in the |
213 | | // ReviewResponsePagingState function. |
214 | 117 | request->set_partition_key(std::move(upper_bound_key)); |
215 | 117 | key_upper_bound->clear(); |
216 | 117 | } |
217 | 682k | return Status::OK(); |
218 | 1.21M | } |
219 | | |
220 | | CHECKED_STATUS InitRangePartitionKey( |
221 | 1.25M | const Schema& schema, const std::string& last_partition, PgsqlReadRequestPB* request) { |
222 | | // Set the range partition key. |
223 | 1.25M | const auto &ybctid = request->ybctid_column_value().value(); |
224 | | |
225 | | // Seek a specific partition_key from read_request. |
226 | | // 1. Not specified range condition - Full scan. |
227 | | // 2. ybctid -- Given to fetch one specific row. |
228 | | // 3. paging_state -- Set by server to continue the same request. |
229 | | // 4. upper and lower bound -- Set by PgGate to fetch rows within a boundary. |
230 | | // 5. range column values -- Given to fetch rows for one set of specific range values. |
231 | | // 6. condition expr -- Given to fetch rows that satisfy specific conditions. |
232 | 1.25M | if (!IsNull(ybctid)) { |
233 | 7.76k | request->set_partition_key(ybctid.binary_value()); |
234 | | |
235 | 1.24M | } else if (request->has_paging_state() && |
236 | 1.24M | request->paging_state().has_next_partition_key()34.1k ) { |
237 | | // If this is a subsequent query, use the partition key from the paging state. |
238 | 34.1k | request->set_partition_key(request->paging_state().next_partition_key()); |
239 | | |
240 | 1.21M | } else if (request->has_lower_bound()) { |
241 | | // When PgGate optimizes RANGE expressions, it will set lower_bound and upper_bound by itself. |
242 | | // In that case, we use them without recompute them here. |
243 | | // |
244 | | // NOTE: Currently, PgGate uses this optimization ONLY for COUNT operator and backfill request. |
245 | | // It has not done any optimization on RANGE values yet. |
246 | 108 | request->set_partition_key(request->lower_bound().key()); |
247 | | |
248 | 1.21M | } else { |
249 | | // Evaluate condition to return partition_key and set the upper bound. |
250 | 1.21M | string max_key; |
251 | 1.21M | RETURN_NOT_OK(SetRangePartitionBounds(schema, last_partition, request, &max_key)); |
252 | 1.21M | if (!max_key.empty()) { |
253 | 682k | request->mutable_upper_bound()->set_key(max_key); |
254 | 682k | request->mutable_upper_bound()->set_is_inclusive(true); |
255 | 682k | } |
256 | 1.21M | } |
257 | | |
258 | 1.25M | return Status::OK(); |
259 | 1.25M | } |
260 | | |
261 | | Result<std::string> GetRangePartitionKey( |
262 | 831k | const Schema& schema, const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols) { |
263 | 831k | RSTATUS_DCHECK(!schema.num_hash_key_columns(), IllegalState, |
264 | 831k | "Cannot get range partition key for hash partitioned table"); |
265 | | |
266 | 831k | auto range_components = VERIFY_RESULT(client::GetRangeComponents(schema, range_cols, true)); |
267 | 0 | return docdb::DocKey(std::move(range_components)).Encode().ToStringBuffer(); |
268 | 831k | } |
269 | | |
270 | | } // namespace |
271 | | |
272 | | //-------------------------------------------------------------------------------------------------- |
273 | | // YBOperation |
274 | | //-------------------------------------------------------------------------------------------------- |
275 | | |
276 | | YBOperation::YBOperation(const shared_ptr<YBTable>& table) |
277 | 23.2M | : table_(table) { |
278 | 23.2M | } |
279 | | |
280 | 23.2M | YBOperation::~YBOperation() {} |
281 | | |
282 | 207k | void YBOperation::SetTablet(const scoped_refptr<internal::RemoteTablet>& tablet) { |
283 | 207k | tablet_ = tablet; |
284 | 207k | } |
285 | | |
286 | 9.03k | void YBOperation::ResetTablet() { |
287 | 9.03k | tablet_.reset(); |
288 | 9.03k | } |
289 | | |
290 | 0 | void YBOperation::ResetTable(std::shared_ptr<YBTable> new_table) { |
291 | 0 | table_.reset(); |
292 | 0 | table_ = new_table; |
293 | | // tablet_ can no longer be valid. |
294 | 0 | tablet_.reset(); |
295 | 0 | } |
296 | | |
297 | 0 | bool YBOperation::IsTransactional() const { |
298 | 0 | return table_->schema().table_properties().is_transactional(); |
299 | 0 | } |
300 | | |
301 | 0 | bool YBOperation::IsYsqlCatalogOp() const { |
302 | 0 | return table_->schema().table_properties().is_ysql_catalog_table(); |
303 | 0 | } |
304 | | |
305 | 2.05k | void YBOperation::MarkTablePartitionListAsStale() { |
306 | 2.05k | table_->MarkPartitionsAsStale(); |
307 | 2.05k | } |
308 | | |
309 | | //-------------------------------------------------------------------------------------------------- |
310 | | // YBRedisOp |
311 | | //-------------------------------------------------------------------------------------------------- |
312 | | |
313 | | YBRedisOp::YBRedisOp(const shared_ptr<YBTable>& table) |
314 | 208k | : YBOperation(table) { |
315 | 208k | } |
316 | | |
317 | 416k | RedisResponsePB* YBRedisOp::mutable_response() { |
318 | 416k | if (!redis_response_) { |
319 | 208k | redis_response_.reset(new RedisResponsePB()); |
320 | 208k | } |
321 | 416k | return redis_response_.get(); |
322 | 416k | } |
323 | | |
324 | 6 | const RedisResponsePB& YBRedisOp::response() const { |
325 | 6 | return *DCHECK_NOTNULL(redis_response_.get()); |
326 | 6 | } |
327 | | |
328 | 254k | OpGroup YBRedisReadOp::group() { |
329 | 254k | return FLAGS_redis_allow_reads_from_followers ? OpGroup::kConsistentPrefixRead62.1k |
330 | 254k | : OpGroup::kLeaderRead192k ; |
331 | 254k | } |
332 | | |
333 | | // YBRedisWriteOp ----------------------------------------------------------------- |
334 | | |
335 | | YBRedisWriteOp::YBRedisWriteOp(const shared_ptr<YBTable>& table) |
336 | 123k | : YBRedisOp(table), redis_write_request_(new RedisWriteRequestPB()) { |
337 | 123k | } |
338 | | |
339 | 123k | size_t YBRedisWriteOp::space_used_by_request() const { |
340 | 123k | return redis_write_request_->ByteSizeLong(); |
341 | 123k | } |
342 | | |
343 | 0 | std::string YBRedisWriteOp::ToString() const { |
344 | 0 | return "REDIS_WRITE " + redis_write_request_->key_value().key(); |
345 | 0 | } |
346 | | |
347 | 123k | void YBRedisWriteOp::SetHashCode(uint16_t hash_code) { |
348 | 123k | hash_code_ = hash_code; |
349 | 123k | redis_write_request_->mutable_key_value()->set_hash_code(hash_code); |
350 | 123k | } |
351 | | |
352 | 123k | const std::string& YBRedisWriteOp::GetKey() const { |
353 | 123k | return redis_write_request_->key_value().key(); |
354 | 123k | } |
355 | | |
356 | 246k | Status YBRedisWriteOp::GetPartitionKey(std::string *partition_key) const { |
357 | 246k | const Slice& slice(redis_write_request_->key_value().key()); |
358 | 246k | return table_->partition_schema().EncodeRedisKey(slice, partition_key); |
359 | 246k | } |
360 | | |
361 | | // YBRedisReadOp ----------------------------------------------------------------- |
362 | | |
363 | | YBRedisReadOp::YBRedisReadOp(const shared_ptr<YBTable>& table) |
364 | 84.8k | : YBRedisOp(table), redis_read_request_(new RedisReadRequestPB()) { |
365 | 84.8k | } |
366 | | |
367 | 84.1k | size_t YBRedisReadOp::space_used_by_request() const { |
368 | 84.1k | return redis_read_request_->SpaceUsedLong(); |
369 | 84.1k | } |
370 | | |
371 | 0 | std::string YBRedisReadOp::ToString() const { |
372 | 0 | return "REDIS_READ " + redis_read_request_->key_value().key(); |
373 | 0 | } |
374 | | |
375 | 84.8k | void YBRedisReadOp::SetHashCode(uint16_t hash_code) { |
376 | 84.8k | hash_code_ = hash_code; |
377 | 84.8k | redis_read_request_->mutable_key_value()->set_hash_code(hash_code); |
378 | 84.8k | } |
379 | | |
380 | 84.1k | const std::string& YBRedisReadOp::GetKey() const { |
381 | 84.1k | return redis_read_request_->key_value().key(); |
382 | 84.1k | } |
383 | | |
384 | 169k | Status YBRedisReadOp::GetPartitionKey(std::string *partition_key) const { |
385 | 169k | if (!redis_read_request_->key_value().has_key()) { |
386 | 630 | *partition_key = |
387 | 630 | PartitionSchema::EncodeMultiColumnHashValue(redis_read_request_->key_value().hash_code()); |
388 | 630 | return Status::OK(); |
389 | 630 | } |
390 | 168k | const Slice& slice(redis_read_request_->key_value().key()); |
391 | 168k | return table_->partition_schema().EncodeRedisKey(slice, partition_key); |
392 | 169k | } |
393 | | |
394 | | //-------------------------------------------------------------------------------------------------- |
395 | | // YBCql Operators |
396 | | // - These ops should be prefixed with YBCql instead of YBql. |
397 | | // - The prefixes "ql" or "QL" are used for common entities of all languages and not just CQL. |
398 | | // - The name will be cleaned up later. |
399 | | //-------------------------------------------------------------------------------------------------- |
400 | | |
401 | | YBqlOp::YBqlOp(const shared_ptr<YBTable>& table) |
402 | 11.4M | : YBOperation(table) , ql_response_(new QLResponsePB()) { |
403 | 11.4M | } |
404 | | |
405 | 11.4M | YBqlOp::~YBqlOp() { |
406 | 11.4M | } |
407 | | |
408 | 142k | bool YBqlOp::succeeded() const { |
409 | 142k | return response().status() == QLResponsePB::YQL_STATUS_OK; |
410 | 142k | } |
411 | | |
412 | | // YBqlWriteOp ----------------------------------------------------------------- |
413 | | |
414 | | YBqlWriteOp::YBqlWriteOp(const shared_ptr<YBTable>& table) |
415 | 3.98M | : YBqlOp(table), ql_write_request_(new QLWriteRequestPB()) { |
416 | 3.98M | } |
417 | | |
418 | 3.98M | YBqlWriteOp::~YBqlWriteOp() {} |
419 | | |
420 | | static std::unique_ptr<YBqlWriteOp> NewYBqlWriteOp(const shared_ptr<YBTable>& table, |
421 | 2.03M | QLWriteRequestPB::QLStmtType stmt_type) { |
422 | 2.03M | auto op = std::unique_ptr<YBqlWriteOp>(new YBqlWriteOp(table)); |
423 | 2.03M | QLWriteRequestPB* req = op->mutable_request(); |
424 | 2.03M | req->set_type(stmt_type); |
425 | 2.03M | req->set_client(YQL_CLIENT_CQL); |
426 | | // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too. |
427 | 2.03M | req->set_request_id(reinterpret_cast<uint64_t>(op.get())); |
428 | 2.03M | req->set_query_id(op->GetQueryId()); |
429 | | |
430 | 2.03M | req->set_schema_version(table->schema().version()); |
431 | 2.03M | req->set_is_compatible_with_previous_version( |
432 | 2.03M | table->schema().is_compatible_with_previous_version()); |
433 | | |
434 | 2.03M | return op; |
435 | 2.03M | } |
436 | | |
437 | 2.02M | std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) { |
438 | 2.02M | return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_INSERT); |
439 | 2.02M | } |
440 | | |
441 | 3.46k | std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) { |
442 | 3.46k | return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_UPDATE); |
443 | 3.46k | } |
444 | | |
445 | 1.16k | std::unique_ptr<YBqlWriteOp> YBqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) { |
446 | 1.16k | return NewYBqlWriteOp(table, QLWriteRequestPB::QL_STMT_DELETE); |
447 | 1.16k | } |
448 | | |
449 | 5 | std::string YBqlWriteOp::ToString() const { |
450 | 5 | return "QL_WRITE " + ql_write_request_->ShortDebugString(); |
451 | 5 | } |
452 | | |
453 | 3.99M | Status YBqlWriteOp::GetPartitionKey(string* partition_key) const { |
454 | 3.99M | return table_->partition_schema().EncodeKey(ql_write_request_->hashed_column_values(), |
455 | 3.99M | partition_key); |
456 | 3.99M | } |
457 | | |
458 | 3.99M | void YBqlWriteOp::SetHashCode(const uint16_t hash_code) { |
459 | 3.99M | ql_write_request_->set_hash_code(hash_code); |
460 | 3.99M | } |
461 | | |
462 | 18.5k | uint16_t YBqlWriteOp::GetHashCode() const { |
463 | 18.5k | return ql_write_request_->hash_code(); |
464 | 18.5k | } |
465 | | |
466 | 1.89M | bool YBqlWriteOp::ReadsStaticRow() const { |
467 | | // A QL write op reads the static row if it reads a static column, or it writes to the static row |
468 | | // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp). |
469 | 1.89M | return !ql_write_request_->column_refs().static_ids().empty() || |
470 | 1.89M | (writes_static_row_ && ql_write_request_->has_user_timestamp_usec()226 ); |
471 | 1.89M | } |
472 | | |
473 | 1.89M | bool YBqlWriteOp::ReadsPrimaryRow() const { |
474 | | // A QL write op reads the primary row reads a non-static column, it writes to the primary row |
475 | | // and has a user-defined timestamp (which DocDB requires a read-modify-write by the timestamp), |
476 | | // or if there is an IF clause. |
477 | 1.89M | return !ql_write_request_->column_refs().ids().empty() || |
478 | 1.89M | (1.89M writes_primary_row_1.89M && ql_write_request_->has_user_timestamp_usec()1.87M ) || |
479 | 1.89M | ql_write_request_->has_if_expr()1.89M ; |
480 | 1.89M | } |
481 | | |
482 | 1.89M | bool YBqlWriteOp::WritesStaticRow() const { |
483 | 1.89M | return writes_static_row_; |
484 | 1.89M | } |
485 | | |
486 | 1.89M | bool YBqlWriteOp::WritesPrimaryRow() const { |
487 | 1.89M | return writes_primary_row_; |
488 | 1.89M | } |
489 | | |
490 | 0 | bool YBqlWriteOp::returns_sidecar() { |
491 | 0 | return ql_write_request_->has_if_expr() || ql_write_request_->returns_status(); |
492 | 0 | } |
493 | | |
494 | | // YBqlWriteOp::HashHash/Equal --------------------------------------------------------------- |
495 | 2.01M | size_t YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op) const { |
496 | 2.01M | size_t hash = 0; |
497 | | |
498 | | // Hash the table id. |
499 | 2.01M | boost::hash_combine(hash, op->table()->id()); |
500 | | |
501 | | // Hash the hash key. |
502 | 2.01M | string key; |
503 | 2.08M | for (const auto& value : op->request().hashed_column_values()) { |
504 | 2.08M | AppendToKey(value.value(), &key); |
505 | 2.08M | } |
506 | 2.01M | boost::hash_combine(hash, key); |
507 | | |
508 | 2.01M | return hash; |
509 | 2.01M | } |
510 | | |
511 | | bool YBqlWriteHashKeyComparator::operator()(const YBqlWriteOpPtr& op1, |
512 | 126k | const YBqlWriteOpPtr& op2) const { |
513 | | // Check if two write ops overlap that they apply to the same hash key in the same table. |
514 | 126k | if (op1->table() != op2->table() && op1->table()->id() != op2->table()->id()216 ) { |
515 | 216 | return false; |
516 | 216 | } |
517 | 126k | const QLWriteRequestPB& req1 = op1->request(); |
518 | 126k | const QLWriteRequestPB& req2 = op2->request(); |
519 | 126k | if (req1.hashed_column_values_size() != req2.hashed_column_values_size()) { |
520 | 0 | return false; |
521 | 0 | } |
522 | 243k | for (int i = 0; 126k i < req1.hashed_column_values().size(); i++116k ) { |
523 | 127k | DCHECK(req1.hashed_column_values()[i].has_value()); |
524 | 127k | DCHECK(req2.hashed_column_values()[i].has_value()); |
525 | 127k | if (req1.hashed_column_values()[i].value() != req2.hashed_column_values()[i].value()) |
526 | 10.7k | return false; |
527 | 127k | } |
528 | 115k | return true; |
529 | 126k | } |
530 | | |
531 | | // YBqlWriteOp::PrimaryHash/Equal --------------------------------------------------------------- |
532 | 2.01M | size_t YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op) const { |
533 | 2.01M | size_t hash = YBqlWriteHashKeyComparator()(op); |
534 | | |
535 | | // Hash the range key also. |
536 | 2.01M | string key; |
537 | 2.01M | for (const auto& value : op->request().range_column_values()) { |
538 | 1.93M | AppendToKey(value.value(), &key); |
539 | 1.93M | } |
540 | 2.01M | boost::hash_combine(hash, key); |
541 | | |
542 | 2.01M | return hash; |
543 | 2.01M | } |
544 | | |
545 | | bool YBqlWritePrimaryKeyComparator::operator()(const YBqlWriteOpPtr& op1, |
546 | 126k | const YBqlWriteOpPtr& op2) const { |
547 | 126k | if (!YBqlWriteHashKeyComparator()(op1, op2)) { |
548 | 10.9k | return false; |
549 | 10.9k | } |
550 | | |
551 | | // Check if two write ops overlap that they apply to the range key also. |
552 | 115k | const QLWriteRequestPB& req1 = op1->request(); |
553 | 115k | const QLWriteRequestPB& req2 = op2->request(); |
554 | 115k | if (req1.range_column_values_size() != req2.range_column_values_size()) { |
555 | 0 | return false; |
556 | 0 | } |
557 | 115k | for (int i = 0; 115k i < req1.range_column_values().size(); i++132 ) { |
558 | 115k | DCHECK(req1.range_column_values()[i].has_value()); |
559 | 115k | DCHECK(req2.range_column_values()[i].has_value()); |
560 | 115k | if (req1.range_column_values()[i].value() != req2.range_column_values()[i].value()) |
561 | 115k | return false; |
562 | 115k | } |
563 | 45 | return true; |
564 | 115k | } |
565 | | |
566 | | // YBqlReadOp ----------------------------------------------------------------- |
567 | | |
568 | | YBqlReadOp::YBqlReadOp(const shared_ptr<YBTable>& table) |
569 | | : YBqlOp(table), |
570 | | ql_read_request_(new QLReadRequestPB()), |
571 | 7.45M | yb_consistency_level_(YBConsistencyLevel::STRONG) { |
572 | 7.45M | } |
573 | | |
574 | 7.50M | YBqlReadOp::~YBqlReadOp() {} |
575 | | |
576 | 22.6M | OpGroup YBqlReadOp::group() { |
577 | 22.6M | return yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX |
578 | 22.6M | ? OpGroup::kConsistentPrefixRead3.89k : OpGroup::kLeaderRead22.6M ; |
579 | 22.6M | } |
580 | | |
581 | 7.47M | std::unique_ptr<YBqlReadOp> YBqlReadOp::NewSelect(const shared_ptr<YBTable>& table) { |
582 | 7.47M | std::unique_ptr<YBqlReadOp> op(new YBqlReadOp(table)); |
583 | 7.47M | QLReadRequestPB *req = op->mutable_request(); |
584 | 7.47M | req->set_client(YQL_CLIENT_CQL); |
585 | | // TODO: Request ID should be filled with CQL stream ID. Query ID should be replaced too. |
586 | 7.47M | req->set_request_id(reinterpret_cast<uint64_t>(op.get())); |
587 | 7.47M | req->set_query_id(op->GetQueryId()); |
588 | | |
589 | 7.47M | req->set_schema_version(table->schema().version()); |
590 | 7.47M | req->set_is_compatible_with_previous_version( |
591 | 7.47M | table->schema().is_compatible_with_previous_version()); |
592 | | |
593 | 7.47M | return op; |
594 | 7.47M | } |
595 | | |
596 | 0 | std::string YBqlReadOp::ToString() const { |
597 | 0 | return "QL_READ " + ql_read_request_->DebugString(); |
598 | 0 | } |
599 | | |
600 | 7.35M | void YBqlReadOp::SetHashCode(const uint16_t hash_code) { |
601 | 7.35M | ql_read_request_->set_hash_code(hash_code); |
602 | 7.35M | } |
603 | | |
604 | 7.49M | Status YBqlReadOp::GetPartitionKey(string* partition_key) const { |
605 | 7.49M | if (!ql_read_request_->hashed_column_values().empty()) { |
606 | | // If hashed columns are set, use them to compute the exact key and set the bounds |
607 | 7.29M | RETURN_NOT_OK(table_->partition_schema().EncodeKey(ql_read_request_->hashed_column_values(), |
608 | 7.29M | partition_key)); |
609 | | |
610 | | // TODO: If user specified token range doesn't contain the hash columns specified then the query |
611 | | // will have no effect. We need to implement an exit path rather than requesting the tablets. |
612 | | // For now, we set point query some value that is not equal to the hash to the hash columns |
613 | | // Which will return no result. |
614 | | |
615 | | // Make sure given key is not smaller than lower bound (if any) |
616 | 7.29M | if (ql_read_request_->has_hash_code()) { |
617 | 26 | uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code()); |
618 | 26 | auto lower_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
619 | 26 | if (*partition_key < lower_bound) *partition_key = std::move(lower_bound)6 ; |
620 | 26 | } |
621 | | |
622 | | // Make sure given key is not bigger than upper bound (if any) |
623 | 7.29M | if (ql_read_request_->has_max_hash_code()) { |
624 | 16 | uint16 hash_code = static_cast<uint16>(ql_read_request_->max_hash_code()); |
625 | 16 | auto upper_bound = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
626 | 16 | if (*partition_key > upper_bound) *partition_key = std::move(upper_bound)5 ; |
627 | 16 | } |
628 | | |
629 | | // Set both bounds to equal partition key now, because this is a point get |
630 | 7.29M | ql_read_request_->set_hash_code( |
631 | 7.29M | PartitionSchema::DecodeMultiColumnHashValue(*partition_key)); |
632 | 7.29M | ql_read_request_->set_max_hash_code( |
633 | 7.29M | PartitionSchema::DecodeMultiColumnHashValue(*partition_key)); |
634 | 7.29M | } else { |
635 | | // Otherwise, set the partition key to the hash_code (lower bound of the token range). |
636 | 204k | if (ql_read_request_->has_hash_code()) { |
637 | 27.4k | uint16 hash_code = static_cast<uint16>(ql_read_request_->hash_code()); |
638 | 27.4k | *partition_key = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
639 | 176k | } else { |
640 | | // Default to empty key, this will start a scan from the beginning. |
641 | 176k | partition_key->clear(); |
642 | 176k | } |
643 | 204k | } |
644 | | |
645 | | // If this is a continued query use the partition key from the paging state |
646 | | // If paging state is there, set hash_code = paging state. This is only supported for forward |
647 | | // scans. |
648 | 7.49M | if (ql_read_request_->has_paging_state() && |
649 | 7.49M | ql_read_request_->paging_state().has_next_partition_key()35.0k && |
650 | 7.49M | !ql_read_request_->paging_state().next_partition_key().empty()35.0k ) { |
651 | 34.6k | *partition_key = ql_read_request_->paging_state().next_partition_key(); |
652 | | |
653 | | // Check that the partition key we got from the paging state is within bounds. |
654 | 34.6k | uint16 paging_state_hash_code = PartitionSchema::DecodeMultiColumnHashValue(*partition_key); |
655 | 34.6k | if ((ql_read_request_->has_hash_code() && |
656 | 34.6k | paging_state_hash_code < ql_read_request_->hash_code()27.4k ) || |
657 | 34.6k | (ql_read_request_->has_max_hash_code() && |
658 | 34.6k | paging_state_hash_code > ql_read_request_->max_hash_code()185 )) { |
659 | 0 | return STATUS_SUBSTITUTE(InternalError, |
660 | 0 | "Out of bounds partition key found in paging state:" |
661 | 0 | "Query's partition bounds: [$0, $1], paging state partition: $2", |
662 | 0 | ql_read_request_->hash_code(), |
663 | 0 | ql_read_request_->max_hash_code() , |
664 | 0 | paging_state_hash_code); |
665 | 0 | } |
666 | | |
667 | 34.6k | ql_read_request_->set_hash_code(paging_state_hash_code); |
668 | 34.6k | } |
669 | | |
670 | 7.49M | return Status::OK(); |
671 | 7.49M | } |
672 | | |
673 | | std::vector<ColumnSchema> MakeColumnSchemasFromColDesc( |
674 | 7.54M | const google::protobuf::RepeatedPtrField<QLRSColDescPB>& rscol_descs) { |
675 | 7.54M | std::vector<ColumnSchema> column_schemas; |
676 | 7.54M | column_schemas.reserve(rscol_descs.size()); |
677 | 24.0M | for (const auto& rscol_desc : rscol_descs) { |
678 | 24.0M | column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type())); |
679 | 24.0M | } |
680 | 7.54M | return column_schemas; |
681 | 7.54M | } |
682 | | |
683 | 7.54M | std::vector<ColumnSchema> YBqlReadOp::MakeColumnSchemasFromRequest() const { |
684 | | // Tests don't have access to the QL internal statement object, so they have to use rsrow |
685 | | // descriptor from the read request. |
686 | 7.54M | return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs()); |
687 | 7.54M | } |
688 | | |
689 | 796 | Result<QLRowBlock> YBqlReadOp::MakeRowBlock() const { |
690 | 796 | Schema schema(MakeColumnSchemasFromRequest(), 0); |
691 | 796 | QLRowBlock result(schema); |
692 | 796 | Slice data(rows_data_); |
693 | 796 | if (!data.empty()) { |
694 | 796 | RETURN_NOT_OK(result.Deserialize(request().client(), &data)); |
695 | 796 | } |
696 | 796 | return result; |
697 | 796 | } |
698 | | |
699 | | //-------------------------------------------------------------------------------------------------- |
700 | | // YBPgsql Operators |
701 | | //-------------------------------------------------------------------------------------------------- |
702 | | |
703 | | YBPgsqlOp::YBPgsqlOp(const shared_ptr<YBTable>& table, std::string* partition_key) |
704 | | : YBOperation(table), response_(new PgsqlResponsePB()), |
705 | 11.5M | partition_key_(partition_key ? std::move(*partition_key) : std::string()) { |
706 | 11.5M | } |
707 | | |
708 | 11.5M | YBPgsqlOp::~YBPgsqlOp() { |
709 | 11.5M | } |
710 | | |
711 | 9.03M | bool YBPgsqlOp::succeeded() const { |
712 | 9.03M | return response().status() == PgsqlResponsePB::PGSQL_STATUS_OK; |
713 | 9.03M | } |
714 | | |
715 | 9.03M | bool YBPgsqlOp::applied() { |
716 | 9.03M | return succeeded() && !response_->skipped()9.02M ; |
717 | 9.03M | } |
718 | | |
719 | | namespace { |
720 | | |
721 | 0 | std::string ResponseSuffix(const PgsqlResponsePB& response) { |
722 | 0 | const auto str = response.ShortDebugString(); |
723 | 0 | return str.empty() ? std::string() : (", response: " + str); |
724 | 0 | } |
725 | | |
726 | | } // namespace |
727 | | |
728 | | //-------------------------------------------------------------------------------------------------- |
729 | | // YBPgsqlWriteOp |
730 | | |
731 | | YBPgsqlWriteOp::YBPgsqlWriteOp(const shared_ptr<YBTable>& table, PgsqlWriteRequestPB* request) |
732 | 7.21M | : YBPgsqlOp(table, request ? request->mutable_partition_key() : nullptr), request_(request) { |
733 | 7.21M | if (!request) { |
734 | 3.56k | request_holder_ = std::make_unique<PgsqlWriteRequestPB>(); |
735 | 3.56k | request_ = request_holder_.get(); |
736 | 3.56k | } |
737 | 7.21M | } |
738 | | |
739 | 7.21M | YBPgsqlWriteOp::~YBPgsqlWriteOp() {} |
740 | | |
741 | | static std::unique_ptr<YBPgsqlWriteOp> NewYBPgsqlWriteOp( |
742 | | const shared_ptr<YBTable>& table, |
743 | 3.56k | PgsqlWriteRequestPB::PgsqlStmtType stmt_type) { |
744 | 3.56k | auto op = std::make_unique<YBPgsqlWriteOp>(table); |
745 | 3.56k | PgsqlWriteRequestPB *req = op->mutable_request(); |
746 | 3.56k | req->set_stmt_type(stmt_type); |
747 | 3.56k | req->set_client(YQL_CLIENT_PGSQL); |
748 | 3.56k | req->set_table_id(table->id()); |
749 | 3.56k | req->set_schema_version(table->schema().version()); |
750 | 3.56k | req->set_stmt_id(op->GetQueryId()); |
751 | | |
752 | 3.56k | return op; |
753 | 3.56k | } |
754 | | |
755 | 295 | std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewInsert(const std::shared_ptr<YBTable>& table) { |
756 | 295 | return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_INSERT); |
757 | 295 | } |
758 | | |
759 | 2.97k | std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewUpdate(const std::shared_ptr<YBTable>& table) { |
760 | 2.97k | return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_UPDATE); |
761 | 2.97k | } |
762 | | |
763 | 295 | std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewDelete(const std::shared_ptr<YBTable>& table) { |
764 | 295 | return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_DELETE); |
765 | 295 | } |
766 | | |
767 | | std::unique_ptr<YBPgsqlWriteOp> YBPgsqlWriteOp::NewTruncateColocated( |
768 | 0 | const std::shared_ptr<YBTable>& table) { |
769 | 0 | return NewYBPgsqlWriteOp(table, PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED); |
770 | 0 | } |
771 | | |
772 | 0 | std::string YBPgsqlWriteOp::ToString() const { |
773 | 0 | return Format( |
774 | 0 | "PGSQL_WRITE $0$1$2", request_->ShortDebugString(), |
775 | 0 | (write_time_ ? " write_time: " + write_time_.ToString() : ""), ResponseSuffix(response())); |
776 | 0 | } |
777 | | |
778 | 5.61M | void YBPgsqlWriteOp::SetHashCode(const uint16_t hash_code) { |
779 | 5.61M | request_->set_hash_code(hash_code); |
780 | 5.61M | } |
781 | | |
782 | 0 | bool YBPgsqlWriteOp::IsTransactional() const { |
783 | 0 | return !is_single_row_txn_ && table_->schema().table_properties().is_transactional(); |
784 | 0 | } |
785 | | |
786 | 7.21M | CHECKED_STATUS YBPgsqlWriteOp::GetPartitionKey(std::string* partition_key) const { |
787 | 7.21M | if (!request_holder_) { |
788 | 7.20M | return YBPgsqlOp::GetPartitionKey(partition_key); |
789 | 7.20M | } |
790 | 4.21k | RETURN_NOT_OK(InitPartitionKey(table_->InternalSchema(), table_->partition_schema(), request_)); |
791 | 4.21k | *partition_key = std::move(*request_->mutable_partition_key()); |
792 | 4.21k | return Status::OK(); |
793 | 4.21k | } |
794 | | |
795 | | //-------------------------------------------------------------------------------------------------- |
796 | | // YBPgsqlReadOp |
797 | | |
798 | | YBPgsqlReadOp::YBPgsqlReadOp(const shared_ptr<YBTable>& table, PgsqlReadRequestPB* request) |
799 | | : YBPgsqlOp(table, request ? request->mutable_partition_key() : nullptr), |
800 | | request_(request), |
801 | 4.36M | yb_consistency_level_(YBConsistencyLevel::STRONG) { |
802 | 4.36M | if (!request) { |
803 | 3.23k | request_holder_ = std::make_unique<PgsqlReadRequestPB>(); |
804 | 3.23k | request_ = request_holder_.get(); |
805 | 3.23k | } |
806 | 4.36M | } |
807 | | |
808 | 3.23k | std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSelect(const shared_ptr<YBTable>& table) { |
809 | 3.23k | std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table)); |
810 | 3.23k | PgsqlReadRequestPB *req = op->mutable_request(); |
811 | 3.23k | req->set_client(YQL_CLIENT_PGSQL); |
812 | 3.23k | req->set_table_id(table->id()); |
813 | 3.23k | req->set_schema_version(table->schema().version()); |
814 | 3.23k | req->set_stmt_id(op->GetQueryId()); |
815 | | |
816 | 3.23k | return op; |
817 | 3.23k | } |
818 | | |
819 | 0 | std::unique_ptr<YBPgsqlReadOp> YBPgsqlReadOp::NewSample(const shared_ptr<YBTable>& table) { |
820 | 0 | std::unique_ptr<YBPgsqlReadOp> op(new YBPgsqlReadOp(table)); |
821 | 0 | PgsqlReadRequestPB *req = op->mutable_request(); |
822 | 0 | req->set_client(YQL_CLIENT_PGSQL); |
823 | 0 | req->set_table_id(table->id()); |
824 | 0 | req->set_schema_version(table->schema().version()); |
825 | 0 | req->set_stmt_id(op->GetQueryId()); |
826 | |
|
827 | 0 | return op; |
828 | 0 | } |
829 | | |
830 | 0 | std::string YBPgsqlReadOp::ToString() const { |
831 | 0 | return "PGSQL_READ " + request_->ShortDebugString() + ResponseSuffix(response()); |
832 | 0 | } |
833 | | |
834 | 3.03M | void YBPgsqlReadOp::SetHashCode(const uint16_t hash_code) { |
835 | 3.03M | request_->set_hash_code(hash_code); |
836 | 3.03M | } |
837 | | |
838 | | std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromColDesc( |
839 | 0 | const google::protobuf::RepeatedPtrField<PgsqlRSColDescPB>& rscol_descs) { |
840 | 0 | std::vector<ColumnSchema> column_schemas; |
841 | 0 | column_schemas.reserve(rscol_descs.size()); |
842 | 0 | for (const auto& rscol_desc : rscol_descs) { |
843 | 0 | column_schemas.emplace_back(rscol_desc.name(), QLType::FromQLTypePB(rscol_desc.ql_type())); |
844 | 0 | } |
845 | 0 | return column_schemas; |
846 | 0 | } |
847 | | |
848 | 0 | std::vector<ColumnSchema> YBPgsqlReadOp::MakeColumnSchemasFromRequest() const { |
849 | | // Tests don't have access to the QL internal statement object, so they have to use rsrow |
850 | | // descriptor from the read request. |
851 | 0 | return MakeColumnSchemasFromColDesc(request().rsrow_desc().rscol_descs()); |
852 | 0 | } |
853 | | |
854 | 25.1M | OpGroup YBPgsqlReadOp::group() { |
855 | 25.1M | return yb_consistency_level_ == YBConsistencyLevel::CONSISTENT_PREFIX |
856 | 25.1M | ? OpGroup::kConsistentPrefixRead269 : OpGroup::kLeaderRead25.1M ; |
857 | 25.1M | } |
858 | | |
859 | 1.95M | void YBPgsqlReadOp::SetUsedReadTime(const ReadHybridTime& used_time) { |
860 | 1.95M | used_read_time_ = used_time; |
861 | 1.95M | } |
862 | | |
863 | 4.36M | CHECKED_STATUS YBPgsqlReadOp::GetPartitionKey(std::string* partition_key) const { |
864 | 4.36M | if (!request_holder_) { |
865 | 4.35M | return YBPgsqlOp::GetPartitionKey(partition_key); |
866 | 4.35M | } |
867 | 4.77k | RETURN_NOT_OK(InitPartitionKey( |
868 | 4.77k | table_->InternalSchema(), table_->partition_schema(), table_->GetPartitionsShared()->back(), |
869 | 4.77k | request_)); |
870 | 4.77k | *partition_key = std::move(*request_->mutable_partition_key()); |
871 | 4.77k | return Status::OK(); |
872 | 4.77k | } |
873 | | |
874 | | //////////////////////////////////////////////////////////// |
875 | | // YBNoOp |
876 | | //////////////////////////////////////////////////////////// |
877 | | |
878 | | YBNoOp::YBNoOp(const std::shared_ptr<YBTable>& table) |
879 | 0 | : table_(table) { |
880 | 0 | } |
881 | | |
882 | 0 | Status YBNoOp::Execute(YBClient* client, const YBPartialRow& key) { |
883 | 0 | string encoded_key; |
884 | 0 | RETURN_NOT_OK(table_->partition_schema().EncodeKey(key, &encoded_key)); |
885 | 0 | CoarseTimePoint deadline = CoarseMonoClock::Now() + 5s; |
886 | |
|
887 | 0 | tserver::NoOpRequestPB noop_req; |
888 | 0 | tserver::NoOpResponsePB noop_resp; |
889 | |
|
890 | 0 | for (int attempt = 1; attempt < 11; attempt++) { |
891 | 0 | Synchronizer sync; |
892 | 0 | auto remote_ = VERIFY_RESULT(client->data_->meta_cache_->LookupTabletByKeyFuture( |
893 | 0 | table_, encoded_key, deadline).get()); |
894 | | |
895 | 0 | internal::RemoteTabletServer *ts = nullptr; |
896 | 0 | std::vector<internal::RemoteTabletServer*> candidates; |
897 | 0 | std::set<string> blacklist; // TODO: empty set for now. |
898 | 0 | Status lookup_status = client->data_->GetTabletServer( |
899 | 0 | client, |
900 | 0 | remote_, |
901 | 0 | YBClient::ReplicaSelection::LEADER_ONLY, |
902 | 0 | blacklist, |
903 | 0 | &candidates, |
904 | 0 | &ts); |
905 | | |
906 | | // If we get ServiceUnavailable, this indicates that the tablet doesn't |
907 | | // currently have any known leader. We should sleep and retry, since |
908 | | // it's likely that the tablet is undergoing a leader election and will |
909 | | // soon have one. |
910 | 0 | if (lookup_status.IsServiceUnavailable() && CoarseMonoClock::Now() < deadline) { |
911 | 0 | const int sleep_ms = attempt * 100; |
912 | 0 | VLOG(1) << "Tablet " << remote_->tablet_id() << " current unavailable: " |
913 | 0 | << lookup_status.ToString() << ". Sleeping for " << sleep_ms << "ms " |
914 | 0 | << "and retrying..."; |
915 | 0 | SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); |
916 | 0 | continue; |
917 | 0 | } |
918 | 0 | RETURN_NOT_OK(lookup_status); |
919 | | |
920 | 0 | auto now = CoarseMonoClock::Now(); |
921 | 0 | if (deadline < now) { |
922 | 0 | return STATUS(TimedOut, "Op timed out, deadline expired"); |
923 | 0 | } |
924 | | |
925 | | // Recalculate the deadlines. |
926 | | // If we have other replicas beyond this one to try, then we'll use the default RPC timeout. |
927 | | // That gives us time to try other replicas later. Otherwise, use the full remaining deadline |
928 | | // for the user's call. |
929 | 0 | CoarseTimePoint rpc_deadline; |
930 | 0 | if (static_cast<int>(candidates.size()) - blacklist.size() > 1) { |
931 | 0 | rpc_deadline = now + client->default_rpc_timeout(); |
932 | 0 | rpc_deadline = std::min(deadline, rpc_deadline); |
933 | 0 | } else { |
934 | 0 | rpc_deadline = deadline; |
935 | 0 | } |
936 | |
|
937 | 0 | rpc::RpcController controller; |
938 | 0 | controller.set_deadline(rpc_deadline); |
939 | |
|
940 | 0 | CHECK(ts->proxy()); |
941 | 0 | const Status rpc_status = ts->proxy()->NoOp(noop_req, &noop_resp, &controller); |
942 | 0 | if (rpc_status.ok() && !noop_resp.has_error()) { |
943 | 0 | break; |
944 | 0 | } |
945 | | |
946 | 0 | LOG(INFO) << rpc_status.CodeAsString(); |
947 | 0 | if (noop_resp.has_error()) { |
948 | 0 | Status s = StatusFromPB(noop_resp.error().status()); |
949 | 0 | LOG(INFO) << rpc_status.CodeAsString(); |
950 | 0 | } |
951 | | /* |
952 | | * TODO: For now, we just try a few attempts and exit. Ideally, we should check for |
953 | | * errors that are retriable, and retry if so. |
954 | | * RETURN_NOT_OK(CanBeRetried(true, rpc_status, server_status, rpc_deadline, deadline, |
955 | | * candidates, blacklist)); |
956 | | */ |
957 | 0 | } |
958 | | |
959 | 0 | return Status::OK(); |
960 | 0 | } |
961 | | |
962 | 3.36M | bool YBPgsqlReadOp::should_add_intents(IsolationLevel isolation_level) { |
963 | 3.36M | return isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION || |
964 | 3.36M | IsValidRowMarkType(GetRowMarkTypeFromPB(*request_))1.18M ; |
965 | 3.36M | } |
966 | | |
967 | | CHECKED_STATUS InitPartitionKey( |
968 | | const Schema& schema, const PartitionSchema& partition_schema, |
969 | 4.37M | const std::string& last_partition, PgsqlReadRequestPB* request) { |
970 | 4.37M | if (schema.num_hash_key_columns() > 0) { |
971 | 3.11M | return InitHashPartitionKey(schema, partition_schema, request); |
972 | 3.11M | } |
973 | | |
974 | 1.25M | return InitRangePartitionKey(schema, last_partition, request); |
975 | 4.37M | } |
976 | | |
977 | | CHECKED_STATUS InitPartitionKey( |
978 | 7.21M | const Schema& schema, const PartitionSchema& partition_schema, PgsqlWriteRequestPB* request) { |
979 | 7.21M | const auto& ybctid = request->ybctid_column_value().value(); |
980 | 7.21M | if (schema.num_hash_key_columns() > 0) { |
981 | 5.61M | if (!IsNull(ybctid)) { |
982 | 5.09M | const uint16 hash_code = VERIFY_RESULT(docdb::DocKey::DecodeHash(ybctid.binary_value())); |
983 | 0 | request->set_hash_code(hash_code); |
984 | 5.09M | request->set_partition_key(PartitionSchema::EncodeMultiColumnHashValue(hash_code)); |
985 | 5.09M | return Status::OK(); |
986 | 5.09M | } |
987 | | |
988 | | // Computing the partition_key. |
989 | 515k | return partition_schema.EncodeKey( |
990 | 515k | request->partition_column_values(), request->mutable_partition_key()); |
991 | 5.61M | } else { |
992 | | // Range partitioned table |
993 | 1.59M | if (!IsNull(ybctid)) { |
994 | 764k | request->set_partition_key(ybctid.binary_value()); |
995 | 764k | return Status::OK(); |
996 | 764k | } |
997 | | |
998 | | // Computing the range key. |
999 | 833k | request->set_partition_key(VERIFY_RESULT(GetRangePartitionKey( |
1000 | 833k | schema, request->range_column_values()))); |
1001 | 0 | return Status::OK(); |
1002 | 833k | } |
1003 | 7.21M | } |
1004 | | |
1005 | | Result<std::vector<docdb::PrimitiveValue>> GetRangeComponents( |
1006 | | const Schema& schema, |
1007 | | const google::protobuf::RepeatedPtrField<PgsqlExpressionPB>& range_cols, |
1008 | 2.18M | bool lower_bound) { |
1009 | 2.18M | int i = 0; |
1010 | 2.18M | auto num_range_key_columns = narrow_cast<int>(schema.num_range_key_columns()); |
1011 | 2.18M | std::vector<docdb::PrimitiveValue> result; |
1012 | 4.64M | for (const auto& col_id : schema.column_ids()) { |
1013 | 4.64M | if (!schema.is_range_column(col_id)) { |
1014 | 0 | continue; |
1015 | 0 | } |
1016 | | |
1017 | 4.64M | const ColumnSchema& column_schema = VERIFY_RESULT(schema.column_by_id(col_id)); |
1018 | 4.64M | if (i >= range_cols.size() || range_cols[i].value().value_case() == QLValuePB::VALUE_NOT_SET4.29M ) { |
1019 | 539k | if (lower_bound) { |
1020 | 360k | result.emplace_back(docdb::ValueType::kLowest); |
1021 | 360k | } else { |
1022 | 178k | result.emplace_back(docdb::ValueType::kHighest); |
1023 | 178k | } |
1024 | 4.11M | } else { |
1025 | 4.11M | result.push_back(docdb::PrimitiveValue::FromQLValuePB( |
1026 | 4.11M | range_cols[i].value(), column_schema.sorting_type())); |
1027 | 4.11M | } |
1028 | | |
1029 | 4.64M | if (++i == num_range_key_columns) { |
1030 | 2.18M | break; |
1031 | 2.18M | } |
1032 | | |
1033 | 2.46M | if (!lower_bound) { |
1034 | 340k | result.emplace_back(docdb::ValueType::kHighest); |
1035 | 340k | } |
1036 | 2.46M | } |
1037 | 2.18M | return result; |
1038 | 2.18M | } |
1039 | | |
1040 | | Status GetRangePartitionBounds(const Schema& schema, |
1041 | | const PgsqlReadRequestPB& request, |
1042 | | vector<docdb::PrimitiveValue>* lower_bound, |
1043 | 1.21M | vector<docdb::PrimitiveValue>* upper_bound) { |
1044 | 1.21M | SCHECK(!schema.num_hash_key_columns(), IllegalState, |
1045 | 1.21M | "Cannot set range partition key for hash partitioned table"); |
1046 | 1.21M | const auto& range_cols = request.range_column_values(); |
1047 | 1.21M | const auto& condition_expr = request.condition_expr(); |
1048 | 1.21M | if (condition_expr.has_condition() && |
1049 | 1.21M | implicit_cast<size_t>(range_cols.size()) < schema.num_range_key_columns()4.99k ) { |
1050 | 4.99k | auto prefixed_range_components = VERIFY_RESULT(docdb::InitKeyColumnPrimitiveValues( |
1051 | 4.99k | range_cols, schema, schema.num_hash_key_columns())); |
1052 | 0 | QLScanRange scan_range(schema, condition_expr.condition()); |
1053 | 4.99k | *lower_bound = docdb::GetRangeKeyScanSpec( |
1054 | 4.99k | schema, &prefixed_range_components, &scan_range, true /* lower_bound */); |
1055 | 4.99k | *upper_bound = docdb::GetRangeKeyScanSpec( |
1056 | 4.99k | schema, &prefixed_range_components, &scan_range, false /* upper_bound */); |
1057 | 1.20M | } else if (!range_cols.empty()) { |
1058 | 677k | *lower_bound = VERIFY_RESULT(GetRangeComponents(schema, range_cols, true)); |
1059 | 677k | *upper_bound = VERIFY_RESULT(GetRangeComponents(schema, range_cols, false)); |
1060 | 677k | } |
1061 | 1.21M | return Status::OK(); |
1062 | 1.21M | } |
1063 | | |
1064 | | } // namespace client |
1065 | | } // namespace yb |