/Users/deen/code/yugabyte-db/src/yb/docdb/cql_operation.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/cql_operation.h" |
15 | | |
16 | | #include <limits> |
17 | | #include <memory> |
18 | | #include <string> |
19 | | #include <unordered_set> |
20 | | #include <utility> |
21 | | #include <vector> |
22 | | |
23 | | #include "yb/bfpg/tserver_opcodes.h" |
24 | | |
25 | | #include "yb/common/index.h" |
26 | | #include "yb/common/index_column.h" |
27 | | #include "yb/common/jsonb.h" |
28 | | #include "yb/common/partition.h" |
29 | | #include "yb/common/ql_protocol_util.h" |
30 | | #include "yb/common/ql_resultset.h" |
31 | | #include "yb/common/ql_rowblock.h" |
32 | | #include "yb/common/ql_value.h" |
33 | | |
34 | | #include "yb/docdb/doc_path.h" |
35 | | #include "yb/docdb/doc_ql_scanspec.h" |
36 | | #include "yb/docdb/doc_rowwise_iterator.h" |
37 | | #include "yb/docdb/doc_write_batch.h" |
38 | | #include "yb/docdb/docdb.pb.h" |
39 | | #include "yb/docdb/docdb_debug.h" |
40 | | #include "yb/docdb/docdb_rocksdb_util.h" |
41 | | #include "yb/docdb/primitive_value_util.h" |
42 | | #include "yb/docdb/ql_storage_interface.h" |
43 | | |
44 | | #include "yb/util/debug-util.h" |
45 | | #include "yb/util/flag_tags.h" |
46 | | #include "yb/util/result.h" |
47 | | #include "yb/util/status.h" |
48 | | #include "yb/util/status_format.h" |
49 | | #include "yb/util/trace.h" |
50 | | |
51 | | #include "yb/yql/cql/ql/util/errcodes.h" |
52 | | |
53 | | DEFINE_test_flag(bool, pause_write_apply_after_if, false, |
54 | | "Pause application of QLWriteOperation after evaluating if condition."); |
55 | | |
56 | | DEFINE_bool(ycql_consistent_transactional_paging, false, |
57 | | "Whether to enforce consistency of data returned for second page and beyond for YCQL " |
58 | | "queries on transactional tables. If true, read restart errors could be returned to " |
59 | | "prevent inconsistency. If false, no read restart errors are returned but the data may " |
60 | | "be stale. The latter is preferable for long scans. The data returned for the first " |
61 | | "page of results is never stale regardless of this flag."); |
62 | | |
63 | | DEFINE_bool(ycql_disable_index_updating_optimization, false, |
64 | | "If true, all secondary indexes must be updated even if the update does not change "
65 | | "the index data."); |
66 | | TAG_FLAG(ycql_disable_index_updating_optimization, advanced); |
67 | | |
68 | | namespace yb { |
69 | | namespace docdb { |
70 | | |
71 | | using std::pair; |
72 | | using std::unordered_map; |
73 | | using std::unordered_set; |
74 | | using std::vector; |
75 | | |
76 | | namespace { |
77 | | |
78 | | // Append dummy (unset) entries for each column in the schema to table_row.
79 | | // TODO(omer): this should most probably be added somewhere else |
80 | 23 | void AddProjection(const Schema& schema, QLTableRow* table_row) { |
81 | 47 | for (size_t i = 0; i < schema.num_columns(); i++) {
82 | 24 | const auto& column_id = schema.column_id(i); |
83 | 24 | table_row->AllocColumn(column_id); |
84 | 24 | } |
85 | 23 | } |
86 | | |
87 | | // Create projection schemas of static and non-static columns from a rowblock projection schema |
88 | | // (for read) and a WHERE / IF condition (for read / write). "schema" is the full table schema |
89 | | // and "rowblock_schema" is the selected columns from which we are splitting into static and |
90 | | // non-static column portions. |
91 | | CHECKED_STATUS CreateProjections(const Schema& schema, const QLReferencedColumnsPB& column_refs, |
92 | 7.57M | Schema* static_projection, Schema* non_static_projection) { |
93 | | // The projection schemas are used to scan docdb. |
94 | 7.57M | unordered_set<ColumnId> static_columns, non_static_columns; |
95 | | |
96 | | // Add regular columns. |
97 | 24.0M | for (int32_t id : column_refs.ids()) { |
98 | 24.0M | const ColumnId column_id(id); |
99 | 24.0M | if (!schema.is_key_column(column_id)) { |
100 | 9.05M | non_static_columns.insert(column_id); |
101 | 9.05M | } |
102 | 24.0M | } |
103 | | |
104 | | // Add static columns. |
105 | 7.57M | for (int32_t id : column_refs.static_ids()) { |
106 | 440 | const ColumnId column_id(id); |
107 | 440 | static_columns.insert(column_id); |
108 | 440 | } |
109 | | |
110 | 7.57M | RETURN_NOT_OK( |
111 | 7.57M | schema.CreateProjectionByIdsIgnoreMissing( |
112 | 7.57M | vector<ColumnId>(static_columns.begin(), static_columns.end()), |
113 | 7.57M | static_projection)); |
114 | 7.57M | RETURN_NOT_OK( |
115 | 7.57M | schema.CreateProjectionByIdsIgnoreMissing( |
116 | 7.57M | vector<ColumnId>(non_static_columns.begin(), non_static_columns.end()), |
117 | 7.57M | non_static_projection)); |
118 | | |
119 | 7.57M | return Status::OK(); |
120 | 7.57M | } |
121 | | |
122 | | CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& schema, |
123 | | const size_t begin_idx, const size_t col_count, |
124 | 66 | QLRow* row, size_t *col_idx) { |
125 | 144 | for (size_t i = begin_idx; i < begin_idx + col_count; i++) {
126 | 78 | RETURN_NOT_OK(table_row.GetValue(schema.column_id(i), row->mutable_column((*col_idx)++))); |
127 | 78 | } |
128 | 66 | return Status::OK(); |
129 | 66 | } |
130 | | |
131 | | CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& projection, |
132 | 44 | QLRow* row, size_t* col_idx) { |
133 | 44 | return PopulateRow(table_row, projection, 0, projection.num_columns(), row, col_idx); |
134 | 44 | } |
135 | | |
136 | | // Outer join a static row with a non-static row. |
137 | | // A join is successful if and only if for every hash key, the values in the static and the |
138 | | // non-static row are either non-NULL and the same, or one of them is NULL. In particular, the
139 | | // join trivially succeeds when the static row is empty, in which case we return true.
140 | | // Copies the entries from the static row into the non-static one. |
141 | | bool JoinStaticRow( |
142 | | const Schema& schema, const Schema& static_projection, const QLTableRow& static_row, |
143 | 346 | QLTableRow* non_static_row) { |
144 | | // The join is successful if the static row is empty |
145 | 346 | if (static_row.IsEmpty()) { |
146 | 11 | return true; |
147 | 11 | } |
148 | | |
149 | | // Now we know that the static row is not empty. The non-static row cannot be empty, therefore |
150 | | // we know that both the static row and the non-static one have non-NULL entries for all |
151 | | // hash keys. Therefore if MatchColumn returns false, we know the join is unsuccessful. |
152 | | // TODO(neil) |
153 | | // - Need to assign TTL and WriteTime to their default values. |
154 | | // - Check if they should be compared and copied over. Most likely not needed as we don't allow |
155 | | // selecting TTL and WriteTime for static columns. |
156 | | // - This copying function should be moved to QLTableRow class. |
157 | 931 | for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
158 | 596 | if (!non_static_row->MatchColumn(schema.column_id(i), static_row)) { |
159 | 0 | return false; |
160 | 0 | } |
161 | 596 | } |
162 | | |
163 | | // Join the static columns in the static row into the non-static row. |
164 | 738 | for (size_t i = 0; i < static_projection.num_columns(); i++) {
165 | 403 | non_static_row->CopyColumn(static_projection.column_id(i), static_row); |
166 | 403 | } |
167 | | |
168 | 335 | return true; |
169 | 335 | } |
170 | | |
171 | | // Join a non-static row with a static row. |
172 | | // Returns true if the two rows match |
173 | | bool JoinNonStaticRow( |
174 | | const Schema& schema, const Schema& static_projection, const QLTableRow& non_static_row, |
175 | 78 | QLTableRow* static_row) { |
176 | 78 | bool join_successful = true; |
177 | | |
178 | 197 | for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
179 | 129 | if (!static_row->MatchColumn(schema.column_id(i), non_static_row)) { |
180 | 10 | join_successful = false; |
181 | 10 | break; |
182 | 10 | } |
183 | 129 | } |
184 | | |
185 | 78 | if (!join_successful) { |
186 | 10 | static_row->Clear(); |
187 | 16 | for (size_t i = 0; i < static_projection.num_columns(); i++) {
188 | 6 | static_row->AllocColumn(static_projection.column_id(i)); |
189 | 6 | } |
190 | | |
191 | 20 | for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
192 | 10 | static_row->CopyColumn(schema.column_id(i), non_static_row); |
193 | 10 | } |
194 | 10 | } |
195 | 78 | return join_successful; |
196 | 78 | } |
197 | | |
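 | | // Navigate to the JSON element addressed by the index-th entry of column_value.json_args()
 | | // within *document. For an array the argument is decoded as an integer index and returned
 | | // via *valueit; for an object it is looked up as a member name and returned via *memberit,
 | | // with *last_elem_object set to true. Fails if the index is out of bounds, the member does
 | | // not exist, or an array index is applied to an object.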
198 | | CHECKED_STATUS FindMemberForIndex(const QLColumnValuePB& column_value, |
199 | | int index, |
200 | | rapidjson::Value* document, |
201 | | rapidjson::Value::MemberIterator* memberit, |
202 | | rapidjson::Value::ValueIterator* valueit, |
203 | | bool* last_elem_object, |
204 | 209 | bool is_insert) { |
205 | 209 | *last_elem_object = false; |
206 | | |
207 | 209 | int64_t array_index; |
208 | 209 | if (document->IsArray()) { |
209 | 10 | util::VarInt varint; |
210 | 10 | RETURN_NOT_OK(varint.DecodeFromComparable( |
211 | 10 | column_value.json_args(index).operand().value().varint_value())); |
212 | 8 | array_index = VERIFY_RESULT(varint.ToInt64()); |
213 | | |
214 | 8 | if (array_index >= document->GetArray().Size() || array_index < 0) {
215 | 2 | return STATUS_SUBSTITUTE(QLError, "Array index out of bounds: $0", array_index);
216 | 2 | } |
217 | 6 | *valueit = document->Begin(); |
218 | 6 | std::advance(*valueit, array_index); |
219 | 199 | } else if (document->IsObject()) { |
220 | 199 | if (!is_insert) { |
221 | 199 | util::VarInt varint; |
222 | 199 | auto status = |
223 | 199 | varint.DecodeFromComparable(column_value.json_args(index).operand().value().varint_value()); |
224 | 199 | if (status.ok()) { |
225 | 2 | array_index = VERIFY_RESULT(varint.ToInt64()); |
226 | 2 | return STATUS_SUBSTITUTE(QLError, "Cannot use array index $0 to access object", |
227 | 2 | array_index); |
228 | 2 | } |
229 | 199 | } |
230 | | |
231 | 197 | *last_elem_object = true; |
232 | | |
233 | 197 | const auto& member = column_value.json_args(index).operand().value().string_value().c_str(); |
234 | 197 | *memberit = document->FindMember(member); |
235 | 197 | if (*memberit == document->MemberEnd()) { |
236 | 103 | return STATUS_SUBSTITUTE(QLError, "Could not find member: $0", member);
237 | 103 | } |
238 | 197 | } else { |
239 | 0 | return STATUS_SUBSTITUTE(QLError, "JSON field is invalid: $0", column_value.ShortDebugString());
240 | 0 | } |
241 | 100 | return Status::OK(); |
242 | 209 | } |
243 | | |
244 | 78 | CHECKED_STATUS CheckUserTimestampForCollections(const UserTimeMicros user_timestamp) { |
245 | 78 | if (user_timestamp != ValueControlFields::kInvalidUserTimestamp) { |
246 | 0 | return STATUS(InvalidArgument, "User supplied timestamp is only allowed for " |
247 | 0 | "replacing the whole collection"); |
248 | 0 | } |
249 | 78 | return Status::OK(); |
250 | 78 | } |
251 | | |
252 | | } // namespace |
253 | | |
254 | | QLWriteOperation::QLWriteOperation(std::reference_wrapper<const QLWriteRequestPB> request, |
255 | | std::shared_ptr<const Schema> schema, |
256 | | std::reference_wrapper<const IndexMap> index_map, |
257 | | const Schema* unique_index_key_schema, |
258 | | const TransactionOperationContext& txn_op_context) |
259 | | : DocOperationBase(request), |
260 | | schema_(std::move(schema)), |
261 | | index_map_(index_map), |
262 | | unique_index_key_schema_(unique_index_key_schema), |
263 | | txn_op_context_(txn_op_context) |
264 | 4.64M | {} |
265 | | |
266 | 4.64M | QLWriteOperation::~QLWriteOperation() = default; |
267 | | |
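 | | // Determine up front whether this write requires a read snapshot (an if-condition, an
 | | // expression that reads a column, an insert into a unique index, or secondary indexes to
 | | // maintain) and which document keys (hashed-only and/or full primary key) to initialize.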
268 | 4.64M | Status QLWriteOperation::Init(QLResponsePB* response) { |
269 | 4.64M | response_ = response; |
270 | 4.64M | insert_into_unique_index_ = request_.type() == QLWriteRequestPB::QL_STMT_INSERT && |
271 | 4.64M | unique_index_key_schema_ != nullptr;
272 | 4.64M | require_read_ = RequireRead(request_, *schema_) || insert_into_unique_index_
273 | 4.64M | || !index_map_.empty();
274 | 4.64M | update_indexes_ = !request_.update_index_ids().empty(); |
275 | | |
276 | | // Determine if static / non-static columns are being written. |
277 | 4.64M | bool write_static_columns = false; |
278 | 4.64M | bool write_non_static_columns = false; |
279 | | // TODO(Amit): Remove the DVLOGS after backfill features stabilize. |
280 | 4.64M | DVLOG(4) << "Processing request " << yb::ToString(request_);
281 | 6.76M | for (const auto& column : request_.column_values()) { |
282 | 6.76M | DVLOG(4) << "Looking at column : " << yb::ToString(column);
283 | 6.76M | auto schema_column = schema_->column_by_id(ColumnId(column.column_id())); |
284 | 18.4E | DVLOG(4) << "schema column : " << yb::ToString(schema_column); |
285 | 6.76M | RETURN_NOT_OK(schema_column); |
286 | 6.76M | if (schema_column->is_static()) { |
287 | 10.3k | write_static_columns = true; |
288 | 6.75M | } else { |
289 | 6.75M | write_non_static_columns = true; |
290 | 6.75M | } |
291 | 6.76M | if (write_static_columns && write_non_static_columns) {
292 | 10.1k | break; |
293 | 10.1k | } |
294 | 6.76M | } |
295 | | |
296 | 4.64M | bool is_range_operation = IsRangeOperation(request_, *schema_); |
297 | | |
298 | | // We need the hashed key if writing to static columns, and the primary key if writing to
299 | | // non-static columns or writing the full primary key (i.e. range columns are present or the
300 | | // table does not have range columns).
301 | 4.64M | return InitializeKeys( |
302 | 4.64M | write_static_columns || is_range_operation,
303 | 4.64M | write_non_static_columns || !request_.range_column_values().empty() ||
304 | 4.64M | schema_->num_range_key_columns() == 0);
305 | 4.64M | } |
306 | | |
307 | 4.67M | Status QLWriteOperation::InitializeKeys(const bool hashed_key, const bool primary_key) { |
308 | | // Populate the hashed and range components in the same order as they are in the table schema. |
309 | 4.67M | const auto& hashed_column_values = request_.hashed_column_values(); |
310 | 4.67M | const auto& range_column_values = request_.range_column_values(); |
311 | 4.67M | std::vector<PrimitiveValue> hashed_components; |
312 | 4.67M | std::vector<PrimitiveValue> range_components; |
313 | 4.67M | RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( |
314 | 4.67M | hashed_column_values, *schema_, 0, |
315 | 4.67M | schema_->num_hash_key_columns(), &hashed_components)); |
316 | 4.67M | RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( |
317 | 4.67M | range_column_values, *schema_, schema_->num_hash_key_columns(), |
318 | 4.67M | schema_->num_range_key_columns(), &range_components)); |
319 | | |
320 | | // need_pk - true if we should construct pk_doc_key_.
321 | 4.67M | const bool need_pk = primary_key && !pk_doc_key_;
322 | | |
323 | | // We need the hash key if writing to the static columns. |
324 | 4.67M | if (hashed_key && !hashed_doc_key_) {
325 | 10.2k | if (need_pk) { |
326 | 10.1k | hashed_doc_key_.emplace(request_.hash_code(), hashed_components); |
327 | 10.1k | } else { |
328 | 75 | hashed_doc_key_.emplace(request_.hash_code(), std::move(hashed_components)); |
329 | 75 | } |
330 | 10.2k | encoded_hashed_doc_key_ = hashed_doc_key_->EncodeAsRefCntPrefix(); |
331 | 10.2k | } |
332 | | |
333 | | // We need the primary key if writing to non-static columns or writing the full primary key |
334 | | // (i.e. range columns are present). |
335 | 4.67M | if (need_pk) { |
336 | 4.64M | if (request_.has_hash_code() && !hashed_column_values.empty()4.22M ) { |
337 | 4.22M | pk_doc_key_.emplace( |
338 | 4.22M | request_.hash_code(), std::move(hashed_components), std::move(range_components)); |
339 | 4.22M | } else { |
340 | | // In case of syscatalog tables, we don't have any hash components. |
341 | 420k | pk_doc_key_.emplace(std::move(range_components)); |
342 | 420k | } |
343 | 4.64M | encoded_pk_doc_key_ = pk_doc_key_->EncodeAsRefCntPrefix(); |
344 | 4.64M | } |
345 | | |
346 | 4.67M | return Status::OK(); |
347 | 4.67M | } |
348 | | |
349 | | Status QLWriteOperation::GetDocPaths( |
350 | 4.69M | GetDocPathsMode mode, DocPathsToLock *paths, IsolationLevel *level) const { |
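 | | // In lock mode, or when the request has no column values or the table has secondary
 | | // indexes, lock the whole row by its doc key(s). Otherwise lock only the paths of the
 | | // individual columns being written.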
351 | 4.69M | if (mode == GetDocPathsMode::kLock || request_.column_values().empty() || !index_map_.empty()) {
352 | 4.60M | if (encoded_hashed_doc_key_) { |
353 | 282 | paths->push_back(encoded_hashed_doc_key_); |
354 | 282 | } |
355 | 4.60M | if (encoded_pk_doc_key_) { |
356 | 4.59M | paths->push_back(encoded_pk_doc_key_); |
357 | 4.59M | } |
358 | 4.60M | } else { |
359 | 89.8k | KeyBytes buffer; |
360 | 93.8k | for (const auto& column_value : request_.column_values()) { |
361 | 93.8k | ColumnId column_id(column_value.column_id()); |
362 | 93.8k | const ColumnSchema& column = VERIFY_RESULT(schema_->column_by_id(column_id)); |
363 | | |
364 | 93.8k | Slice doc_key = column.is_static() ? encoded_hashed_doc_key_.as_slice()
365 | 93.8k | : encoded_pk_doc_key_.as_slice();
366 | 93.8k | buffer.Clear(); |
367 | 93.8k | buffer.AppendValueType(ValueType::kColumnId); |
368 | 93.8k | buffer.AppendColumnId(column_id); |
369 | 93.8k | RefCntBuffer path(doc_key.size() + buffer.size()); |
370 | 93.8k | memcpy(path.data(), doc_key.data(), doc_key.size()); |
371 | 93.8k | buffer.AsSlice().CopyTo(path.data() + doc_key.size()); |
372 | 93.8k | paths->push_back(RefCntPrefix(path)); |
373 | 93.8k | } |
374 | 89.8k | } |
375 | | |
376 | | // When this write operation requires a read, it requires a read snapshot so paths will be locked |
377 | | // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable |
378 | | // isolation so that they will serialize but do not conflict with one another. |
379 | | // |
380 | | // Currently, only keys that are being written are locked, no lock is taken on read at the |
381 | | // snapshot isolation level. |
382 | 4.69M | *level = require_read_ ? IsolationLevel::SNAPSHOT_ISOLATION
383 | 4.69M | : IsolationLevel::SERIALIZABLE_ISOLATION;
384 | | |
385 | 4.69M | return Status::OK(); |
386 | 4.69M | } |
387 | | |
388 | | Status QLWriteOperation::ReadColumns(const DocOperationApplyData& data, |
389 | | Schema *param_static_projection, |
390 | | Schema *param_non_static_projection, |
391 | 27.5k | QLTableRow* table_row) { |
392 | 27.5k | Schema *static_projection = param_static_projection; |
393 | 27.5k | Schema *non_static_projection = param_non_static_projection; |
394 | | |
395 | 27.5k | Schema local_static_projection; |
396 | 27.5k | Schema local_non_static_projection; |
397 | 27.5k | if (static_projection == nullptr) { |
398 | 27.5k | static_projection = &local_static_projection; |
399 | 27.5k | } |
400 | 27.5k | if (non_static_projection == nullptr) { |
401 | 27.5k | non_static_projection = &local_non_static_projection; |
402 | 27.5k | } |
403 | | |
404 | | // Create projections to scan docdb. |
405 | 27.5k | RETURN_NOT_OK(CreateProjections(*schema_, request_.column_refs(), |
406 | 27.5k | static_projection, non_static_projection)); |
407 | | |
408 | | // Generate hashed / primary key depending on if static / non-static columns are referenced in |
409 | | // the if-condition. |
410 | 27.5k | RETURN_NOT_OK(InitializeKeys( |
411 | 27.5k | !static_projection->columns().empty(), !non_static_projection->columns().empty())); |
412 | | |
413 | | // Scan docdb for the static and non-static columns of the row using the hashed / primary key. |
414 | 27.5k | if (hashed_doc_key_) { |
415 | 28 | DocQLScanSpec spec(*static_projection, *hashed_doc_key_, request_.query_id()); |
416 | 28 | DocRowwiseIterator iterator(*static_projection, *schema_, txn_op_context_, |
417 | 28 | data.doc_write_batch->doc_db(), |
418 | 28 | data.deadline, data.read_time); |
419 | 28 | RETURN_NOT_OK(iterator.Init(spec)); |
420 | 28 | if (VERIFY_RESULT(iterator.HasNext())) { |
421 | 22 | RETURN_NOT_OK(iterator.NextRow(table_row)); |
422 | 22 | } |
423 | 28 | data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt()); |
424 | 28 | } |
425 | 27.6k | if (pk_doc_key_) {
426 | 27.6k | DocQLScanSpec spec(*non_static_projection, *pk_doc_key_, request_.query_id()); |
427 | 27.6k | DocRowwiseIterator iterator(*non_static_projection, *schema_, txn_op_context_, |
428 | 27.6k | data.doc_write_batch->doc_db(), |
429 | 27.6k | data.deadline, data.read_time); |
430 | 27.6k | RETURN_NOT_OK(iterator.Init(spec)); |
431 | 27.6k | if (VERIFY_RESULT(iterator.HasNext())) { |
432 | 12.5k | RETURN_NOT_OK(iterator.NextRow(table_row)); |
433 | | // If there are indexes to update, check if liveness column exists for update/delete because |
434 | | // that will affect whether the row will still exist after the DML and whether we need to |
435 | | // remove the key from the indexes. |
436 | 12.5k | if (update_indexes_ && (request_.type() == QLWriteRequestPB::QL_STMT_UPDATE ||
437 | 12.2k | request_.type() == QLWriteRequestPB::QL_STMT_DELETE)) {
438 | 1.25k | liveness_column_exists_ = iterator.LivenessColumnExists(); |
439 | 1.25k | } |
440 | 15.0k | } else { |
441 | | // If no non-static column is found, the row does not exist and we should clear the static |
442 | | // columns in the map to indicate the row does not exist. |
443 | 15.0k | table_row->Clear(); |
444 | 15.0k | } |
445 | 27.6k | data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt()); |
446 | 27.6k | } |
447 | | |
448 | 27.5k | return Status::OK(); |
449 | 27.5k | } |
450 | | |
451 | | Status QLWriteOperation::PopulateConditionalDmlRow(const DocOperationApplyData& data, |
452 | | const bool should_apply, |
453 | | const QLTableRow& table_row, |
454 | | Schema static_projection, |
455 | | Schema non_static_projection, |
456 | 102 | std::unique_ptr<QLRowBlock>* rowblock) { |
457 | | // Populate the result set to return the "applied" status, and optionally the hash / primary key |
458 | | // and the present column values if the condition is not satisfied and the row does exist |
459 | | // (value_map is not empty). |
460 | 102 | const bool return_present_values = !should_apply && !table_row.IsEmpty();
461 | 102 | const size_t num_key_columns = |
462 | 102 | pk_doc_key_ ? schema_->num_key_columns() : schema_->num_hash_key_columns();
463 | 102 | std::vector<ColumnSchema> columns; |
464 | 102 | columns.emplace_back(ColumnSchema("[applied]", BOOL)); |
465 | 102 | if (return_present_values) { |
466 | 22 | columns.insert(columns.end(), schema_->columns().begin(), |
467 | 22 | schema_->columns().begin() + num_key_columns); |
468 | 22 | columns.insert(columns.end(), static_projection.columns().begin(), |
469 | 22 | static_projection.columns().end()); |
470 | 22 | columns.insert(columns.end(), non_static_projection.columns().begin(), |
471 | 22 | non_static_projection.columns().end()); |
472 | 22 | } |
473 | 102 | rowblock->reset(new QLRowBlock(Schema(columns, 0))); |
474 | 102 | QLRow& row = rowblock->get()->Extend(); |
475 | 102 | row.mutable_column(0)->set_bool_value(should_apply); |
476 | 102 | size_t col_idx = 1; |
477 | 102 | if (return_present_values) { |
478 | 22 | RETURN_NOT_OK(PopulateRow(table_row, *schema_, 0, num_key_columns, &row, &col_idx)); |
479 | 22 | RETURN_NOT_OK(PopulateRow(table_row, static_projection, &row, &col_idx)); |
480 | 22 | RETURN_NOT_OK(PopulateRow(table_row, non_static_projection, &row, &col_idx)); |
481 | 22 | } |
482 | | |
483 | 102 | return Status::OK(); |
484 | 102 | } |
485 | | |
486 | | Status QLWriteOperation::PopulateStatusRow(const DocOperationApplyData& data, |
487 | | const bool should_apply, |
488 | | const QLTableRow& table_row, |
489 | 271 | std::unique_ptr<QLRowBlock>* rowblock) { |
490 | 271 | std::vector<ColumnSchema> columns; |
491 | 271 | columns.emplace_back(ColumnSchema("[applied]", BOOL)); |
492 | 271 | columns.emplace_back(ColumnSchema("[message]", STRING)); |
493 | 271 | columns.insert(columns.end(), schema_->columns().begin(), schema_->columns().end()); |
494 | | |
495 | 271 | rowblock->reset(new QLRowBlock(Schema(columns, 0))); |
496 | 271 | QLRow& row = rowblock->get()->Extend(); |
497 | 271 | row.mutable_column(0)->set_bool_value(should_apply); |
498 | | // No message unless there is an error (then message will be set in executor). |
499 | | |
500 | | // If not applied report the existing row values as for regular if clause. |
501 | 271 | if (!should_apply) { |
502 | 25 | for (size_t i = 0; i < schema_->num_columns(); i++) {
503 | 20 | boost::optional<const QLValuePB&> col_val = table_row.GetValue(schema_->column_id(i)); |
504 | 20 | if (col_val.is_initialized()) { |
505 | 16 | *(row.mutable_column(i + 2)) = *col_val; |
506 | 16 | } |
507 | 20 | } |
508 | 5 | } |
509 | | |
510 | 271 | return Status::OK(); |
511 | 271 | } |
512 | | |
513 | | // Check if a duplicate value is inserted into a unique index. |
514 | 2.31k | Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) { |
515 | 2.31k | VLOG(3) << "Looking for collisions in\n" |
516 | 3 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
517 | | // We need to check backwards only for backfilled entries. |
518 | 2.31k | bool ret = |
519 | 2.31k | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) || |
520 | 2.31k | (request_.is_backfill() &&
521 | 1.85k | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward))); |
522 | 2.31k | if (!ret) { |
523 | 18.4E | VLOG(3) << "No collisions found"; |
524 | 1.85k | } |
525 | 2.31k | return ret; |
526 | 2.31k | } |
527 | | |
528 | | Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue( |
529 | 2.85k | const DocOperationApplyData& data, Direction direction) { |
530 | 2.85k | VLOG(2) << "Looking for collision while going " << yb::ToString(direction) |
531 | 3 | << ". Trying to insert " << *pk_doc_key_; |
532 | 2.85k | auto requested_read_time = data.read_time; |
533 | 2.85k | if (direction == Direction::kForward) { |
534 | 2.31k | return HasDuplicateUniqueIndexValue(data, requested_read_time); |
535 | 2.31k | } |
536 | | |
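 | | // Backward check: find the oldest hybrid time past the requested read time at which this
 | | // index key (or its liveness column) was overwritten, and if one exists, re-run the
 | | // duplicate check as of that time to catch entries written by backfill behind the read time.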
537 | 539 | auto iter = CreateIntentAwareIterator( |
538 | 539 | data.doc_write_batch->doc_db(), |
539 | 539 | BloomFilterMode::USE_BLOOM_FILTER, |
540 | 539 | pk_doc_key_->Encode().AsSlice(), |
541 | 539 | request_.query_id(), |
542 | 539 | txn_op_context_, |
543 | 539 | data.deadline, |
544 | 539 | ReadHybridTime::Max()); |
545 | | |
546 | 539 | HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
547 | 539 | iter.get(), SubDocKey(*pk_doc_key_), requested_read_time.read)); |
548 | 0 | const HybridTime oldest_past_min_ht_liveness = |
549 | 539 | VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
550 | 539 | iter.get(), |
551 | 539 | SubDocKey(*pk_doc_key_, PrimitiveValue::kLivenessColumn), |
552 | 539 | requested_read_time.read)); |
553 | 0 | oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness); |
554 | 539 | if (!oldest_past_min_ht.is_valid()) { |
555 | 533 | return false; |
556 | 533 | } |
557 | 6 | return HasDuplicateUniqueIndexValue( |
558 | 6 | data, ReadHybridTime::SingleTime(oldest_past_min_ht)); |
559 | 539 | } |
560 | | |
561 | | Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue( |
562 | 2.31k | const DocOperationApplyData& data, ReadHybridTime read_time) { |
563 | | // Set up the iterator to read the current primary key associated with the index key. |
564 | 2.31k | DocQLScanSpec spec(*unique_index_key_schema_, *pk_doc_key_, request_.query_id(), true); |
565 | 2.31k | DocRowwiseIterator iterator( |
566 | 2.31k | *unique_index_key_schema_, |
567 | 2.31k | *schema_, |
568 | 2.31k | txn_op_context_, |
569 | 2.31k | data.doc_write_batch->doc_db(), |
570 | 2.31k | data.deadline, |
571 | 2.31k | read_time); |
572 | 2.31k | RETURN_NOT_OK(iterator.Init(spec)); |
573 | | |
574 | | // It is a duplicate value if the index key exists already and the index value (corresponding to |
575 | | // the indexed table's primary key) is not the same. |
576 | 2.31k | if (!VERIFY_RESULT(iterator.HasNext())) { |
577 | 1.78k | VLOG(2) << "No collision found while checking at " << yb::ToString(read_time)1 ; |
578 | 1.78k | return false; |
579 | 1.78k | } |
580 | 531 | QLTableRow table_row; |
581 | 531 | RETURN_NOT_OK(iterator.NextRow(&table_row)); |
582 | 531 | std::unordered_set<ColumnId> key_column_ids(unique_index_key_schema_->column_ids().begin(), |
583 | 531 | unique_index_key_schema_->column_ids().end()); |
584 | 1.79k | for (const auto& column_value : request_.column_values()) { |
585 | 1.79k | ColumnId column_id(column_value.column_id()); |
586 | 1.79k | if (key_column_ids.count(column_id) > 0) { |
587 | 1.64k | boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id); |
588 | 1.64k | const QLValuePB& new_value = column_value.expr().value(); |
589 | 1.64k | if (existing_value && *existing_value != new_value) { |
590 | 451 | VLOG(2) << "Found collision while checking at " << yb::ToString(read_time) |
591 | 0 | << "\nExisting: " << yb::ToString(*existing_value) |
592 | 0 | << " vs New: " << yb::ToString(new_value) |
593 | 0 | << "\nUsed read time as " << yb::ToString(data.read_time); |
594 | 451 | DVLOG(3) << "DocDB is now:\n" |
595 | 0 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
596 | 451 | return true; |
597 | 451 | } |
598 | 1.64k | } |
599 | 1.79k | } |
600 | | |
601 | 18.4E | VLOG(2) << "No collision while checking at " << yb::ToString(read_time); |
602 | 80 | return false; |
603 | 531 | } |
604 | | |
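 | | // Seek to the unique-index key and return the oldest timestamp at which sub_doc_key was
 | | // written past min_read_time, or an invalid HybridTime if no such record is found.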
605 | | Result<HybridTime> QLWriteOperation::FindOldestOverwrittenTimestamp( |
606 | | IntentAwareIterator* iter, |
607 | | const SubDocKey& sub_doc_key, |
608 | 1.08k | HybridTime min_read_time) { |
609 | 1.08k | HybridTime result; |
610 | 1.08k | VLOG(3) << "Doing iter->Seek " << *pk_doc_key_;
611 | 1.08k | iter->Seek(*pk_doc_key_); |
612 | 1.08k | if (iter->valid()) { |
613 | 984 | const KeyBytes bytes = sub_doc_key.EncodeWithoutHt(); |
614 | 984 | const Slice& sub_key_slice = bytes.AsSlice(); |
615 | 984 | result = VERIFY_RESULT( |
616 | 0 | iter->FindOldestRecord(sub_key_slice, min_read_time)); |
617 | 2 | VLOG(2) << "iter->FindOldestRecord returned " << result << " for " |
618 | 2 | << SubDocKey::DebugSliceToString(sub_key_slice); |
619 | 984 | } else { |
620 | 98 | VLOG(3) << "iter->Seek " << *pk_doc_key_ << " turned out to be invalid";
621 | 98 | } |
622 | 1.08k | return result; |
623 | 1.08k | } |
624 | | |
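 | | // Apply all JSON-path updates that target the single jsonb column col_id (e.g. a statement
 | | // such as UPDATE t SET j->'a'->'b' = '1' WHERE ...). The current value is read once (or
 | | // initialized to an empty object for an upsert), each update listed in col_map is applied to
 | | // the in-memory rapidjson document in statement order, and the final document is written
 | | // back as a single subdocument.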
625 | | Status QLWriteOperation::ApplyForJsonOperators( |
626 | | const ColumnSchema& column, |
627 | | const ColumnIdRep col_id, |
628 | | const std::unordered_map<ColumnIdRep, vector<int>>& col_map, |
629 | | const DocOperationApplyData& data, |
630 | | const DocPath& sub_path, |
631 | | const MonoDelta& ttl, |
632 | | const UserTimeMicros& user_timestamp, |
633 | | QLTableRow* existing_row, |
634 | 94 | bool is_insert) { |
635 | 94 | using common::Jsonb; |
636 | 94 | rapidjson::Document document; |
637 | 94 | QLValue qlv; |
638 | 94 | bool read_needed = true; |
639 | 182 | for (int idx : col_map.find(col_id)->second) { |
640 | 182 | const auto& column_value = request_.column_values(idx); |
641 | 182 | if (column_value.column_id() != col_id) continue;
642 | 182 | if (read_needed) { |
643 | | // Read the json column value in order to perform a read modify write. |
644 | 94 | QLExprResult temp; |
645 | 94 | RETURN_NOT_OK(existing_row->ReadColumn(col_id, temp.Writer())); |
646 | 94 | const auto& ql_value = temp.Value(); |
647 | 94 | if (!IsNull(ql_value)) { |
648 | 63 | Jsonb jsonb(std::move(ql_value.jsonb_value())); |
649 | 63 | RETURN_NOT_OK(jsonb.ToRapidJson(&document)); |
650 | 63 | } else { |
651 | 31 | if (!is_insert && column_value.json_args_size() > 1) { |
652 | 1 | return STATUS_SUBSTITUTE(QLError, "JSON path depth should be 1 for upsert", |
653 | 1 | column_value.ShortDebugString()); |
654 | 1 | } |
655 | 30 | common::Jsonb empty_jsonb; |
656 | 30 | RETURN_NOT_OK(empty_jsonb.FromString("{}")); |
657 | 30 | QLTableColumn& column = existing_row->AllocColumn(column_value.column_id()); |
658 | 30 | column.value.set_jsonb_value(empty_jsonb.MoveSerializedJsonb()); |
659 | | |
660 | 30 | Jsonb jsonb(column.value.jsonb_value()); |
661 | 30 | RETURN_NOT_OK(jsonb.ToRapidJson(&document)); |
662 | 30 | } |
663 | 94 | } |
664 | 181 | read_needed = false; |
665 | | |
666 | | // Deserialize the rhs. |
667 | 181 | Jsonb rhs(std::move(column_value.expr().value().jsonb_value())); |
668 | 181 | rapidjson::Document rhs_doc(&document.GetAllocator()); |
669 | 181 | RETURN_NOT_OK(rhs.ToRapidJson(&rhs_doc)); |
670 | | |
671 | | // Update the json value. |
672 | 179 | rapidjson::Value::MemberIterator memberit; |
673 | 179 | rapidjson::Value::ValueIterator valueit; |
674 | 179 | bool last_elem_object; |
675 | 179 | rapidjson::Value* node = &document; |
676 | | |
677 | 179 | int i = 0; |
678 | 179 | auto status = FindMemberForIndex(column_value, i, node, &memberit, &valueit, |
679 | 179 | &last_elem_object, is_insert); |
680 | 209 | for (i = 1; i < column_value.json_args_size() && status.ok(); i++) {
681 | 30 | node = (last_elem_object) ? &(memberit->value) : &(*valueit);
682 | 30 | status = FindMemberForIndex(column_value, i, node, &memberit, &valueit, |
683 | 30 | &last_elem_object, is_insert); |
684 | 30 | } |
685 | | |
686 | 179 | bool update_missing = false; |
687 | 179 | if (is_insert) { |
688 | 0 | RETURN_NOT_OK(status); |
689 | 179 | } else { |
690 | 179 | update_missing = !status.ok(); |
691 | 179 | } |
692 | | |
693 | 179 | if (update_missing) { |
694 | | // NOTE: the lhs path may extend past the existing document by at most one hop.
695 | 112 | if (last_elem_object && i == column_value.json_args_size()) {
696 | 105 | auto val = column_value.json_args(i - 1).operand().value().string_value(); |
697 | 105 | rapidjson::Value v( |
698 | 105 | val.c_str(), narrow_cast<rapidjson::SizeType>(val.size()), document.GetAllocator()); |
699 | 105 | node->AddMember(v, rhs_doc, document.GetAllocator()); |
700 | 105 | } else { |
701 | 7 | RETURN_NOT_OK(status); |
702 | 7 | } |
703 | 112 | } else if (last_elem_object) {
704 | 66 | memberit->value = rhs_doc.Move(); |
705 | 66 | } else { |
706 | 1 | *valueit = rhs_doc.Move(); |
707 | 1 | } |
708 | 179 | } // end of column processing |
709 | | // Now write the new json value back. |
710 | 84 | Jsonb jsonb_result; |
711 | 84 | RETURN_NOT_OK(jsonb_result.FromRapidJson(document)); |
712 | | // Update the current row as well so that we can accumulate the result of multiple json |
713 | | // operations and write the final value. |
714 | 84 | *(qlv.mutable_jsonb_value()) = std::move(jsonb_result.MoveSerializedJsonb()); |
715 | | |
716 | 84 | existing_row->AllocColumn(col_id).value = qlv.value(); |
717 | | |
718 | 84 | ValueRef value_ref(qlv.value(), column.sorting_type(), yb::bfql::TSOpcode::kScalarInsert); |
719 | 84 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
720 | 84 | sub_path, value_ref, data.read_time, data.deadline, request_.query_id(), ttl, |
721 | 84 | user_timestamp)); |
722 | | |
723 | 84 | return Status::OK(); |
724 | 84 | } |
725 | | |
726 | | Status QLWriteOperation::ApplyForSubscriptArgs(const QLColumnValuePB& column_value, |
727 | | const QLTableRow& existing_row, |
728 | | const DocOperationApplyData& data, |
729 | | const MonoDelta& ttl, |
730 | | const UserTimeMicros& user_timestamp, |
731 | | const ColumnSchema& column, |
732 | 30 | DocPath* sub_path) { |
733 | 30 | QLExprResult expr_result; |
734 | 30 | RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, expr_result.Writer())); |
735 | 30 | ValueRef value( |
736 | 30 | expr_result.Value(), column.sorting_type(), GetTSWriteInstruction(column_value.expr())); |
737 | 30 | RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp)); |
738 | | |
739 | | // Setting the value for a sub-column |
740 | | // Currently we only support two cases here: `map['key'] = v` and `list[index] = v`) |
741 | | // Any other case should be rejected by the semantic analyser before getting here |
742 | | // Later when we support frozen or nested collections this code may need refactoring |
743 | 30 | DCHECK_EQ(column_value.subscript_args().size(), 1); |
744 | 30 | DCHECK(column_value.subscript_args(0).has_value()) << "An index must be a constant";
745 | 30 | switch (column.type()->main()) { |
746 | 15 | case MAP: { |
747 | 15 | const PrimitiveValue &pv = PrimitiveValue::FromQLValuePB( |
748 | 15 | column_value.subscript_args(0).value(), |
749 | 15 | SortingType::kNotSpecified); |
750 | 15 | sub_path->AddSubKey(pv); |
751 | 15 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
752 | 15 | *sub_path, value, data.read_time, data.deadline, |
753 | 15 | request_.query_id(), ttl, user_timestamp)); |
754 | 15 | break; |
755 | 15 | } |
756 | 15 | case LIST: { |
757 | 15 | MonoDelta default_ttl = schema_->table_properties().HasDefaultTimeToLive() ? |
758 | 0 | MonoDelta::FromMilliseconds(schema_->table_properties().DefaultTimeToLive()) : |
759 | 15 | MonoDelta::kMax; |
760 | | |
761 | 15 | int target_cql_index = column_value.subscript_args(0).value().int32_value(); |
762 | 15 | RETURN_NOT_OK(data.doc_write_batch->ReplaceCqlInList( |
763 | 15 | *sub_path, target_cql_index, value, data.read_time, data.deadline, request_.query_id(), |
764 | 15 | default_ttl, ttl)); |
765 | 10 | break; |
766 | 15 | } |
767 | 10 | default: { |
768 | 0 | LOG(ERROR) << "Unexpected type for setting subcolumn: " |
769 | 0 | << column.type()->ToString(); |
770 | 0 | } |
771 | 30 | } |
772 | 25 | return Status::OK(); |
773 | 30 | } |
774 | | |
775 | | Status QLWriteOperation::ApplyForRegularColumns(const QLColumnValuePB& column_value, |
776 | | const QLTableRow& existing_row, |
777 | | const DocOperationApplyData& data, |
778 | | const DocPath& sub_path, const MonoDelta& ttl, |
779 | | const UserTimeMicros& user_timestamp, |
780 | | const ColumnSchema& column, |
781 | | const ColumnId& column_id, |
782 | 6.73M | QLTableRow* new_row) { |
783 | 6.73M | using yb::bfql::TSOpcode; |
784 | | |
785 | | // Typical case, setting a columns value |
786 | 6.73M | QLExprResult expr_result; |
787 | 6.73M | RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, expr_result.Writer())); |
788 | 6.73M | auto write_instruction = GetTSWriteInstruction(column_value.expr()); |
789 | 6.73M | ValueRef value(expr_result.Value(), column.sorting_type(), write_instruction); |
790 | 6.73M | switch (write_instruction) { |
791 | 0 | case TSOpcode::kToJson: FALLTHROUGH_INTENDED; |
792 | 6.73M | case TSOpcode::kScalarInsert: |
793 | 6.73M | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
794 | 6.73M | sub_path, value, data.read_time, data.deadline, |
795 | 6.73M | request_.query_id(), ttl, user_timestamp)); |
796 | 6.73M | break; |
797 | 6.73M | case TSOpcode::kMapExtend: |
798 | 19 | case TSOpcode::kSetExtend: |
799 | 24 | case TSOpcode::kMapRemove: |
800 | 29 | case TSOpcode::kSetRemove: |
801 | 29 | RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp)); |
802 | 29 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
803 | 29 | sub_path, value, data.read_time, data.deadline, request_.query_id(), ttl)); |
804 | 29 | break; |
805 | 29 | case TSOpcode::kListPrepend: |
806 | 5 | value.set_list_extend_order(ListExtendOrder::PREPEND_BLOCK); |
807 | 5 | FALLTHROUGH_INTENDED; |
808 | 14 | case TSOpcode::kListAppend: |
809 | 14 | RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp)); |
810 | 14 | RETURN_NOT_OK(data.doc_write_batch->ExtendList( |
811 | 14 | sub_path, value, data.read_time, data.deadline, request_.query_id(), ttl)); |
812 | 14 | break; |
813 | 14 | case TSOpcode::kListRemove: |
814 | | // TODO(akashnil or mihnea) this should call RemoveFromList once that's implemented.
815 | | // Currently list subtraction is computed in memory using a builtin call, so this
816 | | // case should never be reached. Once it is implemented, the corresponding case
817 | | // from EvalQLExpressionPB should be uncommented to enable this optimization. |
818 | 5 | RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp)); |
819 | 5 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
820 | 5 | sub_path, value, data.read_time, data.deadline, |
821 | 5 | request_.query_id(), ttl, user_timestamp)); |
822 | 5 | break; |
823 | 5 | default: |
824 | 0 | LOG(FATAL) << "Unsupported operation: " << static_cast<int>(write_instruction); |
825 | 0 | break; |
826 | 6.73M | } |
827 | | |
828 | 6.74M | if (update_indexes_) { |
829 | 37.7k | new_row->AllocColumn(column_id, expr_result.Value()); |
830 | 37.7k | } |
831 | 6.74M | return Status::OK(); |
832 | 6.73M | } |
833 | | |
834 | 4.61M | Status QLWriteOperation::Apply(const DocOperationApplyData& data) { |
835 | 4.61M | QLTableRow existing_row; |
836 | 4.61M | if (request_.has_if_expr()) { |
837 | | // Check if the if-condition is satisfied. |
838 | 133 | bool should_apply = true; |
839 | 133 | Schema static_projection, non_static_projection; |
840 | 133 | RETURN_NOT_OK(ReadColumns(data, &static_projection, &non_static_projection, &existing_row)); |
841 | 133 | RETURN_NOT_OK(EvalCondition(request_.if_expr().condition(), existing_row, &should_apply)); |
842 | | // Set the response accordingly. |
843 | 133 | response_->set_applied(should_apply); |
844 | 133 | if (!should_apply && request_.else_error()) {
845 | 19 | return ql::ErrorStatus(ql::ErrorCode::CONDITION_NOT_SATISFIED); // QLError |
846 | 114 | } else if (request_.returns_status()) { |
847 | 12 | RETURN_NOT_OK(PopulateStatusRow(data, should_apply, existing_row, &rowblock_)); |
848 | 102 | } else { |
849 | 102 | RETURN_NOT_OK(PopulateConditionalDmlRow(data, |
850 | 102 | should_apply, |
851 | 102 | existing_row, |
852 | 102 | static_projection, |
853 | 102 | non_static_projection, |
854 | 102 | &rowblock_)); |
855 | 102 | } |
856 | | |
857 | | // If we do not need to apply we are already done. |
858 | 114 | if (!should_apply) { |
859 | 28 | response_->set_status(QLResponsePB::YQL_STATUS_OK); |
860 | 28 | return Status::OK(); |
861 | 28 | } |
862 | | |
863 | 86 | TEST_PAUSE_IF_FLAG(TEST_pause_write_apply_after_if); |
864 | 4.61M | } else if (RequireReadForExpressions(request_) || request_.returns_status()) {
865 | 27.4k | RETURN_NOT_OK(ReadColumns(data, nullptr, nullptr, &existing_row)); |
866 | 27.4k | if (request_.returns_status()) { |
867 | 259 | RETURN_NOT_OK(PopulateStatusRow(data, /* should_apply = */ true, existing_row, &rowblock_)); |
868 | 259 | } |
869 | 27.4k | } |
870 | | |
871 | 18.4E | VLOG(3) << "insert_into_unique_index_ is " << insert_into_unique_index_; |
872 | 4.61M | if (insert_into_unique_index_ && VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) {
873 | 451 | VLOG(3) << "set_applied is set to " << false << " for row " << yb::ToString(existing_row);
874 | 451 | response_->set_applied(false); |
875 | 451 | response_->set_status(QLResponsePB::YQL_STATUS_OK); |
876 | 451 | return Status::OK(); |
877 | 451 | } |
878 | | |
879 | 4.61M | const MonoDelta ttl = request_ttl(); |
880 | | |
881 | 4.61M | const UserTimeMicros user_timestamp = request_.has_user_timestamp_usec() ? |
882 | 4.61M | request_.user_timestamp_usec() : ValueControlFields::kInvalidUserTimestamp;
883 | | |
884 | | // Initialize the new row being written: start from the existing row if one was read;
885 | | // otherwise just populate its primary-key columns.
886 | 4.61M | QLTableRow new_row; |
887 | 4.61M | if (!existing_row.IsEmpty()) { |
888 | 12.4k | new_row = existing_row; |
889 | 4.60M | } else { |
890 | 4.60M | size_t idx = 0; |
891 | 4.60M | for (const QLExpressionPB& expr : request_.hashed_column_values()) { |
892 | 4.45M | new_row.AllocColumn(schema_->column_id(idx), expr.value()); |
893 | 4.45M | idx++; |
894 | 4.45M | } |
895 | 4.60M | for (const QLExpressionPB& expr : request_.range_column_values()) { |
896 | 2.82M | new_row.AllocColumn(schema_->column_id(idx), expr.value()); |
897 | 2.82M | idx++; |
898 | 2.82M | } |
899 | 4.60M | } |
900 | | |
901 | 4.61M | switch (request_.type()) { |
902 | | // QL insert == update (upsert) to be consistent with Cassandra's semantics. In either |
903 | | // INSERT or UPDATE, if non-key columns are specified, they will be inserted which will cause |
904 | | // the primary key to be inserted also when necessary. Otherwise, we should insert the |
905 | | // primary key at least. |
906 | 4.10M | case QLWriteRequestPB::QL_STMT_INSERT: |
907 | 4.61M | case QLWriteRequestPB::QL_STMT_UPDATE: { |
908 | | // Add the appropriate liveness column only for inserts. |
909 | | // We never use init markers for QL to ensure we perform writes without any reads to |
910 | | // We never use init markers for QL, so that writes require no preceding reads; this keeps
911 | | // the write path fast at the cost of slightly complicating the read path.
912 | 4.61M | if (is_insert && encoded_pk_doc_key_) {
913 | 4.10M | const DocPath sub_path(encoded_pk_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn); |
914 | 4.10M | const auto control_fields = ValueControlFields { |
915 | 4.10M | .ttl = ttl, |
916 | 4.10M | .user_timestamp = user_timestamp, |
917 | 4.10M | }; |
918 | 4.10M | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive( |
919 | 4.10M | sub_path, control_fields, ValueRef(ValueType::kNullLow), |
920 | 4.10M | data.read_time, data.deadline, request_.query_id())); |
921 | 4.10M | } |
922 | | |
923 | 4.61M | std::unordered_map<ColumnIdRep, vector<int>> col_map; |
924 | 11.3M | for (int idx = 0; idx < request_.column_values_size(); idx++) {
925 | 6.74M | const auto& column_value = request_.column_values(idx); |
926 | 6.74M | if (!column_value.has_column_id()) { |
927 | 0 | return STATUS_FORMAT(InvalidArgument, "column id missing: $0", |
928 | 0 | column_value.DebugString()); |
929 | 0 | } |
930 | 6.74M | const ColumnId column_id(column_value.column_id()); |
931 | 6.74M | const auto maybe_column = schema_->column_by_id(column_id); |
932 | 6.74M | RETURN_NOT_OK(maybe_column); |
933 | 6.74M | const ColumnSchema& column = *maybe_column; |
934 | | |
935 | 6.74M | DocPath sub_path( |
936 | 6.74M | column.is_static() ? |
937 | 6.73M | encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
938 | 6.74M | PrimitiveValue(column_id)); |
939 | | |
940 | 6.74M | QLValue expr_result; |
941 | 6.74M | if (!column_value.json_args().empty()) { |
942 | 185 | auto iter = col_map.find(column_value.column_id()); |
943 | 185 | if (iter == col_map.end()) { |
944 | | // record column id of jsonb column |
945 | 95 | col_map.emplace(column_value.column_id(), vector<int>()); |
946 | 95 | iter = col_map.find(column_value.column_id()); |
947 | 95 | } |
948 | 185 | iter->second.emplace_back(idx); |
949 | 6.74M | } else if (!column_value.subscript_args().empty()) { |
950 | 30 | RETURN_NOT_OK(ApplyForSubscriptArgs(column_value, existing_row, data, ttl, |
951 | 30 | user_timestamp, column, &sub_path)); |
952 | 6.74M | } else { |
953 | 6.74M | RETURN_NOT_OK(ApplyForRegularColumns(column_value, existing_row, data, sub_path, ttl, |
954 | 6.74M | user_timestamp, column, column_id, &new_row)); |
955 | 6.74M | } |
956 | 6.74M | } |
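 | | // Apply the JSON updates collected above, one jsonb column at a time, so that multiple
 | | // JSON operations against the same column are merged into a single read-modify-write.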
957 | 4.61M | for (const auto& entry : col_map) { |
958 | 94 | const ColumnId column_id(entry.first); |
959 | 94 | const auto maybe_column = schema_->column_by_id(column_id); |
960 | 94 | RETURN_NOT_OK(maybe_column); |
961 | 94 | const ColumnSchema& column = *maybe_column; |
962 | 94 | DocPath sub_path( |
963 | 94 | column.is_static() ? |
964 | 94 | encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
965 | 94 | PrimitiveValue(column_id)); |
966 | 94 | RETURN_NOT_OK(ApplyForJsonOperators( |
967 | 94 | column, entry.first, col_map, data, sub_path, ttl, user_timestamp, &new_row, |
968 | 94 | is_insert)); |
969 | 94 | } |
970 | | |
971 | 4.61M | if (update_indexes_) { |
972 | 26.9k | RETURN_NOT_OK(UpdateIndexes(existing_row, new_row)); |
973 | 26.9k | } |
974 | 4.61M | break; |
975 | 4.61M | } |
976 | 4.61M | case QLWriteRequestPB::QL_STMT_DELETE: { |
977 | | // We have three cases: |
978 | | // 1. If non-key columns are specified, we delete only those columns. |
979 | | // 2. Otherwise, if range cols are missing, this must be a range delete. |
980 | | // 3. Otherwise, this is a normal delete. |
981 | | // Analyzer ensures these are the only cases before getting here (e.g. range deletes cannot |
982 | | // specify non-key columns). |
983 | 5.90k | if (request_.column_values_size() > 0) { |
984 | | // Delete the referenced columns only. |
985 | 46 | for (const auto& column_value : request_.column_values()) { |
986 | 46 | CHECK(column_value.has_column_id()) |
987 | 0 | << "column id missing: " << column_value.DebugString(); |
988 | 46 | const ColumnId column_id(column_value.column_id()); |
989 | 46 | const auto& column = VERIFY_RESULT_REF(schema_->column_by_id(column_id)); |
990 | 0 | const DocPath sub_path( |
991 | 46 | column.is_static() ? |
992 | 45 | encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
993 | 46 | PrimitiveValue(column_id)); |
994 | 46 | RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(sub_path, |
995 | 46 | data.read_time, data.deadline, request_.query_id(), user_timestamp)); |
996 | 46 | if (update_indexes_) { |
997 | 39 | new_row.MarkTombstoned(column_id); |
998 | 39 | } |
999 | 46 | } |
1000 | 37 | if (update_indexes_) { |
1001 | 31 | RETURN_NOT_OK(UpdateIndexes(existing_row, new_row)); |
1002 | 31 | } |
1003 | 5.86k | } else if (IsRangeOperation(request_, *schema_)) { |
1004 | | // If the range columns are not specified, we read everything and delete all rows for |
1005 | | // which the where condition matches. |
1006 | | |
1007 | | // Create the schema projection -- range deletes cannot reference non-primary key columns, |
1008 | | // so the non-static projection is all we need, it should contain all referenced columns. |
1009 | 20 | Schema static_projection; |
1010 | 20 | Schema projection; |
1011 | 20 | RETURN_NOT_OK(CreateProjections(*schema_, request_.column_refs(), |
1012 | 20 | &static_projection, &projection)); |
1013 | | |
1014 | | // Construct the scan spec based on the WHERE condition.
1015 | 20 | vector<PrimitiveValue> hashed_components; |
1016 | 20 | RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( |
1017 | 20 | request_.hashed_column_values(), *schema_, 0, |
1018 | 20 | schema_->num_hash_key_columns(), &hashed_components)); |
1019 | | |
1020 | 20 | boost::optional<int32_t> hash_code = request_.has_hash_code() |
1021 | 20 | ? boost::make_optional<int32_t>(request_.hash_code()) |
1022 | 20 | : boost::none;
1023 | 20 | const auto range_covers_whole_partition_key = !request_.has_where_expr(); |
1024 | 20 | const auto include_static_columns_in_scan = range_covers_whole_partition_key && |
1025 | 20 | schema_->has_statics();
1026 | 20 | DocQLScanSpec spec(*schema_, |
1027 | 20 | hash_code, |
1028 | 20 | hash_code, // max hash code. |
1029 | 20 | hashed_components, |
1030 | 20 | request_.has_where_expr() ? &request_.where_expr().condition() : nullptr,
1031 | 20 | nullptr, |
1032 | 20 | request_.query_id(), |
1033 | 20 | true /* is_forward_scan */, |
1034 | 20 | include_static_columns_in_scan); |
1035 | | |
1036 | | // Create iterator. |
1037 | 20 | DocRowwiseIterator iterator( |
1038 | 20 | projection, *schema_, txn_op_context_, |
1039 | 20 | data.doc_write_batch->doc_db(), |
1040 | 20 | data.deadline, data.read_time); |
1041 | 20 | RETURN_NOT_OK(iterator.Init(spec)); |
1042 | | |
1043 | | // Iterate through rows and delete those that match the condition. |
1044 | | // TODO(mihnea): We do not lock here, so other write transactions coming in might appear |
1045 | | // partially applied if they happen in the middle of a ranged delete. |
1046 | 263 | while (VERIFY_RESULT(iterator.HasNext())) {
1047 | 263 | existing_row.Clear(); |
1048 | 263 | RETURN_NOT_OK(iterator.NextRow(&existing_row)); |
1049 | | |
1050 | | // Match the row with the where condition before deleting it. |
1051 | 263 | bool match = false; |
1052 | 263 | RETURN_NOT_OK(spec.Match(existing_row, &match)); |
1053 | 263 | if (match) { |
1054 | 260 | const DocPath row_path(iterator.row_key()); |
1055 | 260 | RETURN_NOT_OK(DeleteRow(row_path, data.doc_write_batch, data.read_time, data.deadline)); |
1056 | 260 | if (update_indexes_) { |
1057 | 7 | liveness_column_exists_ = iterator.LivenessColumnExists(); |
1058 | 7 | RETURN_NOT_OK(UpdateIndexes(existing_row, new_row)); |
1059 | 7 | } |
1060 | 260 | } |
1061 | 263 | } |
1062 | 20 | data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt()); |
1063 | 5.84k | } else { |
1064 | | // Otherwise, delete the referenced row (all columns). |
1065 | 5.84k | RETURN_NOT_OK(DeleteRow(DocPath(encoded_pk_doc_key_.as_slice()), data.doc_write_batch, |
1066 | 5.84k | data.read_time, data.deadline)); |
1067 | 5.84k | if (update_indexes_) { |
1068 | 209 | RETURN_NOT_OK(UpdateIndexes(existing_row, new_row)); |
1069 | 209 | } |
1070 | 5.84k | } |
1071 | 5.90k | break; |
1072 | 5.90k | } |
1073 | 4.61M | } |
1074 | | |
1075 | 4.62M | response_->set_status(QLResponsePB::YQL_STATUS_OK); |
1076 | | |
1077 | 4.62M | return Status::OK(); |
1078 | 4.61M | } |
1079 | | |
1080 | | Status QLWriteOperation::DeleteRow(const DocPath& row_path, DocWriteBatch* doc_write_batch, |
1081 | 6.10k | const ReadHybridTime& read_ht, const CoarseTimePoint deadline) { |
1082 | 6.10k | if (request_.has_user_timestamp_usec()) { |
1083 | | // If user_timestamp is provided, we need to add a tombstone for each individual |
1084 | | // column in the schema since we don't want to analyze this on the read path. |
1085 | 78 | for (auto i = schema_->num_key_columns(); i < schema_->num_columns(); i++) {
1086 | 52 | const DocPath sub_path(row_path.encoded_doc_key(), |
1087 | 52 | PrimitiveValue(schema_->column_id(i))); |
1088 | 52 | RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(sub_path, |
1089 | 52 | read_ht, |
1090 | 52 | deadline, |
1091 | 52 | request_.query_id(), |
1092 | 52 | request_.user_timestamp_usec())); |
1093 | 52 | } |
1094 | | |
1095 | | // Delete the liveness column as well. |
1096 | 26 | const DocPath liveness_column(row_path.encoded_doc_key(), PrimitiveValue::kLivenessColumn); |
1097 | 26 | RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(liveness_column, |
1098 | 26 | read_ht, |
1099 | 26 | deadline, |
1100 | 26 | request_.query_id(), |
1101 | 26 | request_.user_timestamp_usec())); |
1102 | 6.08k | } else { |
1103 | 6.08k | RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(row_path, read_ht, deadline)); |
1104 | 6.08k | } |
1105 | | |
1106 | 6.10k | return Status::OK(); |
1107 | 6.10k | } |
1108 | | |
1109 | | namespace { |
1110 | | |
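 | | // Tri-state of a column in a QLTableRow: NULL, non-NULL, or never fetched/allocated (kMissing).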
1111 | | YB_DEFINE_ENUM(ValueState, (kNull)(kNotNull)(kMissing)); |
1112 | | |
1113 | 1.97k | ValueState GetValueState(const QLTableRow& row, const ColumnId column_id) { |
1114 | 1.97k | const auto value = row.GetValue(column_id); |
1115 | 1.97k | return !value ? ValueState::kMissing : IsNull(*value) ? ValueState::kNull : ValueState::kNotNull;
1116 | 1.97k | } |
1117 | | |
1118 | | } // namespace |
1119 | | |
1120 | | Result<bool> QLWriteOperation::IsRowDeleted(const QLTableRow& existing_row, |
1121 | 48.0k | const QLTableRow& new_row) const { |
1122 | | // Delete the whole row? |
1123 | 48.0k | if (request_.type() == QLWriteRequestPB::QL_STMT_DELETE && request_.column_values().empty()) {
1124 | 311 | return true; |
1125 | 311 | } |
1126 | | |
1127 | 47.7k | if (existing_row.IsEmpty()) { // If the row doesn't exist, don't check further. |
1128 | 34.4k | return false; |
1129 | 34.4k | } |
1130 | | |
1131 | | // For update/delete, if there is no liveness column, the row will be deleted after the DML unless |
1132 | | // a non-null column still remains. |
1133 | 13.2k | if ((request_.type() == QLWriteRequestPB::QL_STMT_UPDATE || |
1134 | 13.2k | request_.type() == QLWriteRequestPB::QL_STMT_DELETE) &&
1135 | 13.2k | !liveness_column_exists_) {
1136 | 1.25k | for (size_t idx = schema_->num_key_columns(); idx < schema_->num_columns(); idx++) {
1137 | 1.03k | if (schema_->column(idx).is_static()) { |
1138 | 2 | continue; |
1139 | 2 | } |
1140 | 1.03k | const ColumnId column_id = schema_->column_id(idx); |
1141 | 1.03k | switch (GetValueState(new_row, column_id)) { |
1142 | 582 | case ValueState::kNull: continue; |
1143 | 448 | case ValueState::kNotNull: return false; |
1144 | 2 | case ValueState::kMissing: break; |
1145 | 1.03k | } |
1146 | 2 | switch (GetValueState(existing_row, column_id)) { |
1147 | 0 | case ValueState::kNull: continue; |
1148 | 0 | case ValueState::kNotNull: return false; |
1149 | 2 | case ValueState::kMissing: break; |
1150 | 2 | } |
1151 | 2 | } |
1152 | | |
1153 | 218 | #if DCHECK_IS_ON() |
1154 | | // If new_row has NULL/kMissing for every non-pk column (i.e., the UPDATE statement only
1155 | | // sets some or all columns to NULL), then existing_row must hold a value read from docdb
1156 | | // for every non-pk column that is kMissing in new_row, so that we can decide whether the
1157 | | // row is deleted.
1158 | | |
1159 | 218 | bool skip_check = false; |
1160 | 692 | for (size_t idx = schema_->num_key_columns(); idx < schema_->num_columns(); idx++) {
1161 | 474 | const ColumnId column_id = schema_->column_id(idx); |
1162 | 474 | if (GetValueState(new_row, column_id) == ValueState::kNotNull) skip_check = true;
1163 | 474 | } |
1164 | | |
1165 | 218 | if (!skip_check) { |
1166 | 684 | for (size_t idx = schema_->num_key_columns(); idx < schema_->num_columns(); idx++) {
1167 | 468 | const ColumnId column_id = schema_->column_id(idx); |
1168 | 468 | if (GetValueState(new_row, column_id) == ValueState::kMissing) { |
1169 | 0 | DCHECK(GetValueState(existing_row, column_id) != ValueState::kMissing); |
1170 | 0 | } |
1171 | 468 | } |
1172 | 216 | } |
1173 | 218 | #endif |
1174 | | |
1175 | 218 | return true; |
1176 | 666 | } |
1177 | | |
1178 | 12.5k | return false; |
1179 | 13.2k | } |
1180 | | |
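The liveness rule above reduces to: with no liveness column, an UPDATE/DELETE leaves the row alive only if some non-key column is non-null in new_row or, where new_row is silent, in existing_row. A standalone sketch of just that decision (row layout and column ids are illustrative, not YugabyteDB types):

    #include <map>
    #include <optional>
    #include <vector>

    using Row = std::map<int, std::optional<int>>;  // column id -> value; nullopt == NULL

    bool IsRowDeletedSketch(const Row& existing_row, const Row& new_row,
                            const std::vector<int>& non_key_columns) {
      for (int col : non_key_columns) {
        auto it = new_row.find(col);
        if (it != new_row.end()) {
          if (it->second) return false;  // kNotNull in new_row: the row survives
          continue;                      // kNull: keep checking other columns
        }
        auto eit = existing_row.find(col);
        if (eit != existing_row.end() && eit->second) {
          return false;                  // survives via a value read from docdb
        }
      }
      return true;  // every non-key column ended up NULL or missing
    }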
1181 | 4.61M | MonoDelta QLWriteOperation::request_ttl() const { |
1182 | 4.61M | return request_.has_ttl() ? MonoDelta::FromMilliseconds(request_.ttl())
1183 | 4.61M | : ValueControlFields::kMaxTtl;
1184 | 4.61M | } |
1185 | | |
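The TTL defaulting above is simple but easy to misread: an explicit per-request TTL in milliseconds wins, otherwise "no expiry" is modeled as the maximum representable duration (kMaxTtl). A sketch of the same rule using std::chrono in place of MonoDelta (names are illustrative):

    #include <chrono>
    #include <cstdint>

    // "No TTL" is the maximum duration, mirroring ValueControlFields::kMaxTtl.
    constexpr std::chrono::milliseconds kMaxTtl = std::chrono::milliseconds::max();

    std::chrono::milliseconds RequestTtl(bool has_ttl, int64_t ttl_ms) {
      return has_ttl ? std::chrono::milliseconds(ttl_ms) : kMaxTtl;
    }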
1186 | | namespace { |
1187 | | |
1188 | 87.3k | QLExpressionPB* NewKeyColumn(QLWriteRequestPB* request, const IndexInfo& index, const size_t idx) { |
1189 | 87.3k | return (idx < index.hash_column_count() |
1190 | 87.3k | ? request->add_hashed_column_values()
1191 | 87.3k | : request->add_range_column_values());
1192 | 87.3k | } |
1193 | | |
1194 | | QLWriteRequestPB* NewIndexRequest( |
1195 | | const IndexInfo& index, |
1196 | | QLWriteRequestPB::QLStmtType type, |
1197 | 41.1k | vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests) { |
1198 | 41.1k | index_requests->emplace_back(&index, QLWriteRequestPB()); |
1199 | 41.1k | QLWriteRequestPB* const request = &index_requests->back().second; |
1200 | 41.1k | request->set_type(type); |
1201 | 41.1k | return request; |
1202 | 41.1k | } |
1203 | | |
1204 | | } // namespace |
1205 | | |
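NewKeyColumn above routes key column idx into either the hashed or the range list of the index write request: everything before hash_column_count belongs to the hash key, the rest to the range key. A plain-vector restatement (the struct fields mimic the repeated proto fields but are just illustrative members here):

    #include <string>
    #include <vector>

    struct IndexWriteSketch {
      std::vector<std::string> hashed_column_values;
      std::vector<std::string> range_column_values;
    };

    std::string* AddKeyColumn(IndexWriteSketch* request, size_t hash_column_count,
                              size_t idx) {
      auto& dest = idx < hash_column_count ? request->hashed_column_values
                                           : request->range_column_values;
      dest.emplace_back();
      return &dest.back();
    }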
1206 | 27.2k | Status QLWriteOperation::UpdateIndexes(const QLTableRow& existing_row, const QLTableRow& new_row) { |
1207 | | // Prepare the write requests to update the indexes. There should be at most 2 requests for each |
1208 | | // index (one insert and one delete). |
1209 | 27.2k | VLOG(2) << "Updating indexes";
1210 | 27.2k | const auto& index_ids = request_.update_index_ids(); |
1211 | 27.2k | index_requests_.reserve(index_ids.size() * 2); |
1212 | 48.0k | for (const TableId& index_id : index_ids) { |
1213 | 48.0k | const IndexInfo* index = VERIFY_RESULT(index_map_.FindIndex(index_id)); |
1214 | 0 | bool index_key_changed = false; |
1215 | 48.0k | bool index_pred_existing_row = true; |
1216 | 48.0k | bool index_pred_new_row = true; |
1217 | 48.0k | bool is_row_deleted = VERIFY_RESULT(IsRowDeleted(existing_row, new_row)); |
1218 | | |
1219 | 48.0k | if (index->where_predicate_spec()) { |
1220 | 4.38k | RETURN_NOT_OK(EvalCondition( |
1221 | 4.38k | index->where_predicate_spec()->where_expr().condition(), existing_row, |
1222 | 4.38k | &index_pred_existing_row)); |
1223 | 4.38k | } |
1224 | | |
1225 | 48.0k | if (is_row_deleted) { |
1226 | | // If it is a partial index and the predicate wasn't satisfied for the existing row
1227 | | // being deleted, there is nothing to do.
1228 | 529 | if (index->where_predicate_spec() && !index_pred_existing_row) {
1229 | 72 | VLOG(3) << "Skip index entry delete for index_id=" << index->table_id() << |
1230 | 0 | " since predicate not satisfied"; |
1231 | 72 | continue; |
1232 | 72 | } |
1233 | 457 | index_key_changed = true; |
1234 | 47.4k | } else { |
1235 | 47.4k | VERIFY_RESULT(CreateAndSetupIndexInsertRequest( |
1236 | 47.4k | this, index->HasWritePermission(), existing_row, new_row, index, |
1237 | 47.4k | &index_requests_, &index_key_changed, &index_pred_new_row, index_pred_existing_row)); |
1238 | 47.4k | } |
1239 | | |
1240 | 47.9k | bool index_pred_switched_to_false = false; |
1241 | 47.9k | if (index->where_predicate_spec() && |
1242 | 47.9k | !existing_row.IsEmpty() && index_pred_existing_row && !index_pred_new_row)
1243 | 290 | index_pred_switched_to_false = true; |
1244 | | |
1245 | | // If the index key is changed, delete the current key. |
1246 | 47.9k | if ((index_key_changed || index_pred_switched_to_false) && index->HasDeletePermission()) {
1247 | 2.60k | if (!index_pred_switched_to_false) { |
1248 | | // 1. If the predicate's satisfiability switched to false, we must delete the row,
1249 | | // even if the index key did not change.
1250 | | // 2. If the index key changed without a satisfiability switch, and the index predicate
1251 | | // was already false for the existing row, there is nothing to do.
1252 | | // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing. |
1253 | 2.31k | if (!index_pred_existing_row) { |
1254 | 506 | VLOG(3) << "Skip index entry delete of existing row for index_id=" << index->table_id() << |
1255 | 0 | " since predicate not satisfied"; |
1256 | 506 | continue; |
1257 | 506 | } |
1258 | 2.31k | } |
1259 | | |
1260 | 2.10k | QLWriteRequestPB* const index_request = |
1261 | 2.10k | NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_DELETE, &index_requests_); |
1262 | 2.10k | VLOG(3) << "Issue index entry delete of existing row for index_id=" << index->table_id() << |
1263 | 0 | " since predicate was satisfied earlier AND (isn't satisfied now (OR) the key has changed)"; |
1264 | | |
1265 | 7.87k | for (size_t idx = 0; idx < index->key_column_count(); idx++) {
1266 | 5.77k | const auto& index_column = index->column(idx); |
1267 | 5.77k | QLExpressionPB *key_column = NewKeyColumn(index_request, *index, idx); |
1268 | | |
1269 | | // For old messages, expr_case() == EXPR_NOT_SET.
1270 | | // For new messages, expr_case() == kColumnId when the indexing expression is a column-ref.
1271 | 5.77k | if (index_column.colexpr.expr_case() != QLExpressionPB::ExprCase::EXPR_NOT_SET &&
1272 | 5.77k | index_column.colexpr.expr_case() != QLExpressionPB::ExprCase::kColumnId) {
1273 | 188 | QLExprResult result; |
1274 | 188 | RETURN_NOT_OK(EvalExpr(index_column.colexpr, existing_row, result.Writer())); |
1275 | 188 | result.MoveTo(key_column->mutable_value()); |
1276 | 5.58k | } else { |
1277 | 5.58k | auto result = existing_row.GetValue(index_column.indexed_column_id); |
1278 | 5.58k | if (result) {
1279 | 5.58k | key_column->mutable_value()->CopyFrom(*result); |
1280 | 5.58k | } |
1281 | 5.58k | } |
1282 | 5.77k | } |
1283 | 2.10k | } |
1284 | 47.9k | } |
1285 | | |
1286 | 27.2k | return Status::OK(); |
1287 | 27.2k | } |
1288 | | |
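The conditions governing the index-entry delete in UpdateIndexes are spread across two nested ifs and a continue; a truth-table sketch of the combined rule, reduced to plain booleans (no YugabyteDB types), may be easier to audit:

    // The old index entry is removed when the index key changed or the partial-index
    // predicate flipped to false, the index allows deletes, and, for a pure key
    // change, the predicate actually held for the old row.
    bool ShouldDeleteOldIndexEntry(bool index_key_changed,
                                   bool index_pred_switched_to_false,
                                   bool has_delete_permission,
                                   bool index_pred_existing_row) {
      if (!(index_key_changed || index_pred_switched_to_false)) return false;
      if (!has_delete_permission) return false;
      if (!index_pred_switched_to_false && !index_pred_existing_row) return false;
      return true;
    }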
1289 | | Result<QLWriteRequestPB*> CreateAndSetupIndexInsertRequest( |
1290 | | QLExprExecutor* expr_executor, |
1291 | | bool index_has_write_permission, |
1292 | | const QLTableRow& existing_row, |
1293 | | const QLTableRow& new_row, |
1294 | | const IndexInfo* index, |
1295 | | vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
1296 | | bool* has_index_key_changed, |
1297 | | bool* index_pred_new_row, |
1298 | 52.4k | bool index_pred_existing_row) { |
1299 | 52.4k | bool index_key_changed = false; |
1300 | 52.4k | bool update_this_index = false; |
1301 | 52.4k | unordered_map<size_t, QLValuePB> values; |
1302 | | |
1303 | | // Prepare the new index key. |
1304 | 163k | for (size_t idx = 0; idx < index->key_column_count(); idx++) {
1305 | 111k | const auto& index_column = index->column(idx); |
1306 | 111k | bool column_changed = true; |
1307 | | |
1308 | | // Column_id should be used without executing "colexpr" for the following cases (we want |
1309 | | // to avoid executing colexpr as it is less efficient). |
1310 | | // - Old PROTO messages (expr_case() == NOT SET). |
1311 | | // - When indexing expression is just a column-ref (expr_case == kColumnId) |
1312 | 111k | if (index_column.colexpr.expr_case() == QLExpressionPB::ExprCase::EXPR_NOT_SET || |
1313 | 111k | index_column.colexpr.expr_case() == QLExpressionPB::ExprCase::kColumnId) { |
1314 | 107k | auto result = new_row.GetValue(index_column.indexed_column_id); |
1315 | 107k | if (!existing_row.IsEmpty()) { |
1316 | | // For each column in the index key, if there is a new value, see if the value is |
1317 | | // changed from the current value. Else, use the current value. |
1318 | 28.5k | if (result) { |
1319 | 28.4k | if (new_row.MatchColumn(index_column.indexed_column_id, existing_row)) { |
1320 | 26.5k | column_changed = false; |
1321 | 26.5k | } else { |
1322 | 1.90k | index_key_changed = true; |
1323 | 1.90k | } |
1324 | 28.4k | } else { |
1325 | | // TODO(Piyush): This else is possibly dead code. It can never happen that the new_row |
1326 | | // doesn't have some column but the existing one does because we copy the existing one |
1327 | | // into the new one before this function call. |
1328 | 90 | result = existing_row.GetValue(index_column.indexed_column_id); |
1329 | 90 | } |
1330 | 28.5k | } |
1331 | 107k | if (result) { |
1332 | 104k | values[idx] = std::move(*result); |
1333 | 104k | } |
1334 | 107k | } else { |
1335 | 3.84k | QLExprResult result; |
1336 | 3.84k | if (existing_row.IsEmpty()) { |
1337 | 3.57k | RETURN_NOT_OK(expr_executor->EvalExpr(index_column.colexpr, new_row, result.Writer())); |
1338 | 3.57k | } else { |
1339 | | // For each column in the index key, if a new value is specified, evaluate the
1340 | | // expression over the new row. Otherwise, use the current value.
1341 | 335 | if (new_row.IsColumnSpecified(index_column.indexed_column_id)) {
1342 | 335 | RETURN_NOT_OK(expr_executor->EvalExpr(index_column.colexpr, new_row, result.Writer())); |
1343 | | // Compare the new and existing results of the expression; if they are equal, the
1344 | | // key has in fact NOT changed even though the column value has.
1345 | 335 | QLExprResult existing_result; |
1346 | 335 | RETURN_NOT_OK(expr_executor->EvalExpr( |
1347 | 335 | index_column.colexpr, existing_row, existing_result.Writer())); |
1348 | 335 | if (result.Value() == existing_result.Value()) { |
1349 | 179 | column_changed = false; |
1350 | 179 | } else { |
1351 | 156 | index_key_changed = true; |
1352 | 156 | } |
1353 | 18.4E | } else { |
1354 | | // TODO(Piyush): This else is possibly dead code. |
1355 | 18.4E | RETURN_NOT_OK(expr_executor->EvalExpr( |
1356 | 18.4E | index_column.colexpr, existing_row, result.Writer())); |
1357 | 18.4E | } |
1358 | 268 | } |
1359 | | |
1360 | 3.84k | result.MoveTo(&values[idx]); |
1361 | 3.84k | } |
1362 | | |
1363 | 111k | if (column_changed) { |
1364 | 84.4k | update_this_index = true; |
1365 | 84.4k | } |
1366 | 111k | } |
1367 | | |
1368 | | // Prepare the covering columns. |
1369 | 67.3k | for (size_t idx = index->key_column_count(); 52.4k idx < index->columns().size(); idx++14.9k ) { |
1370 | 14.9k | const auto& index_column = index->column(idx); |
1371 | 14.9k | auto result = new_row.GetValue(index_column.indexed_column_id); |
1372 | 14.9k | bool column_changed = true; |
1373 | | |
1374 | | // If the index value is changed and there is no new covering column value set, use the |
1375 | | // current value. |
1376 | 14.9k | if (index_key_changed) { |
1377 | 2.77k | if (!result) { |
1378 | | // TODO(Piyush): This if is possibly dead code. |
1379 | 0 | result = existing_row.GetValue(index_column.indexed_column_id); |
1380 | 0 | } |
1381 | 12.1k | } else if (!FLAGS_ycql_disable_index_updating_optimization && |
1382 | 12.1k | result && new_row.MatchColumn(index_column.indexed_column_id, existing_row)12.1k ) { |
1383 | 2.19k | column_changed = false; |
1384 | 2.19k | } |
1385 | 14.9k | if (result) { |
1386 | 14.8k | values[idx] = std::move(*result); |
1387 | 14.8k | } |
1388 | | |
1389 | 14.9k | if (column_changed) { |
1390 | 12.7k | update_this_index = true; |
1391 | 12.7k | } |
1392 | 14.9k | } |
1393 | | |
1394 | 52.4k | if (has_index_key_changed) { |
1395 | 47.5k | *has_index_key_changed = index_key_changed; |
1396 | 47.5k | } |
1397 | | |
1398 | 52.4k | bool new_row_satisfies_idx_pred = true; |
1399 | 52.4k | if (index->where_predicate_spec()) { |
1400 | | // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing. |
1401 | 4.39k | RETURN_NOT_OK(expr_executor->EvalCondition( |
1402 | 4.39k | index->where_predicate_spec()->where_expr().condition(), new_row, |
1403 | 4.39k | &new_row_satisfies_idx_pred)); |
1404 | 4.39k | if (index_pred_new_row) { |
1405 | 4.24k | *index_pred_new_row = new_row_satisfies_idx_pred; |
1406 | 4.24k | } |
1407 | | |
1408 | 4.39k | if (new_row_satisfies_idx_pred && !index_pred_existing_row2.42k ) { |
1409 | | // In case the row is unchanged but the predicate switches to true (can happen if the |
1410 | | // predicate involves no indexed/covering cols). |
1411 | 1.51k | if (!update_this_index) |
1412 | 49 | VLOG(3) << "Indexed/covering cols unchanged but predicate switched to true for index_id=" << |
1413 | 49 | index->table_id(); |
1414 | 1.51k | update_this_index = true; |
1415 | 1.51k | } |
1416 | 4.39k | } |
1417 | | |
1418 | 52.4k | if (index_has_write_permission && |
1419 | 52.4k | (update_this_index || FLAGS_ycql_disable_index_updating_optimization)) {
1420 | | // If this is a partial index and the index predicate is false for the new row, skip the insert. |
1421 | 40.8k | if (index->where_predicate_spec() && !new_row_satisfies_idx_pred) {
1422 | 1.73k | VLOG(3) << "Skip index entry write for index_id=" << index->table_id() << |
1423 | 0 | " since predicate not satisfied"; |
1424 | 1.73k | return nullptr; |
1425 | 1.73k | } |
1426 | | |
1427 | 39.0k | QLWriteRequestPB* const index_request = |
1428 | 39.0k | NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_INSERT, index_requests); |
1429 | | |
1430 | | // Setup the key columns. |
1431 | 120k | for (size_t idx = 0; idx < index->key_column_count(); idx++81.5k ) { |
1432 | 81.5k | QLExpressionPB* const key_column = NewKeyColumn(index_request, *index, idx); |
1433 | 81.5k | auto it = values.find(idx); |
1434 | 81.5k | if (it != values.end()) { |
1435 | 78.4k | *key_column->mutable_value() = std::move(it->second); |
1436 | 78.4k | } |
1437 | 81.5k | } |
1438 | | |
1439 | | // Setup the covering columns. |
1440 | 47.7k | for (size_t idx = index->key_column_count(); idx < index->columns().size(); idx++) {
1441 | 8.62k | auto it = values.find(idx); |
1442 | 8.62k | if (it != values.end()) { |
1443 | 8.59k | const auto& index_column = index->column(idx); |
1444 | 8.59k | QLColumnValuePB* const covering_column = index_request->add_column_values(); |
1445 | 8.59k | covering_column->set_column_id(index_column.column_id); |
1446 | 8.59k | *covering_column->mutable_expr()->mutable_value() = std::move(it->second); |
1447 | 8.59k | } |
1448 | 8.62k | } |
1449 | | |
1450 | 39.0k | return index_request; |
1451 | 40.8k | } |
1452 | | |
1453 | 11.6k | return nullptr; // The index updating was skipped. |
1454 | 52.4k | } |
1455 | | |
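CreateAndSetupIndexInsertRequest accumulates update_this_index column by column; the net effect, ignoring the partial-index predicate paths, is "issue the index write only if some key or covering column actually changed, unless the ycql_disable_index_updating_optimization flag forces it". A deliberately reduced sketch of that rule over plain maps (not the real per-column loop, and not YugabyteDB types):

    #include <map>
    #include <string>

    // Returns true when an index write should be issued at all.
    bool NeedIndexUpdate(const std::map<std::string, std::string>& existing_values,
                         const std::map<std::string, std::string>& new_values,
                         bool ycql_disable_index_updating_optimization) {
      if (ycql_disable_index_updating_optimization) return true;  // flag forces the write
      return existing_values != new_values;  // any changed key/covering column
    }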
1456 | | Status QLReadOperation::Execute(const YQLStorageIf& ql_storage, |
1457 | | CoarseTimePoint deadline, |
1458 | | const ReadHybridTime& read_time, |
1459 | | const Schema& schema, |
1460 | | const Schema& projection, |
1461 | | QLResultSet* resultset, |
1462 | 7.51M | HybridTime* restart_read_ht) { |
1463 | 7.51M | SimulateTimeoutIfTesting(&deadline); |
1464 | 7.51M | size_t row_count_limit = std::numeric_limits<std::size_t>::max(); |
1465 | 7.51M | size_t num_rows_skipped = 0; |
1466 | 7.51M | size_t offset = 0; |
1467 | 7.51M | if (request_.has_offset()) { |
1468 | 1.69k | offset = request_.offset(); |
1469 | 1.69k | } |
1470 | 7.51M | if (request_.has_limit()) { |
1471 | 7.21M | if (request_.limit() == 0) { |
1472 | 0 | return Status::OK(); |
1473 | 0 | } |
1474 | 7.21M | row_count_limit = request_.limit(); |
1475 | 7.21M | } |
1476 | | |
1477 | | // Create the projections of the non-key columns selected by the row block plus any referenced in |
1478 | | // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this |
1479 | | // projection only to scan sub-documents. The query schema is used to select only referenced |
1480 | | // columns and key columns. |
1481 | 7.51M | Schema static_projection, non_static_projection; |
1482 | 7.51M | RETURN_NOT_OK(CreateProjections(schema, request_.column_refs(), |
1483 | 7.51M | &static_projection, &non_static_projection)); |
1484 | 7.51M | const bool read_static_columns = !static_projection.columns().empty(); |
1485 | 7.51M | const bool read_distinct_columns = request_.distinct(); |
1486 | | |
1487 | 7.51M | std::unique_ptr<YQLRowwiseIteratorIf> iter; |
1488 | 7.51M | std::unique_ptr<QLScanSpec> spec, static_row_spec; |
1489 | 7.51M | RETURN_NOT_OK(ql_storage.BuildYQLScanSpec( |
1490 | 7.51M | request_, read_time, schema, read_static_columns, static_projection, &spec, |
1491 | 7.51M | &static_row_spec)); |
1492 | 7.51M | RETURN_NOT_OK(ql_storage.GetIterator(request_, projection, schema, txn_op_context_, |
1493 | 7.51M | deadline, read_time, *spec, &iter)); |
1494 | 7.51M | VTRACE(1, "Initialized iterator"); |
1495 | | |
1496 | 7.51M | QLTableRow static_row; |
1497 | 7.51M | QLTableRow non_static_row; |
1498 | 7.51M | QLTableRow& selected_row = read_distinct_columns ? static_row : non_static_row;
1499 | | |
1500 | | // When we are continuing a select with a paging state, or when using a reverse scan,
1501 | | // the static columns for the next row to fetch are not included in the first iterator,
1502 | | // and we need to fetch them with a separate spec and iterator before beginning the normal fetch below.
1503 | 7.51M | if (static_row_spec != nullptr) { |
1504 | 3 | std::unique_ptr<YQLRowwiseIteratorIf> static_row_iter; |
1505 | 3 | RETURN_NOT_OK(ql_storage.GetIterator( |
1506 | 3 | request_, static_projection, schema, txn_op_context_, deadline, read_time, |
1507 | 3 | *static_row_spec, &static_row_iter)); |
1508 | 3 | if (VERIFY_RESULT(static_row_iter->HasNext())) { |
1509 | 3 | RETURN_NOT_OK(static_row_iter->NextRow(&static_row)); |
1510 | 3 | } |
1511 | 3 | } |
1512 | | |
1513 | | // Begin the normal fetch. |
1514 | 7.51M | int match_count = 0; |
1515 | 7.51M | bool static_dealt_with = true; |
1516 | 18.6M | while (resultset->rsrow_count() < row_count_limit && VERIFY_RESULT(iter->HasNext())) {
1517 | 11.1M | const bool last_read_static = iter->IsNextStaticColumn(); |
1518 | | |
1519 | | // Note that static columns are sorted before non-static columns in DocDB as follows. This is |
1520 | | // because "<empty_range_components>" is empty and terminated by kGroupEnd which sorts before |
1521 | | // all other ValueType characters in a non-empty range component. |
1522 | | // <hash_code><hash_components><empty_range_components><static_column_id> -> value; |
1523 | | // <hash_code><hash_components><range_components><non_static_column_id> -> value; |
1524 | 11.1M | if (last_read_static) { |
1525 | 163 | static_row.Clear(); |
1526 | 163 | RETURN_NOT_OK(iter->NextRow(static_projection, &static_row)); |
1527 | 11.1M | } else { // Reading a regular row that contains non-static columns. |
1528 | | // Read this regular row. |
1529 | | // TODO(omer): this is quite inefficient if read_distinct_columns is set. A better way
1530 | | // would be to only read the first non-static column for each hash key, and skip the rest.
1531 | 11.1M | non_static_row.Clear(); |
1532 | 11.1M | RETURN_NOT_OK(iter->NextRow(non_static_projection, &non_static_row)); |
1533 | 11.1M | } |
1534 | | |
1535 | | // There are two cases, depending on whether DISTINCT is used.
1536 | | // With DISTINCT, we generally only need to add the static rows, but we may have to add
1537 | | // a non-static row when it has no corresponding static row (one entry per hash key).
1538 | | // Without DISTINCT, we generally only add non-static rows, but if a static row has no
1539 | | // corresponding non-static row, we have to add it.
1541 | 11.1M | if (read_distinct_columns) { |
1542 | 126 | bool join_successful = false; |
1543 | 126 | if (!last_read_static) { |
1544 | 78 | join_successful = JoinNonStaticRow(schema, static_projection, non_static_row, &static_row); |
1545 | 78 | } |
1546 | | |
1547 | | // If the join was not successful, it means that the non-static row we read has no |
1548 | | // corresponding static row, so we have to add it to the result |
1549 | 126 | if (!join_successful) { |
1550 | 58 | RETURN_NOT_OK(AddRowToResult( |
1551 | 58 | spec, static_row, row_count_limit, offset, resultset, &match_count, &num_rows_skipped)); |
1552 | 58 | } |
1553 | 11.1M | } else { |
1554 | 11.1M | if (last_read_static) { |
1555 | | // If the next row to be read is non-static, deal with this static row later: we do
1556 | | // not yet know whether that non-static row corresponds to this static row. If it
1557 | | // doesn't, we will have to add the static row then, so set static_dealt_with to
1558 | | // false.
1559 | 115 | if (VERIFY_RESULT(iter->HasNext()) && !iter->IsNextStaticColumn()) {
1560 | 92 | static_dealt_with = false; |
1561 | 92 | continue; |
1562 | 92 | } |
1563 | | |
1564 | 23 | AddProjection(non_static_projection, &static_row); |
1565 | 23 | RETURN_NOT_OK(AddRowToResult(spec, static_row, row_count_limit, offset, resultset, |
1566 | 23 | &match_count, &num_rows_skipped)); |
1567 | 11.1M | } else { |
1568 | | // We also have to do the join if we are not reading any static columns, as Cassandra |
1569 | | // reports nulls for static rows with no corresponding non-static row |
1570 | 11.1M | if (read_static_columns || !static_dealt_with) {
1571 | 346 | const bool join_successful = JoinStaticRow(schema, |
1572 | 346 | static_projection, |
1573 | 346 | static_row, |
1574 | 346 | &non_static_row); |
1575 | | // Add the static row if the join was not successful and it is the first time we are |
1576 | | // dealing with this static row |
1577 | 346 | if (!join_successful && !static_dealt_with) {
1578 | 0 | AddProjection(non_static_projection, &static_row); |
1579 | 0 | RETURN_NOT_OK(AddRowToResult( |
1580 | 0 | spec, static_row, row_count_limit, offset, resultset, &match_count, |
1581 | 0 | &num_rows_skipped)); |
1582 | 0 | } |
1583 | 346 | } |
1584 | 11.1M | static_dealt_with = true; |
1585 | 11.1M | RETURN_NOT_OK(AddRowToResult( |
1586 | 11.1M | spec, non_static_row, row_count_limit, offset, resultset, &match_count, |
1587 | 11.1M | &num_rows_skipped)); |
1588 | 11.1M | } |
1589 | 11.1M | } |
1590 | 11.1M | } |
1591 | | |
1592 | 7.51M | if (request_.is_aggregate() && match_count > 0) {
1593 | 163 | RETURN_NOT_OK(PopulateAggregate(selected_row, resultset)); |
1594 | 163 | } |
1595 | | |
1596 | 7.51M | VTRACE(1, "Fetched $0 rows.", resultset->rsrow_count()); |
1597 | | |
1598 | 7.51M | RETURN_NOT_OK(SetPagingStateIfNecessary( |
1599 | 7.51M | iter.get(), resultset, row_count_limit, num_rows_skipped, read_time)); |
1600 | | |
1601 | | // SetPagingStateIfNecessary could perform read, so we assign restart_read_ht after it. |
1602 | 7.51M | *restart_read_ht = iter->RestartReadHt(); |
1603 | | |
1604 | 7.51M | return Status::OK(); |
1605 | 7.51M | } |
1606 | | |
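Stripped of the static/non-static join, aggregates, and paging, Execute is a limit-bounded pull loop over a rowwise iterator: an absent limit defaults to numeric_limits<size_t>::max(), and a limit of 0 returns before any iterator is built. A skeleton under those assumptions, where HasNext/NextRow are illustrative stand-ins for the YQLRowwiseIteratorIf calls:

    #include <cstddef>
    #include <limits>
    #include <optional>
    #include <vector>

    // Iter is any type exposing bool HasNext() and a by-value NextRow().
    template <typename Iter>
    auto FetchRows(Iter& iter, std::optional<size_t> limit)
        -> std::vector<decltype(iter.NextRow())> {
      const size_t row_count_limit =
          limit.value_or(std::numeric_limits<size_t>::max());
      std::vector<decltype(iter.NextRow())> result;
      while (result.size() < row_count_limit && iter.HasNext()) {
        result.push_back(iter.NextRow());  // the real loop merges static rows first
      }
      return result;
    }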
1607 | | Status QLReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter, |
1608 | | const QLResultSet* resultset, |
1609 | | const size_t row_count_limit, |
1610 | | const size_t num_rows_skipped, |
1611 | 7.52M | const ReadHybridTime& read_time) { |
1612 | 7.52M | if ((resultset->rsrow_count() >= row_count_limit || request_.has_offset()373k ) && |
1613 | 7.52M | !request_.is_aggregate()7.13M ) { |
1614 | 7.13M | SubDocKey next_row_key; |
1615 | 7.13M | RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key)); |
1616 | | // When the "limit" number of rows are returned and we are asked to return the paging state, |
1617 | | // return the partition key and row key of the next row to read in the paging state if there are |
1618 | | // still more rows to read. Otherwise, leave the paging state empty which means we are done |
1619 | | // reading from this tablet. |
1620 | 7.13M | if (request_.return_paging_state()) { |
1621 | 2.41k | if (!next_row_key.doc_key().empty()) { |
1622 | 786 | QLPagingStatePB* paging_state = response_.mutable_paging_state(); |
1623 | 786 | paging_state->set_next_partition_key( |
1624 | 786 | PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash())); |
1625 | 786 | paging_state->set_next_row_key(next_row_key.Encode().ToStringBuffer()); |
1626 | 786 | paging_state->set_total_rows_skipped(request_.paging_state().total_rows_skipped() + |
1627 | 786 | num_rows_skipped); |
1628 | 1.62k | } else if (request_.has_offset()) { |
1629 | 1.60k | QLPagingStatePB* paging_state = response_.mutable_paging_state(); |
1630 | 1.60k | paging_state->set_total_rows_skipped(request_.paging_state().total_rows_skipped() + |
1631 | 1.60k | num_rows_skipped); |
1632 | 1.60k | } |
1633 | 2.41k | } |
1634 | 7.13M | if (response_.has_paging_state()) { |
1635 | 2.39k | if (FLAGS_ycql_consistent_transactional_paging) { |
1636 | 0 | read_time.AddToPB(response_.mutable_paging_state()); |
1637 | 2.39k | } else { |
1638 | | // Using SingleTime helps avoid read restarts on the second page and later, but may
1639 | | // produce stale results on those pages.
1640 | 2.39k | auto per_row_consistent_read_time = ReadHybridTime::SingleTime(read_time.read); |
1641 | 2.39k | per_row_consistent_read_time.AddToPB(response_.mutable_paging_state()); |
1642 | 2.39k | } |
1643 | 2.39k | } |
1644 | 7.13M | } |
1645 | | |
1646 | 7.52M | return Status::OK(); |
1647 | 7.52M | } |
1648 | | |
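The gating in SetPagingStateIfNecessary is easy to lose in the nesting; collapsing its two paging-state-setting branches into one boolean gives the following sketch (plain parameters standing in for the protobuf fields, so this is an approximation, not the exact control flow):

    #include <cstddef>

    bool ShouldAttachPagingState(size_t rows_returned, size_t row_count_limit,
                                 bool has_offset, bool is_aggregate,
                                 bool return_paging_state, bool has_next_row_key) {
      // Only reads that exhausted their limit or carry an offset are candidates.
      if (!(rows_returned >= row_count_limit || has_offset)) return false;
      if (is_aggregate) return false;          // aggregates never page
      if (!return_paging_state) return false;  // caller did not ask for it
      // A next-row key means more rows remain; an offset-only query still needs
      // total_rows_skipped carried forward.
      return has_next_row_key || has_offset;
    }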
1649 | 0 | Status QLReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) { |
1650 | 0 | std::vector<PrimitiveValue> hashed_components; |
1651 | 0 | RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( |
1652 | 0 | request_.hashed_column_values(), schema, 0, schema.num_hash_key_columns(), |
1653 | 0 | &hashed_components)); |
1654 | 0 | auto pair = out->mutable_read_pairs()->Add(); |
1655 | 0 | if (hashed_components.empty()) { |
1656 | | // Empty hashed components mean that we don't have a primary key at all, but the
1657 | | // request could still contain a hash_code used for tablet routing, so we should
1658 | | // ignore it.
1659 | 0 | pair->set_key(std::string(1, ValueTypeAsChar::kGroupEnd)); |
1660 | 0 | } else { |
1661 | 0 | DocKey doc_key(request_.hash_code(), hashed_components); |
1662 | 0 | pair->set_key(doc_key.Encode().ToStringBuffer()); |
1663 | 0 | } |
1664 | 0 | pair->set_value(std::string(1, ValueTypeAsChar::kNullLow)); |
1665 | | // Wait policies make sense only for YSQL to support different modes like waiting, erroring out |
1666 | | // or skipping on intent conflict. YCQL behaviour matches WAIT_ERROR (see proto for details). |
1667 | 0 | out->set_wait_policy(WAIT_ERROR); |
1668 | 0 | return Status::OK(); |
1669 | 0 | } |
1670 | | |
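GetIntents has one subtle case worth isolating: a request with no hashed column values (any hash_code was only tablet routing) takes an intent keyed by a lone group-end byte rather than an encoded DocKey. Reduced to strings below; the concrete byte value chosen for kGroupEnd is an assumption for illustration, not the real encoding constant:

    #include <string>
    #include <vector>

    std::string IntentKeySketch(const std::vector<std::string>& hashed_components,
                                const std::string& encoded_doc_key) {
      constexpr char kGroupEnd = '!';  // illustrative stand-in for ValueTypeAsChar::kGroupEnd
      return hashed_components.empty()
          ? std::string(1, kGroupEnd)  // intent over the whole key range
          : encoded_doc_key;           // intent on the row's encoded doc key
    }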
1671 | | Status QLReadOperation::PopulateResultSet(const std::unique_ptr<QLScanSpec>& spec, |
1672 | | const QLTableRow& table_row, |
1673 | 10.9M | QLResultSet *resultset) { |
1674 | 10.9M | resultset->AllocateRow(); |
1675 | 10.9M | int rscol_index = 0; |
1676 | 45.3M | for (const QLExpressionPB& expr : request_.selected_exprs()) { |
1677 | 45.3M | QLExprResult value; |
1678 | 45.3M | RETURN_NOT_OK(EvalExpr(expr, table_row, value.Writer(), spec->schema())); |
1679 | 45.3M | resultset->AppendColumn(rscol_index, value.Value()); |
1680 | 45.3M | rscol_index++; |
1681 | 45.3M | } |
1682 | | |
1683 | 10.9M | return Status::OK(); |
1684 | 10.9M | } |
1685 | | |
1686 | 14.3k | Status QLReadOperation::EvalAggregate(const QLTableRow& table_row) { |
1687 | 14.3k | if (aggr_result_.empty()) { |
1688 | 163 | int column_count = request_.selected_exprs().size(); |
1689 | 163 | aggr_result_.resize(column_count); |
1690 | 163 | } |
1691 | | |
1692 | 14.3k | int aggr_index = 0; |
1693 | 15.1k | for (const QLExpressionPB& expr : request_.selected_exprs()) { |
1694 | 15.1k | RETURN_NOT_OK(EvalExpr(expr, table_row, aggr_result_[aggr_index++].Writer())); |
1695 | 15.1k | } |
1696 | 14.3k | return Status::OK(); |
1697 | 14.3k | } |
1698 | | |
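EvalAggregate sizes its accumulator vector lazily on the first matching row (one slot per selected expression) and then folds each row through the expressions. A sketch of that shape with plain integer sums standing in for the QLExprResult accumulators (the real code delegates to EvalExpr, which also handles COUNT/MIN/MAX semantics):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    void EvalAggregateSketch(const std::vector<int64_t>& row_values,
                             std::vector<int64_t>* aggr_result) {
      if (aggr_result->empty()) {
        aggr_result->resize(row_values.size(), 0);  // lazily sized on the first row
      }
      for (size_t i = 0; i < row_values.size(); ++i) {
        (*aggr_result)[i] += row_values[i];  // stand-in for the per-expression fold
      }
    }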
1699 | 163 | Status QLReadOperation::PopulateAggregate(const QLTableRow& table_row, QLResultSet *resultset) { |
1700 | 163 | resultset->AllocateRow(); |
1701 | 163 | int column_count = request_.selected_exprs().size(); |
1702 | 559 | for (int rscol_index = 0; rscol_index < column_count; rscol_index++396 ) { |
1703 | 396 | resultset->AppendColumn(rscol_index, aggr_result_[rscol_index].Value()); |
1704 | 396 | } |
1705 | 163 | return Status::OK(); |
1706 | 163 | } |
1707 | | |
1708 | | Status QLReadOperation::AddRowToResult(const std::unique_ptr<QLScanSpec>& spec, |
1709 | | const QLTableRow& row, |
1710 | | const size_t row_count_limit, |
1711 | | const size_t offset, |
1712 | | QLResultSet* resultset, |
1713 | | int* match_count, |
1714 | 11.0M | size_t *num_rows_skipped) { |
1715 | 11.0M | VLOG(3) << __FUNCTION__ << " : " << yb::ToString(row)5.27k ; |
1716 | 11.0M | if (resultset->rsrow_count() < row_count_limit11.0M ) { |
1717 | 11.0M | bool match = false; |
1718 | 11.0M | RETURN_NOT_OK(spec->Match(row, &match)); |
1719 | 11.0M | if (match) { |
1720 | 10.9M | if (*num_rows_skipped >= offset) { |
1721 | 10.9M | (*match_count)++; |
1722 | 10.9M | if (request_.is_aggregate()) { |
1723 | 14.3k | RETURN_NOT_OK(EvalAggregate(row)); |
1724 | 10.8M | } else { |
1725 | 10.8M | RETURN_NOT_OK(PopulateResultSet(spec, row, resultset)); |
1726 | 10.8M | } |
1727 | 10.9M | } else { |
1728 | 86 | (*num_rows_skipped)++; |
1729 | 86 | } |
1730 | 10.9M | } |
1731 | 11.0M | } |
1732 | 11.0M | return Status::OK(); |
1733 | 11.0M | } |
1734 | | |
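The offset/limit accounting in AddRowToResult is the part most often gotten wrong in paginated scans: matching rows inside the OFFSET window only bump num_rows_skipped, while rows at or past it are counted and materialized. A standalone restatement (match is a precomputed boolean here; the real code asks spec->Match, and emit_row stands in for the EvalAggregate/PopulateResultSet dispatch):

    #include <cstddef>

    void AccountRow(bool match, size_t offset,
                    int* match_count, size_t* num_rows_skipped, bool* emit_row) {
      *emit_row = false;
      if (!match) return;                // the scan spec rejected the row
      if (*num_rows_skipped >= offset) {
        ++*match_count;
        *emit_row = true;                // aggregate or append to the result set
      } else {
        ++*num_rows_skipped;             // matched, but still inside the OFFSET window
      }
    }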
1735 | | } // namespace docdb |
1736 | | } // namespace yb |