/Users/deen/code/yugabyte-db/src/yb/docdb/pgsql_operation.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/pgsql_operation.h" |
15 | | |
16 | | #include <limits> |
17 | | #include <string> |
18 | | #include <unordered_set> |
19 | | #include <vector> |
20 | | |
21 | | #include <boost/optional/optional_io.hpp> |
22 | | |
23 | | #include "yb/common/partition.h" |
24 | | #include "yb/common/pg_system_attr.h" |
25 | | #include "yb/common/ql_value.h" |
26 | | |
27 | | #include "yb/docdb/doc_path.h" |
28 | | #include "yb/docdb/doc_pg_expr.h" |
29 | | #include "yb/docdb/doc_pgsql_scanspec.h" |
30 | | #include "yb/docdb/doc_rowwise_iterator.h" |
31 | | #include "yb/docdb/doc_write_batch.h" |
32 | | #include "yb/docdb/docdb.pb.h" |
33 | | #include "yb/docdb/docdb_debug.h" |
34 | | #include "yb/docdb/docdb_pgapi.h" |
35 | | #include "yb/docdb/docdb_rocksdb_util.h" |
36 | | #include "yb/docdb/intent_aware_iterator.h" |
37 | | #include "yb/docdb/primitive_value_util.h" |
38 | | #include "yb/docdb/ql_storage_interface.h" |
39 | | |
40 | | #include "yb/util/flag_tags.h" |
41 | | #include "yb/util/result.h" |
42 | | #include "yb/util/scope_exit.h" |
43 | | #include "yb/util/status_format.h" |
44 | | #include "yb/util/trace.h" |
45 | | |
46 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
47 | | |
48 | | using namespace std::literals; |
49 | | |
50 | | DECLARE_bool(ysql_disable_index_backfill); |
51 | | |
52 | | DEFINE_double(ysql_scan_timeout_multiplier, 0.5, |
53 | | "DEPRECATED. Has no affect, use ysql_scan_deadline_margin_ms to control the client " |
54 | | "timeout"); |
55 | | |
56 | | DEFINE_uint64(ysql_scan_deadline_margin_ms, 1000, |
57 | | "Scan deadline is calculated by adding client timeout to the time when the request " |
58 | | "was received. It defines the moment in time when client has definitely timed out " |
59 | | "and if the request is yet in processing after the deadline, it can be canceled. " |
60 | | "Therefore to prevent client timeout, the request handler should return partial " |
61 | | "result and paging information some time before the deadline. That's what the " |
62 | | "ysql_scan_deadline_margin_ms is for. It should account for network and processing " |
63 | | "delays."); |
64 | | |
65 | | DEFINE_bool(pgsql_consistent_transactional_paging, true, |
66 | | "Whether to enforce consistency of data returned for second page and beyond for YSQL " |
67 | | "queries on transactional tables. If true, read restart errors could be returned to " |
68 | | "prevent inconsistency. If false, no read restart errors are returned but the data may " |
69 | | "be stale. The latter is preferable for long scans. The data returned for the first " |
70 | | "page of results is never stale regardless of this flag."); |
71 | | |
72 | | DEFINE_test_flag(int32, slowdown_pgsql_aggregate_read_ms, 0, |
73 | | "If set > 0, slows down the response to pgsql aggregate read by this amount."); |
74 | | |
75 | | namespace yb { |
76 | | namespace docdb { |
77 | | |
78 | | namespace { |
79 | | |
80 | | // Compatibility: accept column references from a legacy node as a list of column ids only
81 | | CHECKED_STATUS CreateProjection(const Schema& schema, |
82 | | const PgsqlColumnRefsPB& column_refs, |
83 | 2.03M | Schema* projection) { |
84 | | // Create projection of non-primary key columns. Primary key columns are implicitly read by DocDB. |
85 | | // It will also sort the columns before scanning. |
86 | 2.03M | vector<ColumnId> column_ids; |
87 | 2.03M | column_ids.reserve(column_refs.ids_size()); |
88 | 403k | for (int32_t id : column_refs.ids()) { |
89 | 403k | const ColumnId column_id(id); |
90 | 403k | if (!schema.is_key_column(column_id)) { |
91 | 357k | column_ids.emplace_back(column_id); |
92 | 357k | } |
93 | 403k | } |
94 | 2.03M | return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection); |
95 | 2.03M | } |
96 | | |
97 | | CHECKED_STATUS CreateProjection( |
98 | | const Schema& schema, |
99 | | const google::protobuf::RepeatedPtrField<PgsqlColRefPB> &column_refs, |
100 | 1.49M | Schema* projection) { |
101 | 1.49M | vector<ColumnId> column_ids; |
102 | 1.49M | column_ids.reserve(column_refs.size()); |
103 | 9.87M | for (const PgsqlColRefPB& column_ref : column_refs) { |
104 | 9.87M | const ColumnId column_id(column_ref.column_id()); |
105 | 9.87M | if (!schema.is_key_column(column_id)) { |
106 | 7.40M | column_ids.emplace_back(column_id); |
107 | 7.40M | } |
108 | 9.87M | } |
109 | 1.49M | return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection); |
110 | 1.49M | } |
111 | | |
112 | 1.43M | void AddIntent(const std::string& encoded_key, WaitPolicy wait_policy, KeyValueWriteBatchPB *out) { |
113 | 1.43M | auto pair = out->mutable_read_pairs()->Add(); |
114 | 1.43M | pair->set_key(encoded_key); |
115 | 1.43M | pair->set_value(std::string(1, ValueTypeAsChar::kNullLow)); |
116 | | // Since we don't batch read RPCs that lock rows, we can get away with using a singular |
117 | | // wait_policy field. Once we start batching read requests (issue #2495), we will need a repeated |
118 | | // wait policies field. |
119 | 1.43M | out->set_wait_policy(wait_policy); |
120 | 1.43M | } |
121 | | |
122 | | CHECKED_STATUS AddIntent(const PgsqlExpressionPB& ybctid, WaitPolicy wait_policy, |
123 | 100k | KeyValueWriteBatchPB* out) { |
124 | 100k | const auto &val = ybctid.value().binary_value(); |
125 | 100k | SCHECK(!val.empty(), InternalError, "empty ybctid"); |
126 | 100k | AddIntent(val, wait_policy, out); |
127 | 100k | return Status::OK(); |
128 | 100k | } |
129 | | |
130 | | template<class R, class Request, class DocKeyProcessor, class EncodedDocKeyProcessor> |
131 | | Result<R> FetchDocKeyImpl(const Schema& schema, |
132 | | const Request& req, |
133 | | const DocKeyProcessor& dk_processor, |
134 | 4.44M | const EncodedDocKeyProcessor& edk_processor) { |
135 | | // Init DocDB key using either ybctid or partition and range values. |
136 | 4.44M | if (req.has_ybctid_column_value()) { |
137 | 1.78M | const auto& ybctid = req.ybctid_column_value().value().binary_value(); |
138 | 1.78M | SCHECK(!ybctid.empty(), InternalError, "empty ybctid"); |
139 | 1.78M | return edk_processor(ybctid); |
140 | 2.65M | } else { |
141 | 2.65M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues( |
142 | 2.65M | req.partition_column_values(), schema, 0 /* start_idx */)); |
143 | 2.65M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues( |
144 | 2.65M | req.range_column_values(), schema, schema.num_hash_key_columns())); |
145 | 2.65M | return dk_processor(hashed_components.empty() |
146 | 1.18M | ? DocKey(schema, std::move(range_components)) |
147 | 1.46M | : DocKey( |
148 | 1.46M | schema, req.hash_code(), std::move(hashed_components), std::move(range_components))); |
149 | 2.65M | } |
150 | 4.44M | }

pgsql_operation.cc:_ZN2yb5docdb12_GLOBAL__N_115FetchDocKeyImplINS0_6DocKeyENS_19PgsqlWriteRequestPBEZNS1_11FetchDocKeyERKNS_6SchemaERKS4_E3$_4ZNS1_11FetchDocKeyES7_S9_E3$_5EENS_6ResultIT_EES7_RKT0_RKT1_RKT2_:
Line | Count | Source
134 | 3.11M | const EncodedDocKeyProcessor& edk_processor) {
135 | | // Init DocDB key using either ybctid or partition and range values.
136 | 3.11M | if (req.has_ybctid_column_value()) {
137 | 1.77M | const auto& ybctid = req.ybctid_column_value().value().binary_value();
138 | 1.77M | SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139 | 1.77M | return edk_processor(ybctid);
140 | 1.33M | } else {
141 | 1.33M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142 | 1.33M | req.partition_column_values(), schema, 0 /* start_idx */));
143 | 1.33M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144 | 1.33M | req.range_column_values(), schema, schema.num_hash_key_columns()));
145 | 1.33M | return dk_processor(hashed_components.empty()
146 | 1.18M | ? DocKey(schema, std::move(range_components))
147 | 146k | : DocKey(
148 | 146k | schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149 | 1.33M | }
150 | 3.11M | }

pgsql_operation.cc:_ZN2yb5docdb12_GLOBAL__N_115FetchDocKeyImplINSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEENS_18PgsqlReadRequestPBEZNS1_18FetchEncodedDocKeyERKNS_6SchemaERKSA_E3$_3ZNS1_18FetchEncodedDocKeyESD_SF_E3$_2EENS_6ResultIT_EESD_RKT0_RKT1_RKT2_:
Line | Count | Source
134 | 1.33M | const EncodedDocKeyProcessor& edk_processor) {
135 | | // Init DocDB key using either ybctid or partition and range values.
136 | 1.33M | if (req.has_ybctid_column_value()) {
137 | 6.84k | const auto& ybctid = req.ybctid_column_value().value().binary_value();
138 | 6.84k | SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139 | 6.84k | return edk_processor(ybctid);
140 | 1.32M | } else {
141 | 1.32M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142 | 1.32M | req.partition_column_values(), schema, 0 /* start_idx */));
143 | 1.32M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144 | 1.32M | req.range_column_values(), schema, schema.num_hash_key_columns()));
145 | 1.32M | return dk_processor(hashed_components.empty()
146 | 705 | ? DocKey(schema, std::move(range_components))
147 | 1.32M | : DocKey(
148 | 1.32M | schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149 | 1.32M | }
150 | 1.33M | }
151 | | |
152 | 1.33M | Result<string> FetchEncodedDocKey(const Schema& schema, const PgsqlReadRequestPB& request) { |
153 | 1.33M | return FetchDocKeyImpl<string>( |
154 | 1.33M | schema, request, |
155 | 1.32M | [](const auto& doc_key) { return doc_key.Encode().ToStringBuffer(); }, |
156 | 6.84k | [](const auto& encoded_doc_key) { return encoded_doc_key; }); |
157 | 1.33M | } |
158 | | |
159 | 3.11M | Result<DocKey> FetchDocKey(const Schema& schema, const PgsqlWriteRequestPB& request) { |
160 | 3.11M | return FetchDocKeyImpl<DocKey>( |
161 | 3.11M | schema, request, |
162 | 1.33M | [](const auto& doc_key) { return doc_key; }, |
163 | 1.77M | [&schema](const auto& encoded_doc_key) -> Result<DocKey> { |
164 | 1.77M | DocKey key(schema); |
165 | 1.77M | RETURN_NOT_OK(key.DecodeFrom(encoded_doc_key)); |
166 | 1.77M | return key; |
167 | 1.77M | }); |
168 | 3.11M | } |
169 | | |
170 | | Result<YQLRowwiseIteratorIf::UniPtr> CreateIterator( |
171 | | const YQLStorageIf& ql_storage, |
172 | | const PgsqlReadRequestPB& request, |
173 | | const Schema& projection, |
174 | | const Schema& schema, |
175 | | const TransactionOperationContext& txn_op_context, |
176 | | CoarseTimePoint deadline, |
177 | | const ReadHybridTime& read_time, |
178 | 1.64M | bool is_explicit_request_read_time) { |
179 | 1.13k | VLOG_IF(2, request.is_for_backfill()) << "Creating iterator for " << yb::ToString(request); |
180 | | |
181 | 1.64M | YQLRowwiseIteratorIf::UniPtr result; |
182 | | // TODO(neil) Remove the following IF block when it is completely obsolete. |
183 | | // The following IF block has not been used since 2.1 release. |
184 | | // We keep it here only for rolling upgrade purposes.
185 | 1.64M | if (request.has_ybctid_column_value()) { |
186 | 4.73k | SCHECK(!request.has_paging_state(), |
187 | 4.73k | InternalError, |
188 | 4.73k | "Each ybctid value identifies one row in the table while paging state " |
189 | 4.73k | "is only used for multi-row queries."); |
190 | 4.73k | RETURN_NOT_OK(ql_storage.GetIterator( |
191 | 4.73k | request.stmt_id(), projection, schema, txn_op_context, |
192 | 4.73k | deadline, read_time, request.ybctid_column_value().value(), &result)); |
193 | 1.64M | } else { |
194 | 1.64M | SubDocKey start_sub_doc_key; |
195 | 1.64M | auto actual_read_time = read_time; |
196 | | // Decode the start SubDocKey from the paging state and set scan start key. |
197 | 1.64M | if (request.has_paging_state() && |
198 | 54.9k | request.paging_state().has_next_row_key() && |
199 | 23.6k | !request.paging_state().next_row_key().empty()) { |
200 | 23.6k | KeyBytes start_key_bytes(request.paging_state().next_row_key()); |
201 | 23.6k | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
202 | | // TODO(dmitry) Remove backward compatibility block when obsolete. |
203 | 23.6k | if (!is_explicit_request_read_time) { |
204 | 0 | if (request.paging_state().has_read_time()) { |
205 | 0 | actual_read_time = ReadHybridTime::FromPB(request.paging_state().read_time()); |
206 | 0 | } else { |
207 | 0 | actual_read_time.read = start_sub_doc_key.hybrid_time(); |
208 | 0 | } |
209 | 0 | } |
210 | 1.61M | } else if (request.is_for_backfill()) { |
211 | 257 | RSTATUS_DCHECK(is_explicit_request_read_time, InvalidArgument, |
212 | 257 | "Backfill request should already be using explicit read times."); |
213 | 257 | PgsqlBackfillSpecPB spec; |
214 | 257 | spec.ParseFromString(a2b_hex(request.backfill_spec())); |
215 | 257 | if (!spec.next_row_key().empty()) { |
216 | 22 | KeyBytes start_key_bytes(spec.next_row_key()); |
217 | 22 | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
218 | 22 | } |
219 | 257 | } |
220 | 1.64M | RETURN_NOT_OK(ql_storage.GetIterator( |
221 | 1.64M | request, projection, schema, txn_op_context, |
222 | 1.64M | deadline, read_time, start_sub_doc_key.doc_key(), &result)); |
223 | 1.64M | } |
224 | 1.64M | return std::move(result); |
225 | 1.64M | } |
226 | | |
227 | | class DocKeyColumnPathBuilder { |
228 | | public: |
229 | | explicit DocKeyColumnPathBuilder(const RefCntPrefix& doc_key) |
230 | 4.83M | : doc_key_(doc_key.as_slice()) { |
231 | 4.83M | } |
232 | | |
233 | 7.86M | RefCntPrefix Build(ColumnIdRep column_id) { |
234 | 7.86M | buffer_.Clear(); |
235 | 7.86M | buffer_.AppendValueType(ValueType::kColumnId); |
236 | 7.86M | buffer_.AppendColumnId(ColumnId(column_id)); |
237 | 7.86M | RefCntBuffer path(doc_key_.size() + buffer_.size()); |
238 | 7.86M | doc_key_.CopyTo(path.data()); |
239 | 7.86M | buffer_.AsSlice().CopyTo(path.data() + doc_key_.size()); |
240 | 7.86M | return path; |
241 | 7.86M | } |
242 | | |
243 | | private: |
244 | | Slice doc_key_; |
245 | | KeyBytes buffer_; |
246 | | }; |
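
A minimal usage sketch for DocKeyColumnPathBuilder, taken from GetDocPaths below (it assumes
encoded_doc_key_ and the column_values list are already populated):

    DocKeyColumnPathBuilder builder(encoded_doc_key_);
    // Path for the row's liveness column, used to lock the row itself.
    paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn)));
    // One path per written column, used to build column-level intents.
    for (const auto& column_value : *column_values) {
      paths->push_back(builder.Build(column_value.column_id()));
    }
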
247 | | |
248 | | } // namespace |
249 | | |
250 | | //-------------------------------------------------------------------------------------------------- |
251 | | |
252 | 3.11M | Status PgsqlWriteOperation::Init(PgsqlResponsePB* response) { |
253 | | // Initialize operation inputs. |
254 | 3.11M | response_ = response; |
255 | | |
256 | 3.11M | doc_key_ = VERIFY_RESULT(FetchDocKey(schema_, request_)); |
257 | 3.11M | encoded_doc_key_ = doc_key_->EncodeAsRefCntPrefix(); |
258 | | |
259 | 3.11M | return Status::OK(); |
260 | 3.11M | } |
261 | | |
262 | | // Check if a duplicate value is inserted into a unique index. |
263 | 5 | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) { |
264 | 0 | VLOG(3) << "Looking for collisions in\n" |
265 | 0 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
266 | | // We need to check backwards only for backfilled entries. |
267 | 5 | bool ret = |
268 | 5 | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) || |
269 | 4 | (request_.is_backfill() && |
270 | 4 | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward))); |
271 | 5 | if (!ret) { |
272 | 0 | VLOG(3) << "No collisions found"; |
273 | 4 | } |
274 | 5 | return ret; |
275 | 5 | } |
276 | | |
277 | | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue( |
278 | 9 | const DocOperationApplyData& data, Direction direction) { |
279 | 0 | VLOG(2) << "Looking for collision while going " << yb::ToString(direction) |
280 | 0 | << ". Trying to insert " << *doc_key_; |
281 | 9 | auto requested_read_time = data.read_time; |
282 | 9 | if (direction == Direction::kForward) { |
283 | 5 | return HasDuplicateUniqueIndexValue(data, requested_read_time); |
284 | 5 | } |
285 | | |
286 | 4 | auto iter = CreateIntentAwareIterator( |
287 | 4 | data.doc_write_batch->doc_db(), |
288 | 4 | BloomFilterMode::USE_BLOOM_FILTER, |
289 | 4 | doc_key_->Encode().AsSlice(), |
290 | 4 | rocksdb::kDefaultQueryId, |
291 | 4 | txn_op_context_, |
292 | 4 | data.deadline, |
293 | 4 | ReadHybridTime::Max()); |
294 | | |
295 | 4 | HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
296 | 4 | iter.get(), SubDocKey(*doc_key_), requested_read_time.read)); |
297 | 4 | const HybridTime oldest_past_min_ht_liveness = |
298 | 4 | VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
299 | 4 | iter.get(), |
300 | 4 | SubDocKey(*doc_key_, PrimitiveValue::kLivenessColumn), |
301 | 4 | requested_read_time.read)); |
302 | 4 | oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness); |
303 | 4 | if (!oldest_past_min_ht.is_valid()) { |
304 | 4 | return false; |
305 | 4 | } |
306 | 0 | return HasDuplicateUniqueIndexValue( |
307 | 0 | data, ReadHybridTime::SingleTime(oldest_past_min_ht)); |
308 | 0 | } |
309 | | |
310 | | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue( |
311 | 5 | const DocOperationApplyData& data, ReadHybridTime read_time) { |
312 | | // Set up the iterator to read the current primary key associated with the index key. |
313 | 5 | DocPgsqlScanSpec spec(schema_, request_.stmt_id(), *doc_key_); |
314 | 5 | DocRowwiseIterator iterator(schema_, |
315 | 5 | schema_, |
316 | 5 | txn_op_context_, |
317 | 5 | data.doc_write_batch->doc_db(), |
318 | 5 | data.deadline, |
319 | 5 | read_time); |
320 | 5 | RETURN_NOT_OK(iterator.Init(spec)); |
321 | | |
322 | | // It is a duplicate value if the index key exists already and the index value (corresponding to |
323 | | // the indexed table's primary key) is not the same. |
324 | 5 | if (!VERIFY_RESULT(iterator.HasNext())) { |
325 | 0 | VLOG(2) << "No collision found while checking at " << yb::ToString(read_time); |
326 | 4 | return false; |
327 | 4 | } |
328 | | |
329 | 1 | QLTableRow table_row; |
330 | 1 | RETURN_NOT_OK(iterator.NextRow(&table_row)); |
331 | 1 | for (const auto& column_value : request_.column_values()) { |
332 | | // Get the column. |
333 | 1 | if (!column_value.has_column_id()) { |
334 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
335 | 0 | } |
336 | 1 | const ColumnId column_id(column_value.column_id()); |
337 | | |
338 | | // Check column-write operator. |
339 | 0 | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
340 | 0 | << "Illegal write instruction"; |
341 | | |
342 | | // Evaluate column value. |
343 | 1 | QLExprResult expr_result; |
344 | 1 | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
345 | | |
346 | 1 | boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id); |
347 | 1 | const QLValuePB& new_value = expr_result.Value(); |
348 | 1 | if (existing_value && *existing_value != new_value) { |
349 | 0 | VLOG(2) << "Found collision while checking at " << yb::ToString(read_time) |
350 | 0 | << "\nExisting: " << yb::ToString(*existing_value) |
351 | 0 | << " vs New: " << yb::ToString(new_value) |
352 | 0 | << "\nUsed read time as " << yb::ToString(data.read_time); |
353 | 0 | DVLOG(3) << "DocDB is now:\n" |
354 | 0 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
355 | 1 | return true; |
356 | 1 | } |
357 | 1 | } |
358 | | |
359 | 0 | VLOG(2) << "No collision while checking at " << yb::ToString(read_time); |
360 | 0 | return false; |
361 | 1 | } |
362 | | |
363 | | Result<HybridTime> PgsqlWriteOperation::FindOldestOverwrittenTimestamp( |
364 | | IntentAwareIterator* iter, |
365 | | const SubDocKey& sub_doc_key, |
366 | 8 | HybridTime min_read_time) { |
367 | 8 | HybridTime result; |
368 | 0 | VLOG(3) << "Doing iter->Seek " << *doc_key_; |
369 | 8 | iter->Seek(*doc_key_); |
370 | 8 | if (iter->valid()) { |
371 | 2 | const KeyBytes bytes = sub_doc_key.EncodeWithoutHt(); |
372 | 2 | const Slice& sub_key_slice = bytes.AsSlice(); |
373 | 2 | result = VERIFY_RESULT( |
374 | 2 | iter->FindOldestRecord(sub_key_slice, min_read_time)); |
375 | 0 | VLOG(2) << "iter->FindOldestRecord returned " << result << " for " |
376 | 0 | << SubDocKey::DebugSliceToString(sub_key_slice); |
377 | 6 | } else { |
378 | 0 | VLOG(3) << "iter->Seek " << *doc_key_ << " turned out to be invalid"; |
379 | 6 | } |
380 | 8 | return result; |
381 | 8 | } |
382 | | |
383 | 3.09M | Status PgsqlWriteOperation::Apply(const DocOperationApplyData& data) { |
384 | 218 | VLOG(4) << "Write, read time: " << data.read_time << ", txn: " << txn_op_context_; |
385 | | |
386 | 3.09M | auto scope_exit = ScopeExit([this] { |
387 | 3.09M | if (!result_buffer_.empty()) { |
388 | 3.08M | NetworkByteOrder::Store64(result_buffer_.data(), result_rows_); |
389 | 3.08M | } |
390 | 3.09M | }); |
391 | | |
392 | 3.09M | switch (request_.stmt_type()) { |
393 | 1.61M | case PgsqlWriteRequestPB::PGSQL_INSERT: |
394 | 1.61M | return ApplyInsert(data, IsUpsert::kFalse); |
395 | | |
396 | 154k | case PgsqlWriteRequestPB::PGSQL_UPDATE: |
397 | 154k | return ApplyUpdate(data); |
398 | | |
399 | 116k | case PgsqlWriteRequestPB::PGSQL_DELETE: |
400 | 116k | return ApplyDelete(data, request_.is_delete_persist_needed()); |
401 | | |
402 | 1.20M | case PgsqlWriteRequestPB::PGSQL_UPSERT: { |
403 | | // Upserts should not have column refs (i.e. require read). |
404 | 1.20M | RSTATUS_DCHECK(request_.col_refs().empty(), |
405 | 1.20M | IllegalState, |
406 | 1.20M | "Upsert operation should not have column references"); |
407 | 1.20M | return ApplyInsert(data, IsUpsert::kTrue); |
408 | 0 | } |
409 | | |
410 | 19 | case PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED: |
411 | 19 | return ApplyTruncateColocated(data); |
412 | 0 | } |
413 | 0 | return Status::OK(); |
414 | 0 | } |
415 | | |
416 | 2.82M | Status PgsqlWriteOperation::ApplyInsert(const DocOperationApplyData& data, IsUpsert is_upsert) { |
417 | 2.82M | QLTableRow table_row; |
418 | 2.82M | if (!is_upsert) { |
419 | 1.61M | if (request_.is_backfill()) { |
420 | 5 | if (VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) { |
421 | | // Unique index value conflict found. |
422 | 1 | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR); |
423 | 1 | response_->set_error_message("Duplicate key found in unique index"); |
424 | 1 | return Status::OK(); |
425 | 1 | } |
426 | 1.61M | } else { |
427 | | // Non-backfill requests shouldn't use HasDuplicateUniqueIndexValue because |
428 | | // - they should error even if the conflicting row matches |
429 | | // - retrieving and calculating whether the conflicting row matches is a waste |
430 | 1.61M | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
431 | 1.61M | if (!table_row.IsEmpty()) { |
432 | 18.4E | VLOG(4) << "Duplicate row: " << table_row.ToString(); |
433 | | // Primary key or unique index value found. |
434 | 1.02k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR); |
435 | 1.02k | response_->set_error_message("Duplicate key found in primary key or unique index"); |
436 | 1.02k | return Status::OK(); |
437 | 1.02k | } |
438 | 2.81M | } |
439 | 1.61M | } |
440 | | |
441 | 2.81M | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive( |
442 | 2.81M | DocPath(encoded_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn), |
443 | 2.81M | Value(PrimitiveValue()), |
444 | 2.81M | data.read_time, data.deadline, request_.stmt_id())); |
445 | | |
446 | 10.0M | for (const auto& column_value : request_.column_values()) { |
447 | | // Get the column. |
448 | 10.0M | if (!column_value.has_column_id()) { |
449 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
450 | 0 | } |
451 | 10.0M | const ColumnId column_id(column_value.column_id()); |
452 | 10.0M | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
453 | | |
454 | | // Check column-write operator. |
455 | 432 | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
456 | 432 | << "Illegal write instruction"; |
457 | | |
458 | | // Evaluate column value. |
459 | 10.0M | QLExprResult expr_result; |
460 | 10.0M | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
461 | 10.0M | const SubDocument sub_doc = |
462 | 10.0M | SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type()); |
463 | | |
464 | | // Inserting into specified column. |
465 | 10.0M | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
466 | 10.0M | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
467 | 10.0M | sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id())); |
468 | 10.0M | } |
469 | | |
470 | 2.81M | RETURN_NOT_OK(PopulateResultSet(table_row)); |
471 | | |
472 | 2.81M | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
473 | 2.81M | return Status::OK(); |
474 | 2.81M | } |
475 | | |
476 | 154k | Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) { |
477 | 154k | QLTableRow table_row; |
478 | 154k | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
479 | 154k | if (table_row.IsEmpty()) { |
480 | | // Row not found. |
481 | 2 | response_->set_skipped(true); |
482 | 2 | return Status::OK(); |
483 | 2 | } |
484 | 154k | QLTableRow returning_table_row; |
485 | 154k | if (request_.targets_size()) { |
486 | 0 | returning_table_row = table_row; |
487 | 0 | } |
488 | | |
489 | | // skipped is set to false if this operation produces some data to write. |
490 | 154k | bool skipped = true; |
491 | | |
492 | 154k | if (request_.has_ybctid_column_value()) { |
493 | 154k | DocPgExprExecutor expr_exec(&schema_); |
494 | 154k | std::vector<QLExprResult> results; |
495 | 154k | int num_exprs = 0; |
496 | 154k | int cur_expr = 0; |
497 | 246k | for (const auto& column_value : request_.column_new_values()) { |
498 | 246k | if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) { |
499 | 5.74k | RETURN_NOT_OK(expr_exec.AddTargetExpression(column_value.expr())); |
500 | 0 | VLOG(1) << "Added target expression to the executor"; |
501 | 5.74k | num_exprs++; |
502 | 5.74k | } |
503 | 246k | } |
504 | 154k | if (num_exprs > 0) { |
505 | 3.92k | bool match; |
506 | 5.74k | for (const PgsqlColRefPB& column_ref : request_.col_refs()) { |
507 | 5.74k | RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref)); |
508 | 0 | VLOG(1) << "Added column reference to the executor"; |
509 | 5.74k | } |
510 | 3.92k | results.resize(num_exprs); |
511 | 3.92k | RETURN_NOT_OK(expr_exec.Exec(table_row, &results, &match)); |
512 | 3.92k | } |
513 | 246k | for (const auto& column_value : request_.column_new_values()) { |
514 | | // Get the column. |
515 | 246k | if (!column_value.has_column_id()) { |
516 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
517 | 0 | } |
518 | 246k | const ColumnId column_id(column_value.column_id()); |
519 | 246k | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
520 | | // Evaluate column value. |
521 | 246k | QLExprResult expr_result; |
522 | | |
523 | 246k | if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) { |
524 | 5.74k | expr_result = std::move(results[cur_expr++]); |
525 | 240k | } else { |
526 | | // Check column-write operator. |
527 | 240k | SCHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert, |
528 | 240k | InternalError, |
529 | 240k | "Unsupported DocDB Expression"); |
530 | | |
531 | 240k | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer(), &schema_)); |
532 | 240k | } |
533 | | |
534 | | // Update RETURNING values |
535 | 246k | if (request_.targets_size()) { |
536 | 0 | returning_table_row.AllocColumn(column_id, expr_result.Value()); |
537 | 0 | } |
538 | | |
539 | | // Inserting into specified column. |
540 | 246k | const SubDocument sub_doc = |
541 | 246k | SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type()); |
542 | | |
543 | 246k | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
544 | 246k | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
545 | 246k | sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id())); |
546 | 246k | skipped = false; |
547 | 246k | } |
548 | 33 | } else { |
549 | | // This UPDATE is calling PGGATE directly without going thru PostgreSQL layer. |
550 | | // Keep it here as we might need it. |
551 | | |
552 | | // Very limited support for where expressions. Only used for updates to the sequences data |
553 | | // table. |
554 | 33 | bool is_match = true; |
555 | 33 | if (request_.has_where_expr()) { |
556 | 33 | QLExprResult match; |
557 | 33 | RETURN_NOT_OK(EvalExpr(request_.where_expr(), table_row, match.Writer())); |
558 | 33 | is_match = match.Value().bool_value(); |
559 | 33 | } |
560 | | |
561 | 33 | if (is_match) { |
562 | 66 | for (const auto &column_value : request_.column_new_values()) { |
563 | | // Get the column. |
564 | 66 | if (!column_value.has_column_id()) { |
565 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
566 | 0 | } |
567 | 66 | const ColumnId column_id(column_value.column_id()); |
568 | 66 | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
569 | | |
570 | | // Check column-write operator. |
571 | 0 | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
572 | 0 | << "Illegal write instruction"; |
573 | | |
574 | | // Evaluate column value. |
575 | 66 | QLExprResult expr_result; |
576 | 66 | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
577 | | |
578 | 66 | const SubDocument sub_doc = |
579 | 66 | SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type()); |
580 | | |
581 | | // Inserting into specified column. |
582 | 66 | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
583 | 66 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
584 | 66 | sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id())); |
585 | 66 | skipped = false; |
586 | 66 | } |
587 | 33 | } |
588 | 33 | } |
589 | | |
590 | 154k | if (request_.targets_size()) { |
591 | | // Returning the values after the update. |
592 | 0 | RETURN_NOT_OK(PopulateResultSet(returning_table_row)); |
593 | 154k | } else { |
594 | | // Returning the values before the update. |
595 | 154k | RETURN_NOT_OK(PopulateResultSet(table_row)); |
596 | 154k | } |
597 | | |
598 | 154k | if (skipped) { |
599 | 0 | response_->set_skipped(true); |
600 | 0 | } |
601 | 154k | response_->set_rows_affected_count(1); |
602 | 154k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
603 | 154k | return Status::OK(); |
604 | 154k | } |
605 | | |
606 | | Status PgsqlWriteOperation::ApplyDelete( |
607 | | const DocOperationApplyData& data, |
608 | 116k | const bool is_persist_needed) { |
609 | 116k | int num_deleted = 1; |
610 | 116k | QLTableRow table_row; |
611 | 116k | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
612 | 116k | if (table_row.IsEmpty()) { |
613 | | // Row not found. |
614 | | // Return early unless we still want to apply the delete for backfill purposes. Deletes to |
615 | | // nonexistent rows are expected to get written to the index when the index has the delete |
616 | | // permission during an online schema migration. num_deleted should be 0 because we don't want |
617 | | // to report back to the user that we deleted 1 row; response_ should not set skipped because it |
618 | | // will prevent tombstone intents from getting applied. |
619 | 15 | if (!is_persist_needed) { |
620 | 15 | response_->set_skipped(true); |
621 | 15 | return Status::OK(); |
622 | 15 | } |
623 | 0 | num_deleted = 0; |
624 | 0 | } |
625 | | |
626 | | // TODO(neil) Add support for WHERE clause. |
627 | 4 | CHECK(request_.column_values_size() == 0) << "WHERE clause condition is not yet fully supported"; |
628 | | |
629 | | // Otherwise, delete the referenced row (all columns). |
630 | 116k | RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath( |
631 | 116k | encoded_doc_key_.as_slice()), data.read_time, data.deadline)); |
632 | | |
633 | 116k | RETURN_NOT_OK(PopulateResultSet(table_row)); |
634 | | |
635 | 116k | response_->set_rows_affected_count(num_deleted); |
636 | 116k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
637 | 116k | return Status::OK(); |
638 | 116k | } |
639 | | |
640 | 19 | Status PgsqlWriteOperation::ApplyTruncateColocated(const DocOperationApplyData& data) { |
641 | 19 | RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath( |
642 | 19 | encoded_doc_key_.as_slice()), data.read_time, data.deadline)); |
643 | 19 | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
644 | 19 | return Status::OK(); |
645 | 19 | } |
646 | | |
647 | | Status PgsqlWriteOperation::ReadColumns(const DocOperationApplyData& data, |
648 | 1.88M | QLTableRow* table_row) { |
649 | | // Filter the columns using primary key. |
650 | 1.88M | if (doc_key_) { |
651 | 1.88M | Schema projection; |
652 | 1.88M | RETURN_NOT_OK(CreateProjection(schema_, request_.column_refs(), &projection)); |
653 | 1.88M | DocPgsqlScanSpec spec(projection, request_.stmt_id(), *doc_key_); |
654 | 1.88M | DocRowwiseIterator iterator(projection, |
655 | 1.88M | schema_, |
656 | 1.88M | txn_op_context_, |
657 | 1.88M | data.doc_write_batch->doc_db(), |
658 | 1.88M | data.deadline, |
659 | 1.88M | data.read_time); |
660 | 1.88M | RETURN_NOT_OK(iterator.Init(spec)); |
661 | 1.88M | if (VERIFY_RESULT(iterator.HasNext())) { |
662 | 271k | RETURN_NOT_OK(iterator.NextRow(table_row)); |
663 | 1.61M | } else { |
664 | 1.61M | table_row->Clear(); |
665 | 1.61M | } |
666 | 1.88M | data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt()); |
667 | 1.88M | } |
668 | | |
669 | 1.88M | return Status::OK(); |
670 | 1.88M | } |
671 | | |
672 | 3.08M | Status PgsqlWriteOperation::PopulateResultSet(const QLTableRow& table_row) { |
673 | 3.08M | if (result_buffer_.empty()) { |
674 | | // Reserve space for num rows. |
675 | 3.08M | pggate::PgWire::WriteInt64(0, &result_buffer_); |
676 | 3.08M | } |
677 | 3.08M | ++result_rows_; |
678 | 3.08M | int rscol_index = 0; |
679 | 0 | for (const PgsqlExpressionPB& expr : request_.targets()) { |
680 | 0 | if (expr.has_column_id()) { |
681 | 0 | QLExprResult value; |
682 | 0 | if (expr.column_id() == static_cast<int>(PgSystemAttrNum::kYBTupleId)) { |
683 | | // Strip cotable id / pgtable id from the serialized DocKey before returning it as ybctid. |
684 | 0 | Slice tuple_id = encoded_doc_key_.as_slice(); |
685 | 0 | if (tuple_id.starts_with(ValueTypeAsChar::kTableId)) { |
686 | 0 | tuple_id.remove_prefix(1 + kUuidSize); |
687 | 0 | } else if (tuple_id.starts_with(ValueTypeAsChar::kPgTableOid)) { |
688 | 0 | tuple_id.remove_prefix(1 + sizeof(PgTableOid)); |
689 | 0 | } |
690 | 0 | value.Writer().NewValue().set_binary_value(tuple_id.data(), tuple_id.size()); |
691 | 0 | } else { |
692 | 0 | RETURN_NOT_OK(EvalExpr(expr, table_row, value.Writer())); |
693 | 0 | } |
694 | 0 | RETURN_NOT_OK(pggate::WriteColumn(value.Value(), &result_buffer_)); |
695 | 0 | } |
696 | 0 | rscol_index++; |
697 | 0 | } |
698 | 3.08M | return Status::OK(); |
699 | 3.08M | } |
700 | | |
701 | | Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode, |
702 | | DocPathsToLock *paths, |
703 | 5.27M | IsolationLevel *level) const { |
704 | | // When this write operation requires a read, it requires a read snapshot so paths will be locked |
705 | | // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable |
706 | | // isolation so that they will serialize but not conflict with one another.
707 | | // |
708 | | // Currently, only keys that are being written are locked, no lock is taken on read at the |
709 | | // snapshot isolation level. |
710 | 3.83M | *level = RequireReadSnapshot() ? IsolationLevel::SNAPSHOT_ISOLATION |
711 | 1.44M | : IsolationLevel::SERIALIZABLE_ISOLATION; |
712 | | |
713 | 5.27M | switch (mode) { |
714 | 3.14M | case GetDocPathsMode::kLock: { |
715 | | // Weak intent is required to lock the row and prevent it from being removed. |
716 | | // For this purpose path for row's SystemColumnIds::kLivenessColumn column is returned. |
717 | | // The caller code will create strong intent for returned path (raw's column doc key) |
718 | | // and weak intents for all its prefixes (including row's doc key). |
719 | 3.14M | if (!encoded_doc_key_) { |
720 | 0 | return Status::OK(); |
721 | 0 | } |
722 | 3.14M | if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { |
723 | | // In case of UPDATE some columns may have expressions instead of exact values.
724 | | // These expressions may read column values.
725 | | // Potentially the expression for updating column v1 may read the value of column v2.
726 | | // |
727 | | // UPDATE t SET v = v + 10 WHERE k = 1 |
728 | | // UPDATE t SET v1 = v2 + 10 WHERE k = 1 |
729 | | // |
730 | | // Strong intent for the whole row is required in this case as it may be too expensive to |
731 | | // determine what exact columns are read by the expression. |
732 | | |
733 | 257k | for (const auto& column_value : request_.column_new_values()) { |
734 | 257k | if (!column_value.expr().has_value()) { |
735 | 3.92k | paths->push_back(encoded_doc_key_); |
736 | 3.92k | return Status::OK(); |
737 | 3.92k | } |
738 | 257k | } |
739 | 166k | } |
740 | 3.13M | DocKeyColumnPathBuilder builder(encoded_doc_key_); |
741 | 3.13M | paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn))); |
742 | 3.13M | break; |
743 | 3.14M | } |
744 | 2.12M | case GetDocPathsMode::kIntents: { |
745 | 2.12M | const google::protobuf::RepeatedPtrField<PgsqlColumnValuePB>* column_values = nullptr; |
746 | 2.12M | if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT || |
747 | 1.84M | request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) { |
748 | 1.84M | column_values = &request_.column_values(); |
749 | 282k | } else if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { |
750 | 166k | column_values = &request_.column_new_values(); |
751 | 166k | } |
752 | | |
753 | 2.12M | if (column_values != nullptr && !column_values->empty()) { |
754 | 1.69M | DocKeyColumnPathBuilder builder(encoded_doc_key_); |
755 | 4.72M | for (const auto& column_value : *column_values) { |
756 | 4.72M | paths->push_back(builder.Build(column_value.column_id())); |
757 | 4.72M | } |
758 | 436k | } else if (encoded_doc_key_) { |
759 | 436k | paths->push_back(encoded_doc_key_); |
760 | 436k | } |
761 | 2.12M | break; |
762 | 5.26M | } |
763 | 5.26M | } |
764 | 5.26M | return Status::OK(); |
765 | 5.26M | } |
766 | | |
767 | | //-------------------------------------------------------------------------------------------------- |
768 | | |
769 | | Result<size_t> PgsqlReadOperation::Execute(const YQLStorageIf& ql_storage, |
770 | | CoarseTimePoint deadline, |
771 | | const ReadHybridTime& read_time, |
772 | | bool is_explicit_request_read_time, |
773 | | const Schema& schema, |
774 | | const Schema *index_schema, |
775 | | faststring *result_buffer, |
776 | 1.50M | HybridTime *restart_read_ht) { |
777 | 1.50M | size_t fetched_rows = 0; |
778 | | // Reserve space for fetched rows count. |
779 | 1.50M | pggate::PgWire::WriteInt64(0, result_buffer); |
780 | 1.49M | auto se = ScopeExit([&fetched_rows, result_buffer] { |
781 | 1.49M | NetworkByteOrder::Store64(result_buffer->data(), fetched_rows); |
782 | 1.49M | }); |
783 | 405 | VLOG(4) << "Read, read time: " << read_time << ", txn: " << txn_op_context_; |
784 | | |
785 | | // Fetching data. |
786 | 1.50M | bool has_paging_state = false; |
787 | 1.50M | if (request_.batch_arguments_size() > 0) { |
788 | 1.84k | SCHECK(request_.has_ybctid_column_value(), |
789 | 1.84k | InternalError, |
790 | 1.84k | "ybctid arguments can be batched only"); |
791 | 1.84k | fetched_rows = VERIFY_RESULT(ExecuteBatchYbctid( |
792 | 1.84k | ql_storage, deadline, read_time, schema, |
793 | 1.84k | result_buffer, restart_read_ht)); |
794 | 1.49M | } else if (request_.has_sampling_state()) { |
795 | 109 | fetched_rows = VERIFY_RESULT(ExecuteSample( |
796 | 109 | ql_storage, deadline, read_time, is_explicit_request_read_time, schema, |
797 | 109 | result_buffer, restart_read_ht, &has_paging_state)); |
798 | 1.49M | } else { |
799 | 1.49M | fetched_rows = VERIFY_RESULT(ExecuteScalar( |
800 | 1.49M | ql_storage, deadline, read_time, is_explicit_request_read_time, schema, index_schema, |
801 | 1.49M | result_buffer, restart_read_ht, &has_paging_state)); |
802 | 1.49M | } |
803 | | |
804 | 1.50M | VTRACE(1, "Fetched $0 rows. $1 paging state", fetched_rows, (has_paging_state ? "Has" : "No"));
805 | 1.50M | *restart_read_ht = table_iter_->RestartReadHt(); |
806 | 1.50M | return fetched_rows; |
807 | 1.50M | } |
808 | | |
809 | | Result<size_t> PgsqlReadOperation::ExecuteSample(const YQLStorageIf& ql_storage, |
810 | | CoarseTimePoint deadline, |
811 | | const ReadHybridTime& read_time, |
812 | | bool is_explicit_request_read_time, |
813 | | const Schema& schema, |
814 | | faststring *result_buffer, |
815 | | HybridTime *restart_read_ht, |
816 | 109 | bool *has_paging_state) { |
817 | 109 | *has_paging_state = false; |
818 | 109 | size_t scanned_rows = 0; |
819 | 109 | PgsqlSamplingStatePB sampling_state = request_.sampling_state(); |
820 | | // Requested total number of rows to collect |
821 | 109 | int targrows = sampling_state.targrows(); |
822 | | // Number of rows collected so far |
823 | 109 | int numrows = sampling_state.numrows(); |
824 | | // Total number of rows scanned |
825 | 109 | double samplerows = sampling_state.samplerows(); |
826 | | // Current number of rows to skip before collecting next one for sample |
827 | 109 | double rowstoskip = sampling_state.rowstoskip(); |
828 | | // Variables for the random number generator
829 | 109 | YbgPrepareMemoryContext(); |
830 | 109 | YbgReservoirState rstate = NULL; |
831 | 109 | YbgSamplerCreate(sampling_state.rstate_w(), sampling_state.rand_state(), &rstate); |
832 | | // Buffer to hold selected row ids from the current page |
833 | 109 | std::unique_ptr<QLValuePB[]> reservoir = std::make_unique<QLValuePB[]>(targrows); |
834 | | // Number of rows to scan for the current page. |
835 | | // Too low a row count limit is inefficient since we have to allocate and initialize a reservoir
836 | | // capable of holding a potentially large (targrows) number of tuples. The row count has to be at
837 | | // least targrows for a chance to fill up the reservoir. Actually, the algorithm selects targrows
838 | | // only for the very first page of the table; after that it starts to skip tuples, and the further
839 | | // it goes, the more it skips. For a large enough table it eventually starts to select fewer than
840 | | // targrows per page, regardless of the row_count_limit.
841 | | // Anyway, double targrows seems like a reasonable minimum for the row_count_limit.
842 | 109 | size_t row_count_limit = 2 * targrows; |
843 | 109 | if (request_.has_limit() && request_.limit() > row_count_limit) { |
844 | 0 | row_count_limit = request_.limit(); |
845 | 0 | } |
846 | | // The request is not supposed to contain any column refs; we just need the liveness column.
847 | 109 | Schema projection; |
848 | 109 | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
849 | | // The request may carry paging state; CreateIterator takes care of positioning.
850 | 109 | table_iter_ = VERIFY_RESULT(CreateIterator( |
851 | 109 | ql_storage, request_, projection, schema, txn_op_context_, |
852 | 109 | deadline, read_time, is_explicit_request_read_time)); |
853 | 109 | bool scan_time_exceeded = false; |
854 | 109 | CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms; |
855 | 86.6k | while (scanned_rows++ < row_count_limit && |
856 | 86.6k | VERIFY_RESULT(table_iter_->HasNext()) && |
857 | 86.5k | !scan_time_exceeded) { |
858 | 86.5k | if (numrows < targrows) { |
859 | | // Select the first targrows of the table. If the first partition(s) have fewer than that, the
860 | | // next partition continues populating its reservoir starting from the numrows position:
861 | | // numrows, as well as the other sampling state variables, is returned and copied over to the
862 | | // next sampling request.
863 | 86.5k | Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId()); |
864 | 86.5k | reservoir[numrows++].set_binary_value(ybctid.data(), ybctid.size()); |
865 | 0 | } else { |
866 | | // At least targrows tuples have already been collected; now the algorithm skips an increasing
867 | | // number of rows before taking the next one into the reservoir.
868 | 0 | if (rowstoskip <= 0) { |
869 | | // Take ybctid of the current row |
870 | 0 | Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId()); |
871 | | // Pick random tuple in the reservoir to replace |
872 | 0 | double rvalue; |
873 | 0 | int k; |
874 | 0 | YbgSamplerRandomFract(rstate, &rvalue); |
875 | 0 | k = static_cast<int>(targrows * rvalue); |
876 | | // Overwrite previous value with new one |
877 | 0 | reservoir[k].set_binary_value(ybctid.data(), ybctid.size()); |
878 | | // Choose next number of rows to skip |
879 | 0 | YbgReservoirGetNextS(rstate, samplerows, targrows, &rowstoskip); |
880 | 0 | } else { |
881 | 0 | rowstoskip -= 1; |
882 | 0 | } |
883 | 0 | } |
884 | | // Taking tuple ID does not advance the table iterator. Move it now. |
885 | 86.5k | table_iter_->SkipRow(); |
886 | | // Check if we are running out of time |
887 | 86.5k | scan_time_exceeded = CoarseMonoClock::now() >= stop_scan; |
888 | 86.5k | } |
889 | | // Count the live rows we have scanned. TODO: how to count dead rows?
890 | 109 | samplerows += (scanned_rows - 1); |
891 | | // Return collected tuples from the reservoir. |
892 | | // Tuples are returned as (index, ybctid) pairs, where the index is in the [0..targrows-1] range.
893 | | // As mentioned above, for large tables reservoirs become increasingly sparse from page to page.
894 | | // So we hope to save by sending a variable number of index/ybctid pairs vs. exactly targrows of
895 | | // nullable ybctids. It also helps in the case of an extremely small table or partition.
896 | 109 | int fetched_rows = 0; |
897 | 153k | for (int i = 0; i < numrows; i++) { |
898 | 153k | QLValuePB index; |
899 | 153k | if (reservoir[i].has_binary_value()) { |
900 | 86.5k | index.set_int32_value(i); |
901 | 86.5k | RETURN_NOT_OK(pggate::WriteColumn(index, result_buffer)); |
902 | 86.5k | RETURN_NOT_OK(pggate::WriteColumn(reservoir[i], result_buffer)); |
903 | 86.5k | fetched_rows++; |
904 | 86.5k | } |
905 | 153k | } |
906 | | |
907 | | // Return sampling state to continue with next page |
908 | 109 | PgsqlSamplingStatePB *new_sampling_state = response_.mutable_sampling_state(); |
909 | 109 | new_sampling_state->set_numrows(numrows); |
910 | 109 | new_sampling_state->set_targrows(targrows); |
911 | 109 | new_sampling_state->set_samplerows(samplerows); |
912 | 109 | new_sampling_state->set_rowstoskip(rowstoskip); |
913 | 109 | uint64_t randstate = 0; |
914 | 109 | double rstate_w = 0; |
915 | 109 | YbgSamplerGetState(rstate, &rstate_w, &randstate); |
916 | 109 | new_sampling_state->set_rstate_w(rstate_w); |
917 | 109 | new_sampling_state->set_rand_state(randstate); |
918 | 109 | YbgDeleteMemoryContext(); |
919 | | |
920 | | // Return paging state if scan has not been completed |
921 | 109 | RETURN_NOT_OK(SetPagingStateIfNecessary(table_iter_.get(), scanned_rows, row_count_limit, |
922 | 109 | scan_time_exceeded, &schema, read_time, |
923 | 109 | has_paging_state)); |
924 | 109 | return fetched_rows; |
925 | 109 | } |
926 | | |
927 | | Result<size_t> PgsqlReadOperation::ExecuteScalar(const YQLStorageIf& ql_storage, |
928 | | CoarseTimePoint deadline, |
929 | | const ReadHybridTime& read_time, |
930 | | bool is_explicit_request_read_time, |
931 | | const Schema& schema, |
932 | | const Schema *index_schema, |
933 | | faststring *result_buffer, |
934 | | HybridTime *restart_read_ht, |
935 | 1.49M | bool *has_paging_state) { |
936 | 1.49M | *has_paging_state = false; |
937 | | |
938 | 1.49M | size_t fetched_rows = 0; |
939 | 1.49M | size_t row_count_limit = std::numeric_limits<std::size_t>::max(); |
940 | 1.49M | if (request_.has_limit()) { |
941 | 1.49M | if (request_.limit() == 0) { |
942 | 0 | return fetched_rows; |
943 | 0 | } |
944 | 1.49M | row_count_limit = request_.limit(); |
945 | 1.49M | } |
946 | | |
947 | | // Create the projection of regular columns selected by the row block plus any referenced in |
948 | | // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this |
949 | | // projection only to scan sub-documents. The query schema is used to select only referenced |
950 | | // columns and key columns. |
951 | 1.49M | Schema projection; |
952 | 1.49M | Schema index_projection; |
953 | 1.49M | YQLRowwiseIteratorIf *iter; |
954 | 1.49M | const Schema* scan_schema; |
955 | 1.49M | DocPgExprExecutor expr_exec(&schema); |
956 | | |
957 | 1.49M | if (!request_.col_refs().empty()) { |
958 | 1.49M | RETURN_NOT_OK(CreateProjection(schema, request_.col_refs(), &projection)); |
959 | 2.85k | } else { |
960 | | // Compatibility: Either request indeed has no column refs, or it comes from a legacy node. |
961 | 2.85k | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
962 | 2.85k | } |
963 | 1.49M | table_iter_ = VERIFY_RESULT(CreateIterator( |
964 | 1.49M | ql_storage, request_, projection, schema, txn_op_context_, |
965 | 1.49M | deadline, read_time, is_explicit_request_read_time)); |
966 | | |
967 | 1.49M | ColumnId ybbasectid_id; |
968 | 1.49M | if (request_.has_index_request()) { |
969 | 151k | const PgsqlReadRequestPB& index_request = request_.index_request(); |
970 | 151k | RETURN_NOT_OK(CreateProjection(*index_schema, index_request.column_refs(), &index_projection)); |
971 | 151k | index_iter_ = VERIFY_RESULT(CreateIterator( |
972 | 151k | ql_storage, index_request, index_projection, *index_schema, txn_op_context_, |
973 | 151k | deadline, read_time, is_explicit_request_read_time)); |
974 | 151k | iter = index_iter_.get(); |
975 | 151k | const auto idx = index_schema->find_column("ybidxbasectid"); |
976 | 151k | SCHECK_NE(idx, Schema::kColumnNotFound, Corruption, "ybidxbasectid not found in index schema"); |
977 | 151k | ybbasectid_id = index_schema->column_id(idx); |
978 | 151k | scan_schema = index_schema; |
979 | 1.34M | } else { |
980 | 1.34M | iter = table_iter_.get(); |
981 | 1.34M | scan_schema = &schema; |
982 | 8.02M | for (const PgsqlColRefPB& column_ref : request_.col_refs()) { |
983 | 8.02M | RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref)); |
984 | 885 | VLOG(1) << "Added column reference to the executor"; |
985 | 8.02M | } |
986 | 1.34M | for (const PgsqlExpressionPB& expr : request_.where_clauses()) { |
987 | 368 | RETURN_NOT_OK(expr_exec.AddWhereExpression(expr)); |
988 | 0 | VLOG(1) << "Added where expression to the executor"; |
989 | 368 | } |
990 | 1.34M | } |
991 | | |
992 | 1.45k | VLOG(1) << "Started iterator"; |
993 | | |
994 | | // Set scan start time. |
995 | 1.49M | bool scan_time_exceeded = false; |
996 | 1.49M | CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms; |
997 | | |
998 | | // Fetching data. |
999 | 1.49M | int match_count = 0; |
1000 | 1.49M | QLTableRow row; |
1001 | 23.6M | while (fetched_rows < row_count_limit && VERIFY_RESULT(iter->HasNext()) && |
1002 | 22.1M | !scan_time_exceeded) { |
1003 | 22.1M | row.Clear(); |
1004 | | |
1005 | | // If there is an index request, fetch ybbasectid from the index and use it as ybctid |
1006 | | // to fetch from the base table. Otherwise, fetch from the base table directly. |
1007 | 22.1M | if (request_.has_index_request()) { |
1008 | 291k | RETURN_NOT_OK(iter->NextRow(&row)); |
1009 | 291k | const auto& tuple_id = row.GetValue(ybbasectid_id); |
1010 | 291k | SCHECK_NE(tuple_id, boost::none, Corruption, "ybbasectid not found in index row"); |
1011 | 291k | if (!VERIFY_RESULT(table_iter_->SeekTuple(tuple_id->binary_value()))) { |
1012 | 0 | DocKey doc_key; |
1013 | 0 | RETURN_NOT_OK(doc_key.DecodeFrom(tuple_id->binary_value())); |
1014 | 0 | return STATUS_FORMAT(Corruption, "ybctid $0 not found in indexed table", doc_key); |
1015 | 291k | } |
1016 | 291k | row.Clear(); |
1017 | 291k | RETURN_NOT_OK(table_iter_->NextRow(projection, &row)); |
1018 | 21.8M | } else { |
1019 | 21.8M | RETURN_NOT_OK(iter->NextRow(projection, &row)); |
1020 | 21.8M | } |
1021 | | |
1022 | | // Match the row with the where condition before adding to the row block. |
1023 | 22.1M | bool is_match = true; |
1024 | 22.1M | RETURN_NOT_OK(expr_exec.Exec(row, nullptr, &is_match)); |
1025 | 22.1M | if (is_match) { |
1026 | 22.0M | match_count++; |
1027 | 22.0M | if (request_.is_aggregate()) { |
1028 | 1.71M | RETURN_NOT_OK(EvalAggregate(row)); |
1029 | 20.3M | } else { |
1030 | 20.3M | RETURN_NOT_OK(PopulateResultSet(row, result_buffer)); |
1031 | 20.3M | ++fetched_rows; |
1032 | 20.3M | } |
1033 | 22.0M | } |
1034 | | |
1035 | | // Check if we are running out of time |
1036 | 22.1M | scan_time_exceeded = CoarseMonoClock::now() >= stop_scan; |
1037 | 22.1M | } |
1038 | | |
1039 | 2.34k | VLOG(1) << "Stopped iterator after " << match_count << " matches, " |
1040 | 2.34k | << fetched_rows << " rows fetched"; |
1041 | 1.71k | VLOG(1) << "Deadline is " << (scan_time_exceeded ? "" : "not ") << "exceeded"; |
1042 | | |
1043 | 1.49M | if (request_.is_aggregate() && match_count > 0) { |
1044 | 345 | RETURN_NOT_OK(PopulateAggregate(row, result_buffer)); |
1045 | 345 | ++fetched_rows; |
1046 | 345 | } |
1047 | | |
1048 | 1.49M | if (PREDICT_FALSE(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms > 0) && request_.is_aggregate()) { |
1049 | 418 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_pgsql_aggregate_read_ms); |
1050 | 418 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms)); |
1051 | 418 | } |
1052 | | |
1053 | 1.49M | RETURN_NOT_OK(SetPagingStateIfNecessary( |
1054 | 1.49M | iter, fetched_rows, row_count_limit, scan_time_exceeded, scan_schema, |
1055 | 1.49M | read_time, has_paging_state)); |
1056 | 1.49M | return fetched_rows; |
1057 | 1.49M | } |
1058 | | |
1059 | | Result<size_t> PgsqlReadOperation::ExecuteBatchYbctid(const YQLStorageIf& ql_storage, |
1060 | | CoarseTimePoint deadline, |
1061 | | const ReadHybridTime& read_time, |
1062 | | const Schema& schema, |
1063 | | faststring *result_buffer, |
1064 | 1.84k | HybridTime *restart_read_ht) { |
1065 | 1.84k | Schema projection; |
1066 | 1.84k | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
1067 | | |
1068 | 1.84k | QLTableRow row; |
1069 | 1.84k | size_t row_count = 0; |
1070 | 796k | for (const PgsqlBatchArgumentPB& batch_argument : request_.batch_arguments()) { |
1071 | | // Get the row. |
1072 | 796k | RETURN_NOT_OK(ql_storage.GetIterator(request_.stmt_id(), projection, schema, txn_op_context_, |
1073 | 796k | deadline, read_time, batch_argument.ybctid().value(), |
1074 | 796k | &table_iter_)); |
1075 | | |
1076 | 796k | if (VERIFY_RESULT(table_iter_->HasNext())) { |
1077 | 796k | row.Clear(); |
1078 | 796k | RETURN_NOT_OK(table_iter_->NextRow(projection, &row)); |
1079 | | |
1080 | | // Populate result set. |
1081 | 796k | RETURN_NOT_OK(PopulateResultSet(row, result_buffer)); |
1082 | 796k | response_.add_batch_orders(batch_argument.order()); |
1083 | 796k | row_count++; |
1084 | 796k | } |
1085 | 796k | } |
1086 | | |
1087 | | // Set status for this batch. |
1088 | | // Mark all rows as processed even if some of the ybctids were not found.
1089 | 1.84k | response_.set_batch_arg_count(request_.batch_arguments_size()); |
1090 | | |
1091 | 1.84k | return row_count; |
1092 | 1.84k | } |
1093 | | |
1094 | | Status PgsqlReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter, |
1095 | | size_t fetched_rows, |
1096 | | const size_t row_count_limit, |
1097 | | const bool scan_time_exceeded, |
1098 | | const Schema* schema, |
1099 | | const ReadHybridTime& read_time, |
1100 | 1.49M | bool *has_paging_state) { |
1101 | 1.49M | *has_paging_state = false; |
1102 | 1.49M | if (!request_.return_paging_state()) { |
1103 | 69 | return Status::OK(); |
1104 | 69 | } |
1105 | | |
1106 | | // Set the paging state for next row. |
1107 | 1.49M | if (fetched_rows >= row_count_limit || scan_time_exceeded) { |
1108 | 25.8k | SubDocKey next_row_key; |
1109 | 25.8k | RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key)); |
1110 | | // When the "limit" number of rows are returned and we are asked to return the paging state, |
1111 | | // return the partition key and row key of the next row to read in the paging state if there are |
1112 | | // still more rows to read. Otherwise, leave the paging state empty which means we are done |
1113 | | // reading from this tablet. |
1114 | 25.8k | if (!next_row_key.doc_key().empty()) { |
1115 | 23.7k | const auto& keybytes = next_row_key.Encode(); |
1116 | 23.7k | PgsqlPagingStatePB* paging_state = response_.mutable_paging_state(); |
1117 | 23.7k | RSTATUS_DCHECK(schema != nullptr, IllegalState, "Missing schema"); |
1118 | 23.7k | if (schema->num_hash_key_columns() > 0) { |
1119 | 6.20k | paging_state->set_next_partition_key( |
1120 | 6.20k | PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash())); |
1121 | 17.5k | } else { |
1122 | 17.5k | paging_state->set_next_partition_key(keybytes.ToStringBuffer()); |
1123 | 17.5k | } |
1124 | 23.7k | paging_state->set_next_row_key(keybytes.ToStringBuffer()); |
1125 | 23.7k | *has_paging_state = true; |
1126 | 23.7k | } |
1127 | 25.8k | } |
1128 | 1.49M | if (*has_paging_state) { |
1129 | 23.7k | if (FLAGS_pgsql_consistent_transactional_paging) { |
1130 | 23.7k | read_time.AddToPB(response_.mutable_paging_state()); |
1131 | 0 | } else { |
1132 | | // Using SingleTime will help avoid read restarts on second page and later but will |
1133 | | // potentially produce stale results on those pages. |
1134 | 0 | auto per_row_consistent_read_time = ReadHybridTime::SingleTime(read_time.read); |
1135 | 0 | per_row_consistent_read_time.AddToPB(response_.mutable_paging_state()); |
1136 | 0 | } |
1137 | 23.7k | } |
1138 | | |
1139 | 1.49M | return Status::OK(); |
1140 | 1.49M | } |
1141 | | |
1142 | | Status PgsqlReadOperation::PopulateResultSet(const QLTableRow& table_row, |
1143 | 21.1M | faststring *result_buffer) { |
1144 | 21.1M | QLExprResult result; |
1145 | 228M | for (const PgsqlExpressionPB& expr : request_.targets()) { |
1146 | 228M | RETURN_NOT_OK(EvalExpr(expr, table_row, result.Writer())); |
1147 | 228M | RETURN_NOT_OK(pggate::WriteColumn(result.Value(), result_buffer)); |
1148 | 228M | } |
1149 | 21.1M | return Status::OK(); |
1150 | 21.1M | } |
1151 | | |
1152 | 15.2M | Status PgsqlReadOperation::GetTupleId(QLValue *result) const { |
1153 | | // Get row key and save to QLValue. |
1154 | | // TODO(neil) Check if we need to append a table_id and other info to TupleID. For example, we |
1155 | | // might need info to make sure the TupleId by itself is a valid reference to a specific row of |
1156 | | // a valid table. |
1157 | 15.2M | const Slice tuple_id = VERIFY_RESULT(table_iter_->GetTupleId()); |
1158 | 15.2M | result->set_binary_value(tuple_id.data(), tuple_id.size()); |
1159 | 15.2M | return Status::OK(); |
1160 | 15.2M | } |
1161 | | |
1162 | 1.71M | Status PgsqlReadOperation::EvalAggregate(const QLTableRow& table_row) { |
1163 | 1.71M | if (aggr_result_.empty()) { |
1164 | 341 | int column_count = request_.targets().size(); |
1165 | 341 | aggr_result_.resize(column_count); |
1166 | 341 | } |
1167 | | |
1168 | 1.71M | int aggr_index = 0; |
1169 | 1.71M | for (const PgsqlExpressionPB& expr : request_.targets()) { |
1170 | 1.71M | RETURN_NOT_OK(EvalExpr(expr, table_row, aggr_result_[aggr_index++].Writer())); |
1171 | 1.71M | } |
1172 | 1.71M | return Status::OK(); |
1173 | 1.71M | } |
1174 | | |
1175 | | Status PgsqlReadOperation::PopulateAggregate(const QLTableRow& table_row, |
1176 | 345 | faststring *result_buffer) { |
1177 | 345 | int column_count = request_.targets().size(); |
1178 | 755 | for (int rscol_index = 0; rscol_index < column_count; rscol_index++) { |
1179 | 410 | RETURN_NOT_OK(pggate::WriteColumn(aggr_result_[rscol_index].Value(), result_buffer)); |
1180 | 410 | } |
1181 | 345 | return Status::OK(); |
1182 | 345 | } |
1183 | | |
1184 | 1.33M | Status PgsqlReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) { |
1185 | 1.33M | if (request_.batch_arguments_size() > 0 && request_.has_ybctid_column_value()) { |
1186 | 100k | for (const auto& batch_argument : request_.batch_arguments()) { |
1187 | 100k | SCHECK(batch_argument.has_ybctid(), InternalError, "ybctid batch argument is expected"); |
1188 | 100k | RETURN_NOT_OK(AddIntent(batch_argument.ybctid(), request_.wait_policy(), out)); |
1189 | 100k | } |
1190 | 1.33M | } else { |
1191 | 1.33M | AddIntent(VERIFY_RESULT(FetchEncodedDocKey(schema, request_)), request_.wait_policy(), out); |
1192 | 1.33M | } |
1193 | 1.33M | return Status::OK(); |
1194 | 1.33M | } |
1195 | | |
1196 | | } // namespace docdb |
1197 | | } // namespace yb |