YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/cql_operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/cql_operation.h"
15
16
#include <limits>
17
#include <memory>
18
#include <string>
19
#include <unordered_set>
20
#include <utility>
21
#include <vector>
22
23
#include "yb/bfpg/tserver_opcodes.h"
24
25
#include "yb/common/index.h"
26
#include "yb/common/index_column.h"
27
#include "yb/common/jsonb.h"
28
#include "yb/common/partition.h"
29
#include "yb/common/ql_protocol_util.h"
30
#include "yb/common/ql_resultset.h"
31
#include "yb/common/ql_rowblock.h"
32
#include "yb/common/ql_value.h"
33
34
#include "yb/docdb/doc_path.h"
35
#include "yb/docdb/doc_ql_scanspec.h"
36
#include "yb/docdb/doc_rowwise_iterator.h"
37
#include "yb/docdb/doc_write_batch.h"
38
#include "yb/docdb/docdb.pb.h"
39
#include "yb/docdb/docdb_debug.h"
40
#include "yb/docdb/docdb_rocksdb_util.h"
41
#include "yb/docdb/primitive_value_util.h"
42
#include "yb/docdb/ql_storage_interface.h"
43
44
#include "yb/util/debug-util.h"
45
#include "yb/util/flag_tags.h"
46
#include "yb/util/result.h"
47
#include "yb/util/status.h"
48
#include "yb/util/status_format.h"
49
#include "yb/util/trace.h"
50
51
#include "yb/yql/cql/ql/util/errcodes.h"
52
53
DEFINE_test_flag(bool, pause_write_apply_after_if, false,
54
                 "Pause application of QLWriteOperation after evaluating if condition.");
55
56
DEFINE_bool(ycql_consistent_transactional_paging, false,
57
            "Whether to enforce consistency of data returned for second page and beyond for YCQL "
58
            "queries on transactional tables. If true, read restart errors could be returned to "
59
            "prevent inconsistency. If false, no read restart errors are returned but the data may "
60
            "be stale. The latter is preferable for long scans. The data returned for the first "
61
            "page of results is never stale regardless of this flag.");
62
63
DEFINE_bool(ycql_disable_index_updating_optimization, false,
64
            "If true all secondary indexes must be updated even if the update does not change "
65
            "the index data.");
66
TAG_FLAG(ycql_disable_index_updating_optimization, advanced);
67
68
namespace yb {
69
namespace docdb {
70
71
using std::pair;
72
using std::unordered_map;
73
using std::unordered_set;
74
using std::vector;
75
76
namespace {
77
78
// Append dummy entries in schema to table_row
79
// TODO(omer): this should most probably be added somewhere else
80
23
void AddProjection(const Schema& schema, QLTableRow* table_row) {
81
47
  for (size_t i = 0; i < schema.num_columns(); 
i++24
) {
82
24
    const auto& column_id = schema.column_id(i);
83
24
    table_row->AllocColumn(column_id);
84
24
  }
85
23
}
86
87
// Create projection schemas of static and non-static columns from a rowblock projection schema
88
// (for read) and a WHERE / IF condition (for read / write). "schema" is the full table schema
89
// and "rowblock_schema" is the selected columns from which we are splitting into static and
90
// non-static column portions.
91
CHECKED_STATUS CreateProjections(const Schema& schema, const QLReferencedColumnsPB& column_refs,
92
7.57M
                                 Schema* static_projection, Schema* non_static_projection) {
93
  // The projection schemas are used to scan docdb.
94
7.57M
  unordered_set<ColumnId> static_columns, non_static_columns;
95
96
  // Add regular columns.
97
24.0M
  for (int32_t id : column_refs.ids()) {
98
24.0M
    const ColumnId column_id(id);
99
24.0M
    if (!schema.is_key_column(column_id)) {
100
9.05M
      non_static_columns.insert(column_id);
101
9.05M
    }
102
24.0M
  }
103
104
  // Add static columns.
105
7.57M
  for (int32_t id : column_refs.static_ids()) {
106
440
    const ColumnId column_id(id);
107
440
    static_columns.insert(column_id);
108
440
  }
109
110
7.57M
  RETURN_NOT_OK(
111
7.57M
      schema.CreateProjectionByIdsIgnoreMissing(
112
7.57M
          vector<ColumnId>(static_columns.begin(), static_columns.end()),
113
7.57M
          static_projection));
114
7.57M
  RETURN_NOT_OK(
115
7.57M
      schema.CreateProjectionByIdsIgnoreMissing(
116
7.57M
          vector<ColumnId>(non_static_columns.begin(), non_static_columns.end()),
117
7.57M
          non_static_projection));
118
119
7.57M
  return Status::OK();
120
7.57M
}
121
122
// Copies the values of schema columns [begin_idx, begin_idx + col_count) from table_row into
// consecutive columns of row, starting at position *col_idx. *col_idx is advanced by one for each
// column processed (including the one on which an error is returned).
CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& schema,
                           const size_t begin_idx, const size_t col_count,
                           QLRow* row, size_t *col_idx) {
  const size_t end_idx = begin_idx + col_count;
  for (size_t schema_idx = begin_idx; schema_idx < end_idx; ++schema_idx) {
    auto* destination = row->mutable_column(*col_idx);
    ++(*col_idx);
    RETURN_NOT_OK(table_row.GetValue(schema.column_id(schema_idx), destination));
  }
  return Status::OK();
}
130
131
// Convenience overload: copies every column of the projection from table_row into row, starting
// at position *col_idx.
CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& projection,
                           QLRow* row, size_t* col_idx) {
  const size_t first_column = 0;
  const size_t column_total = projection.num_columns();
  return PopulateRow(table_row, projection, first_column, column_total, row, col_idx);
}
135
136
// Outer join a static row with a non-static row.
137
// A join is successful if and only if for every hash key, the values in the static and the
138
// non-static row are either non-NULL and the same, or one of them is NULL. Therefore we say that
139
// a join is successful if the static row is empty, and in turn return true.
140
// Copies the entries from the static row into the non-static one.
141
bool JoinStaticRow(
142
    const Schema& schema, const Schema& static_projection, const QLTableRow& static_row,
143
346
    QLTableRow* non_static_row) {
144
  // The join is successful if the static row is empty
145
346
  if (static_row.IsEmpty()) {
146
11
    return true;
147
11
  }
148
149
  // Now we know that the static row is not empty. The non-static row cannot be empty, therefore
150
  // we know that both the static row and the non-static one have non-NULL entries for all
151
  // hash keys. Therefore if MatchColumn returns false, we know the join is unsuccessful.
152
  // TODO(neil)
153
  // - Need to assign TTL and WriteTime to their default values.
154
  // - Check if they should be compared and copied over. Most likely not needed as we don't allow
155
  //   selecting TTL and WriteTime for static columns.
156
  // - This copying function should be moved to QLTableRow class.
157
931
  
for (size_t i = 0; 335
i < schema.num_hash_key_columns();
i++596
) {
158
596
    if (!non_static_row->MatchColumn(schema.column_id(i), static_row)) {
159
0
      return false;
160
0
    }
161
596
  }
162
163
  // Join the static columns in the static row into the non-static row.
164
738
  
for (size_t i = 0; 335
i < static_projection.num_columns();
i++403
) {
165
403
    non_static_row->CopyColumn(static_projection.column_id(i), static_row);
166
403
  }
167
168
335
  return true;
169
335
}
170
171
// Join a non-static row with a static row.
172
// Returns true if the two rows match
173
bool JoinNonStaticRow(
174
    const Schema& schema, const Schema& static_projection, const QLTableRow& non_static_row,
175
78
    QLTableRow* static_row) {
176
78
  bool join_successful = true;
177
178
197
  for (size_t i = 0; i < schema.num_hash_key_columns(); 
i++119
) {
179
129
    if (!static_row->MatchColumn(schema.column_id(i), non_static_row)) {
180
10
      join_successful = false;
181
10
      break;
182
10
    }
183
129
  }
184
185
78
  if (!join_successful) {
186
10
    static_row->Clear();
187
16
    for (size_t i = 0; i < static_projection.num_columns(); 
i++6
) {
188
6
      static_row->AllocColumn(static_projection.column_id(i));
189
6
    }
190
191
20
    for (size_t i = 0; i < schema.num_hash_key_columns(); 
i++10
) {
192
10
      static_row->CopyColumn(schema.column_id(i), non_static_row);
193
10
    }
194
10
  }
195
78
  return join_successful;
196
78
}
197
198
// Resolves one step of a JSON path against a rapidjson node.
//
//   column_value     - the column write whose json_args() hold the path operands.
//   index            - which entry of json_args() to resolve.
//   document         - the node to look into (must be an array or an object).
//   memberit         - output: set to the matching member when the node is an object.
//   valueit          - output: set to the addressed element when the node is an array.
//   last_elem_object - output: true if the node was treated as an object (so memberit is the
//                      iterator that was assigned), false otherwise.
//   is_insert        - when false and the node is an object, an operand that decodes as a varint
//                      is rejected as an array index applied to an object.
//
// Returns a QLError status if the array index is out of bounds, an array index is applied to an
// object, the member does not exist, or the node is neither an array nor an object. Decoding
// errors for the array index are propagated as-is.
CHECKED_STATUS FindMemberForIndex(const QLColumnValuePB& column_value,
                                  int index,
                                  rapidjson::Value* document,
                                  rapidjson::Value::MemberIterator* memberit,
                                  rapidjson::Value::ValueIterator* valueit,
                                  bool* last_elem_object,
                                  bool is_insert) {
  *last_elem_object = false;

  if (document->IsArray()) {
    util::VarInt varint;
    RETURN_NOT_OK(varint.DecodeFromComparable(
        column_value.json_args(index).operand().value().varint_value()));
    const int64_t array_index = VERIFY_RESULT(varint.ToInt64());

    // Check the sign first so the comparison against the (unsigned) size is only made for
    // non-negative indexes.
    if (array_index < 0 || array_index >= document->GetArray().Size()) {
      return STATUS_SUBSTITUTE(QLError, "Array index out of bounds: $0", array_index);
    }
    *valueit = document->Begin();
    std::advance(*valueit, array_index);
  } else if (document->IsObject()) {
    if (!is_insert) {
      // If the operand decodes as an integer, the path is trying to index into an object.
      util::VarInt varint;
      auto status =
        varint.DecodeFromComparable(column_value.json_args(index).operand().value().varint_value());
      if (status.ok()) {
        const int64_t array_index = VERIFY_RESULT(varint.ToInt64());
        return STATUS_SUBSTITUTE(QLError, "Cannot use array index $0 to access object",
            array_index);
      }
    }

    *last_elem_object = true;

    const char* member = column_value.json_args(index).operand().value().string_value().c_str();
    *memberit = document->FindMember(member);
    if (*memberit == document->MemberEnd()) {
      return STATUS_SUBSTITUTE(QLError, "Could not find member: $0", member);
    }
  } else {
    return STATUS_SUBSTITUTE(QLError, "JSON field is invalid: $0",
        column_value.ShortDebugString());
  }
  return Status::OK();
}
243
244
78
// Returns InvalidArgument if a user timestamp was supplied (i.e. it differs from
// ValueControlFields::kInvalidUserTimestamp), OK otherwise.
CHECKED_STATUS CheckUserTimestampForCollections(const UserTimeMicros user_timestamp) {
  const bool timestamp_supplied = user_timestamp != ValueControlFields::kInvalidUserTimestamp;
  if (!timestamp_supplied) {
    return Status::OK();
  }
  return STATUS(InvalidArgument, "User supplied timestamp is only allowed for "
      "replacing the whole collection");
}
251
252
} // namespace
253
254
// Stores the request, table schema, index map, optional unique-index key schema and transaction
// context. No other work is done here; Init() sets up the remaining state.
QLWriteOperation::QLWriteOperation(std::reference_wrapper<const QLWriteRequestPB> request,
                                   std::shared_ptr<const Schema> schema,
                                   std::reference_wrapper<const IndexMap> index_map,
                                   const Schema* unique_index_key_schema,
                                   const TransactionOperationContext& txn_op_context)
    : DocOperationBase(request),
      schema_(std::move(schema)),  // Taken by value, so the shared_ptr is moved rather than copied.
      index_map_(index_map),
      unique_index_key_schema_(unique_index_key_schema),
      txn_op_context_(txn_op_context) {
}

// Defined out of line in this translation unit.
QLWriteOperation::~QLWriteOperation() = default;
267
268
4.64M
// Prepares the operation for execution: remembers the response object, derives the
// insert_into_unique_index_ / require_read_ / update_indexes_ flags from the request, and builds
// the hashed and/or primary doc keys via InitializeKeys().
// Returns an error if a written column id is not found in the schema or key construction fails.
Status QLWriteOperation::Init(QLResponsePB* response) {
  response_ = response;

  const bool is_insert = request_.type() == QLWriteRequestPB::QL_STMT_INSERT;
  insert_into_unique_index_ = is_insert && unique_index_key_schema_ != nullptr;
  require_read_ =
      RequireRead(request_, *schema_) || insert_into_unique_index_ || !index_map_.empty();
  update_indexes_ = !request_.update_index_ids().empty();

  // Work out whether static and/or non-static columns are being written.
  bool write_static_columns = false;
  bool write_non_static_columns = false;
  // TODO(Amit): Remove the DVLOGS after backfill features stabilize.
  DVLOG(4) << "Processing request " << yb::ToString(request_);
  for (const auto& column : request_.column_values()) {
    DVLOG(4) << "Looking at column : " << yb::ToString(column);
    auto schema_column = schema_->column_by_id(ColumnId(column.column_id()));
    DVLOG(4) << "schema column : " << yb::ToString(schema_column);
    RETURN_NOT_OK(schema_column);

    const bool column_is_static = schema_column->is_static();
    (column_is_static ? write_static_columns : write_non_static_columns) = true;

    // Both kinds have been seen, nothing more to learn from the remaining columns.
    if (write_static_columns && write_non_static_columns) {
      break;
    }
  }

  const bool is_range_operation = IsRangeOperation(request_, *schema_);

  // We need the hashed key if writing to the static columns, and need primary key if writing to
  // non-static columns or writing the full primary key (i.e. range columns are present or table
  // does not have range columns).
  const bool need_hashed_key = write_static_columns || is_range_operation;
  const bool need_primary_key = write_non_static_columns ||
                                !request_.range_column_values().empty() ||
                                schema_->num_range_key_columns() == 0;
  return InitializeKeys(need_hashed_key, need_primary_key);
}
306
307
4.67M
// Builds hashed_doc_key_ and/or pk_doc_key_ (and their encoded forms) from the key column values
// in the request. A key that has already been built is left untouched.
//
//   hashed_key  - true if the hashed doc key is needed.
//   primary_key - true if the primary doc key is needed.
//
// Returns an error if the key column values cannot be converted to primitive values.
Status QLWriteOperation::InitializeKeys(const bool hashed_key, const bool primary_key) {
  // Populate the hashed and range components in the same order as they are in the table schema.
  const auto num_hash_columns = schema_->num_hash_key_columns();

  std::vector<PrimitiveValue> hashed_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      request_.hashed_column_values(), *schema_, 0, num_hash_columns, &hashed_components));

  std::vector<PrimitiveValue> range_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      request_.range_column_values(), *schema_, num_hash_columns,
      schema_->num_range_key_columns(), &range_components));

  // True if pk_doc_key_ should be constructed by this call.
  const bool build_pk = primary_key && !pk_doc_key_;
  // True if hashed_doc_key_ should be constructed by this call.
  const bool build_hashed = hashed_key && !hashed_doc_key_;

  // We need the hash key if writing to the static columns.
  if (build_hashed) {
    if (build_pk) {
      // The primary key below still needs the hashed components, so they are copied here.
      hashed_doc_key_.emplace(request_.hash_code(), hashed_components);
    } else {
      hashed_doc_key_.emplace(request_.hash_code(), std::move(hashed_components));
    }
    encoded_hashed_doc_key_ = hashed_doc_key_->EncodeAsRefCntPrefix();
  }

  // We need the primary key if writing to non-static columns or writing the full primary key
  // (i.e. range columns are present).
  if (build_pk) {
    const bool has_hash_part =
        request_.has_hash_code() && !request_.hashed_column_values().empty();
    if (has_hash_part) {
      pk_doc_key_.emplace(
          request_.hash_code(), std::move(hashed_components), std::move(range_components));
    } else {
      // In case of syscatalog tables, we don't have any hash components.
      pk_doc_key_.emplace(std::move(range_components));
    }
    encoded_pk_doc_key_ = pk_doc_key_->EncodeAsRefCntPrefix();
  }

  return Status::OK();
}
348
349
// Appends the doc paths touched by this write to *paths and reports the isolation level at which
// they should be handled in *level.
//
// Whole doc keys (hashed and/or primary, whichever exist) are reported when mode is kLock, when
// the request writes no column values, or when the table has indexes. Otherwise one path per
// written column is reported: the owning doc key followed by the encoded column id.
//
// Returns an error if a written column id is not found in the schema.
Status QLWriteOperation::GetDocPaths(
    GetDocPathsMode mode, DocPathsToLock *paths, IsolationLevel *level) const {
  const bool use_whole_doc_keys = mode == GetDocPathsMode::kLock ||
                                  request_.column_values().empty() ||
                                  !index_map_.empty();
  if (!use_whole_doc_keys) {
    KeyBytes column_suffix;  // Reused across iterations; cleared before each use.
    for (const auto& column_value : request_.column_values()) {
      const ColumnId column_id(column_value.column_id());
      const ColumnSchema& column = VERIFY_RESULT(schema_->column_by_id(column_id));

      // Static columns live under the hashed doc key, all others under the primary doc key.
      const Slice doc_key = column.is_static() ? encoded_hashed_doc_key_.as_slice()
                                               : encoded_pk_doc_key_.as_slice();
      column_suffix.Clear();
      column_suffix.AppendValueType(ValueType::kColumnId);
      column_suffix.AppendColumnId(column_id);

      // path = doc_key bytes immediately followed by the column suffix bytes.
      const size_t doc_key_size = doc_key.size();
      RefCntBuffer path(doc_key_size + column_suffix.size());
      memcpy(path.data(), doc_key.data(), doc_key_size);
      column_suffix.AsSlice().CopyTo(path.data() + doc_key_size);
      paths->push_back(RefCntPrefix(path));
    }
  } else {
    if (encoded_hashed_doc_key_) {
      paths->push_back(encoded_hashed_doc_key_);
    }
    if (encoded_pk_doc_key_) {
      paths->push_back(encoded_pk_doc_key_);
    }
  }

  // When this write operation requires a read, it requires a read snapshot so paths will be locked
  // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable
  // isolation so that they will serialize but do not conflict with one another.
  //
  // Currently, only keys that are being written are locked, no lock is taken on read at the
  // snapshot isolation level.
  if (require_read_) {
    *level = IsolationLevel::SNAPSHOT_ISOLATION;
  } else {
    *level = IsolationLevel::SERIALIZABLE_ISOLATION;
  }

  return Status::OK();
}
387
388
// Reads the columns referenced by the request (request_.column_refs()) for the row addressed by
// this operation into table_row.
//
//   data                        - apply context: supplies the docdb handle, deadline and read
//                                 time, and receives the restart read hybrid time.
//   param_static_projection     - optional output for the static projection; may be nullptr.
//   param_non_static_projection - optional output for the non-static projection; may be nullptr.
//   table_row                   - output row. It is cleared when the primary-key scan finds no
//                                 row.
//
// Returns the first error from projection creation, key initialization or iteration.
Status QLWriteOperation::ReadColumns(const DocOperationApplyData& data,
                                     Schema *param_static_projection,
                                     Schema *param_non_static_projection,
                                     QLTableRow* table_row) {
  // Fall back to local schemas when the caller is not interested in the projections.
  Schema local_static_projection;
  Schema local_non_static_projection;
  Schema* const static_projection =
      param_static_projection != nullptr ? param_static_projection : &local_static_projection;
  Schema* const non_static_projection =
      param_non_static_projection != nullptr ? param_non_static_projection
                                             : &local_non_static_projection;

  // Create projections to scan docdb.
  RETURN_NOT_OK(CreateProjections(
      *schema_, request_.column_refs(), static_projection, non_static_projection));

  // Generate hashed / primary key depending on if static / non-static columns are referenced in
  // the if-condition.
  const bool reads_static = !static_projection->columns().empty();
  const bool reads_non_static = !non_static_projection->columns().empty();
  RETURN_NOT_OK(InitializeKeys(reads_static, reads_non_static));

  // Scan docdb for the static columns of the row using the hashed key.
  if (hashed_doc_key_) {
    DocQLScanSpec static_spec(*static_projection, *hashed_doc_key_, request_.query_id());
    DocRowwiseIterator static_iter(*static_projection, *schema_, txn_op_context_,
                                   data.doc_write_batch->doc_db(),
                                   data.deadline, data.read_time);
    RETURN_NOT_OK(static_iter.Init(static_spec));
    const bool found_static_row = VERIFY_RESULT(static_iter.HasNext());
    if (found_static_row) {
      RETURN_NOT_OK(static_iter.NextRow(table_row));
    }
    data.restart_read_ht->MakeAtLeast(static_iter.RestartReadHt());
  }

  // Scan docdb for the non-static columns of the row using the primary key.
  if (pk_doc_key_) {
    DocQLScanSpec row_spec(*non_static_projection, *pk_doc_key_, request_.query_id());
    DocRowwiseIterator row_iter(*non_static_projection, *schema_, txn_op_context_,
                                data.doc_write_batch->doc_db(),
                                data.deadline, data.read_time);
    RETURN_NOT_OK(row_iter.Init(row_spec));
    const bool found_row = VERIFY_RESULT(row_iter.HasNext());
    if (!found_row) {
      // If no non-static column is found, the row does not exist and we should clear the static
      // columns in the map to indicate the row does not exist.
      table_row->Clear();
    } else {
      RETURN_NOT_OK(row_iter.NextRow(table_row));
      // If there are indexes to update, check if liveness column exists for update/delete because
      // that will affect whether the row will still exist after the DML and whether we need to
      // remove the key from the indexes.
      const bool is_update_or_delete = request_.type() == QLWriteRequestPB::QL_STMT_UPDATE ||
                                       request_.type() == QLWriteRequestPB::QL_STMT_DELETE;
      if (update_indexes_ && is_update_or_delete) {
        liveness_column_exists_ = row_iter.LivenessColumnExists();
      }
    }
    data.restart_read_ht->MakeAtLeast(row_iter.RestartReadHt());
  }

  return Status::OK();
}
450
451
// Builds the single-row result of a conditional DML.
//
// The first column, "[applied]", carries should_apply. If the condition was not satisfied and
// the row exists (table_row is not empty), the row additionally carries the key columns (the full
// primary key when pk_doc_key_ is set, otherwise only the hash key columns) followed by the
// static and then the non-static projection columns, with values taken from table_row.
//
// Returns an error if a value cannot be read from table_row.
Status QLWriteOperation::PopulateConditionalDmlRow(const DocOperationApplyData& data,
                                                   const bool should_apply,
                                                   const QLTableRow& table_row,
                                                   Schema static_projection,
                                                   Schema non_static_projection,
                                                   std::unique_ptr<QLRowBlock>* rowblock) {
  const bool return_present_values = !should_apply && !table_row.IsEmpty();
  const size_t num_key_columns =
      pk_doc_key_ ? schema_->num_key_columns() : schema_->num_hash_key_columns();

  // Describe the result row.
  std::vector<ColumnSchema> columns;
  columns.emplace_back(ColumnSchema("[applied]", BOOL));
  if (return_present_values) {
    const auto& table_columns = schema_->columns();
    columns.insert(columns.end(), table_columns.begin(), table_columns.begin() + num_key_columns);

    const auto& static_columns = static_projection.columns();
    columns.insert(columns.end(), static_columns.begin(), static_columns.end());

    const auto& non_static_columns = non_static_projection.columns();
    columns.insert(columns.end(), non_static_columns.begin(), non_static_columns.end());
  }

  // Create the row block with its one row and record the "applied" status.
  rowblock->reset(new QLRowBlock(Schema(columns, 0)));
  QLRow& row = (*rowblock)->Extend();
  row.mutable_column(0)->set_bool_value(should_apply);
  if (!return_present_values) {
    return Status::OK();
  }

  // Fill in the present values after the "[applied]" column.
  size_t col_idx = 1;
  RETURN_NOT_OK(PopulateRow(table_row, *schema_, 0, num_key_columns, &row, &col_idx));
  RETURN_NOT_OK(PopulateRow(table_row, static_projection, &row, &col_idx));
  RETURN_NOT_OK(PopulateRow(table_row, non_static_projection, &row, &col_idx));

  return Status::OK();
}
485
486
// Builds a single-row status result whose columns are "[applied]", "[message]" and then every
// column of the table schema. "[applied]" is set to should_apply; "[message]" is left unset here
// (there is no message unless there is an error, in which case the executor sets it).
// When the write was not applied, the existing values found in table_row are copied into the
// corresponding schema columns, as for a regular IF clause.
Status QLWriteOperation::PopulateStatusRow(const DocOperationApplyData& data,
                                           const bool should_apply,
                                           const QLTableRow& table_row,
                                           std::unique_ptr<QLRowBlock>* rowblock) {
  // Schema columns start after the two status columns.
  const size_t kStatusColumnCount = 2;

  std::vector<ColumnSchema> columns;
  columns.emplace_back(ColumnSchema("[applied]", BOOL));
  columns.emplace_back(ColumnSchema("[message]", STRING));
  const auto& table_columns = schema_->columns();
  columns.insert(columns.end(), table_columns.begin(), table_columns.end());

  rowblock->reset(new QLRowBlock(Schema(columns, 0)));
  QLRow& row = (*rowblock)->Extend();
  row.mutable_column(0)->set_bool_value(should_apply);

  if (should_apply) {
    return Status::OK();
  }

  // Not applied: report the existing row values.
  for (size_t schema_idx = 0; schema_idx < schema_->num_columns(); ++schema_idx) {
    boost::optional<const QLValuePB&> col_val = table_row.GetValue(schema_->column_id(schema_idx));
    if (!col_val.is_initialized()) {
      continue;
    }
    *(row.mutable_column(schema_idx + kStatusColumnCount)) = *col_val;
  }

  return Status::OK();
}
512
513
// Check if a duplicate value is inserted into a unique index.
514
2.31k
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) {
515
2.31k
  VLOG(3) << "Looking for collisions in\n"
516
3
          << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
517
  // We need to check backwards only for backfilled entries.
518
2.31k
  bool ret =
519
2.31k
      VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) ||
520
2.31k
      
(1.85k
request_.is_backfill()1.85k
&&
521
1.85k
       VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward)));
522
2.31k
  if (!ret) {
523
18.4E
    VLOG(3) << "No collisions found";
524
1.85k
  }
525
2.31k
  return ret;
526
2.31k
}
527
528
// Direction-specific duplicate check for a unique index insert.
//
// Forward: checks for a duplicate at the requested read time (data.read_time).
// Otherwise: looks up the oldest timestamps returned by FindOldestOverwrittenTimestamp() for the
// index row and for its liveness column, relative to the requested read time, takes the earlier
// of the two and, if that time is valid, checks for a duplicate at that single time. If it is not
// valid, no duplicate is reported.
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(
    const DocOperationApplyData& data, Direction direction) {
  VLOG(2) << "Looking for collision while going " << yb::ToString(direction)
          << ". Trying to insert " << *pk_doc_key_;
  const auto requested_read_time = data.read_time;
  if (direction == Direction::kForward) {
    return HasDuplicateUniqueIndexValue(data, requested_read_time);
  }

  const auto encoded_pk = pk_doc_key_->Encode();
  auto intent_iter = CreateIntentAwareIterator(
      data.doc_write_batch->doc_db(),
      BloomFilterMode::USE_BLOOM_FILTER,
      encoded_pk.AsSlice(),
      request_.query_id(),
      txn_op_context_,
      data.deadline,
      ReadHybridTime::Max());

  // Oldest timestamp found for the row itself ...
  HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
      intent_iter.get(), SubDocKey(*pk_doc_key_), requested_read_time.read));
  // ... and for its liveness column.
  const HybridTime oldest_liveness_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
      intent_iter.get(),
      SubDocKey(*pk_doc_key_, PrimitiveValue::kLivenessColumn),
      requested_read_time.read));
  oldest_past_min_ht.MakeAtMost(oldest_liveness_ht);

  if (oldest_past_min_ht.is_valid()) {
    return HasDuplicateUniqueIndexValue(
        data, ReadHybridTime::SingleTime(oldest_past_min_ht));
  }
  return false;
}
560
561
// Checks, as of read_time, whether the index key being inserted already exists with a different
// index value.
//
// The row stored under pk_doc_key_ is read using the unique index key schema. It is a duplicate
// if that row exists and, for any column of the request that belongs to the unique index key
// schema, the stored value is present and differs from the value being written.
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(
    const DocOperationApplyData& data, ReadHybridTime read_time) {
  // Set up the iterator to read the current primary key associated with the index key.
  DocQLScanSpec spec(*unique_index_key_schema_, *pk_doc_key_, request_.query_id(), true);
  DocRowwiseIterator iterator(
      *unique_index_key_schema_,
      *schema_,
      txn_op_context_,
      data.doc_write_batch->doc_db(),
      data.deadline,
      read_time);
  RETURN_NOT_OK(iterator.Init(spec));

  // It is a duplicate value if the index key exists already and the index value (corresponding to
  // the indexed table's primary key) is not the same.
  const bool index_key_exists = VERIFY_RESULT(iterator.HasNext());
  if (!index_key_exists) {
    VLOG(2) << "No collision found while checking at " << yb::ToString(read_time);
    return false;
  }

  QLTableRow table_row;
  RETURN_NOT_OK(iterator.NextRow(&table_row));

  const auto& index_key_ids = unique_index_key_schema_->column_ids();
  const std::unordered_set<ColumnId> key_column_ids(index_key_ids.begin(), index_key_ids.end());

  for (const auto& column_value : request_.column_values()) {
    const ColumnId column_id(column_value.column_id());
    if (key_column_ids.find(column_id) == key_column_ids.end()) {
      continue;  // Not one of the unique index key schema columns.
    }

    boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id);
    const QLValuePB& new_value = column_value.expr().value();
    const bool differs = existing_value && *existing_value != new_value;
    if (!differs) {
      continue;
    }

    VLOG(2) << "Found collision while checking at " << yb::ToString(read_time)
            << "\nExisting: " << yb::ToString(*existing_value)
            << " vs New: " << yb::ToString(new_value)
            << "\nUsed read time as " << yb::ToString(data.read_time);
    DVLOG(3) << "DocDB is now:\n"
             << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
    return true;
  }

  VLOG(2) << "No collision while checking at " << yb::ToString(read_time);
  return false;
}
604
605
// Seeks iter to pk_doc_key_ and, if the iterator is valid afterwards, returns what
// iter->FindOldestRecord() reports for sub_doc_key (encoded without hybrid time) and
// min_read_time. If the iterator is not valid after the seek, a default-constructed HybridTime is
// returned. Errors from FindOldestRecord are propagated.
Result<HybridTime> QLWriteOperation::FindOldestOverwrittenTimestamp(
    IntentAwareIterator* iter,
    const SubDocKey& sub_doc_key,
    HybridTime min_read_time) {
  HybridTime oldest;
  VLOG(3) << "Doing iter->Seek " << *pk_doc_key_;
  iter->Seek(*pk_doc_key_);
  if (!iter->valid()) {
    VLOG(3) << "iter->Seek " << *pk_doc_key_ << " turned out to be invalid";
    return oldest;
  }

  // The encoded key must outlive the slice that views it.
  const KeyBytes encoded_sub_key = sub_doc_key.EncodeWithoutHt();
  const Slice sub_key_slice = encoded_sub_key.AsSlice();
  oldest = VERIFY_RESULT(iter->FindOldestRecord(sub_key_slice, min_read_time));
  VLOG(2) << "iter->FindOldestRecord returned " << oldest << " for "
          << SubDocKey::DebugSliceToString(sub_key_slice);
  return oldest;
}
624
625
// Applies every JSON-operator assignment in the request that targets the jsonb column `col_id`
// as one read-modify-write: the current document is read once, each assignment is applied to it
// in request order, and the merged document is written back with a single InsertSubDocument.
//
// column         - schema of the jsonb column; only its sorting type is used here.
// col_id         - id of the jsonb column being modified.
// col_map        - maps a column id to the indexes into request_.column_values() that carry
//                  JSON operators for that column.
// data           - supplies the write batch, read time and deadline for the final write.
// sub_path       - DocDB path the merged document is written to.
// ttl,
// user_timestamp - forwarded unchanged to InsertSubDocument.
// existing_row   - row the current value is read from; it is also updated with the merged
//                  document so that the caller observes the accumulated result.
// is_insert      - forwarded to FindMemberForIndex. When true, any path lookup failure is
//                  returned to the caller. When false, a lookup that fails on the last path
//                  element of an object adds that element as a new member instead.
//
// Returns the first non-OK status produced by reading, parsing, path lookup, serialization or
// the final write; Status::OK() otherwise.
Status QLWriteOperation::ApplyForJsonOperators(
    const ColumnSchema& column,
    const ColumnIdRep col_id,
    const std::unordered_map<ColumnIdRep, vector<int>>& col_map,
    const DocOperationApplyData& data,
    const DocPath& sub_path,
    const MonoDelta& ttl,
    const UserTimeMicros& user_timestamp,
    QLTableRow* existing_row,
    bool is_insert) {
  using common::Jsonb;

  // Never dereference the result of find() without checking it first.
  const auto indexes_it = col_map.find(col_id);
  if (indexes_it == col_map.end()) {
    return STATUS_FORMAT(InvalidArgument, "no JSON operations found for column id $0", col_id);
  }

  rapidjson::Document document;
  bool read_needed = true;
  for (int idx : indexes_it->second) {
    const auto& column_value = request_.column_values(idx);
    if (column_value.column_id() != col_id) continue;
    if (read_needed) {
      // Read the json column value in order to perform a read modify write.
      QLExprResult temp;
      RETURN_NOT_OK(existing_row->ReadColumn(col_id, temp.Writer()));
      const auto& ql_value = temp.Value();
      if (!IsNull(ql_value)) {
        // ql_value is const, so the serialized bytes are copied into the Jsonb wrapper.
        Jsonb jsonb(ql_value.jsonb_value());
        RETURN_NOT_OK(jsonb.ToRapidJson(&document));
      } else {
        if (!is_insert && column_value.json_args_size() > 1) {
          return STATUS_SUBSTITUTE(QLError, "JSON path depth should be 1 for upsert: $0",
            column_value.ShortDebugString());
        }
        // No value yet: start from an empty JSON object and record it in the row as well.
        common::Jsonb empty_jsonb;
        RETURN_NOT_OK(empty_jsonb.FromString("{}"));
        QLTableColumn& row_column = existing_row->AllocColumn(column_value.column_id());
        row_column.value.set_jsonb_value(empty_jsonb.MoveSerializedJsonb());

        Jsonb jsonb(row_column.value.jsonb_value());
        RETURN_NOT_OK(jsonb.ToRapidJson(&document));
      }
    }
    read_needed = false;

    // Deserialize the rhs. It shares the allocator of `document` so that its nodes can be
    // spliced into `document` below.
    Jsonb rhs(column_value.expr().value().jsonb_value());
    rapidjson::Document rhs_doc(&document.GetAllocator());
    RETURN_NOT_OK(rhs.ToRapidJson(&rhs_doc));

    // Update the json value.
    rapidjson::Value::MemberIterator memberit;
    rapidjson::Value::ValueIterator valueit;
    // Initialized so that the check after a failed lookup never reads an indeterminate value.
    bool last_elem_object = false;
    rapidjson::Value* node = &document;

    // Walk the path one element at a time; `i` ends up one past the last element visited.
    int i = 0;
    auto status = FindMemberForIndex(column_value, i, node, &memberit, &valueit,
        &last_elem_object, is_insert);
    for (i = 1; i < column_value.json_args_size() && status.ok(); i++) {
      node = (last_elem_object) ? &(memberit->value) : &(*valueit);
      status = FindMemberForIndex(column_value, i, node, &memberit, &valueit,
          &last_elem_object, is_insert);
    }

    bool update_missing = false;
    if (is_insert) {
      RETURN_NOT_OK(status);
    } else {
      update_missing = !status.ok();
    }

    if (update_missing) {
      // NOTE: lhs path cannot exceed by more than one hop
      if (last_elem_object && i == column_value.json_args_size()) {
        const auto& val = column_value.json_args(i - 1).operand().value().string_value();
        rapidjson::Value v(
            val.c_str(), narrow_cast<rapidjson::SizeType>(val.size()), document.GetAllocator());
        node->AddMember(v, rhs_doc, document.GetAllocator());
      } else {
        RETURN_NOT_OK(status);
      }
    } else if (last_elem_object) {
      memberit->value = rhs_doc.Move();
    } else {
      *valueit = rhs_doc.Move();
    }
  } // end of column processing

  // Now write the new json value back.
  Jsonb jsonb_result;
  RETURN_NOT_OK(jsonb_result.FromRapidJson(document));
  // Update the current row as well so that we can accumulate the result of multiple json
  // operations and write the final value.
  QLValue qlv;
  *(qlv.mutable_jsonb_value()) = jsonb_result.MoveSerializedJsonb();

  existing_row->AllocColumn(col_id).value = qlv.value();

  ValueRef value_ref(qlv.value(), column.sorting_type(), yb::bfql::TSOpcode::kScalarInsert);
  RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
      sub_path, value_ref, data.read_time, data.deadline, request_.query_id(), ttl,
      user_timestamp));

  return Status::OK();
}
725
726
// Writes one subscripted assignment (`map['key'] = v` or `list[index] = v`) to the write batch.
//
// The right-hand side is evaluated against `existing_row`. For a MAP column the subscript is
// appended to `*sub_path` as a sub-key and the value is inserted there; for a LIST column the
// element at the given CQL index is replaced. Any other column type is only logged as an error
// and the function still returns OK.
//
// Returns the first non-OK status from expression evaluation, the user-timestamp check or the
// write-batch call.
Status QLWriteOperation::ApplyForSubscriptArgs(const QLColumnValuePB& column_value,
                                               const QLTableRow& existing_row,
                                               const DocOperationApplyData& data,
                                               const MonoDelta& ttl,
                                               const UserTimeMicros& user_timestamp,
                                               const ColumnSchema& column,
                                               DocPath* sub_path) {
  QLExprResult evaluated;
  RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, evaluated.Writer()));
  ValueRef new_value(
      evaluated.Value(), column.sorting_type(), GetTSWriteInstruction(column_value.expr()));
  RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));

  // Setting the value for a sub-column
  // Currently we only support two cases here: `map['key'] = v` and `list[index] = v`)
  // Any other case should be rejected by the semantic analyser before getting here
  // Later when we support frozen or nested collections this code may need refactoring
  DCHECK_EQ(column_value.subscript_args().size(), 1);
  DCHECK(column_value.subscript_args(0).has_value()) << "An index must be a constant";

  const auto collection_type = column.type()->main();
  if (collection_type == MAP) {
    // The map key becomes the last component of the document path.
    const PrimitiveValue& map_key = PrimitiveValue::FromQLValuePB(
        column_value.subscript_args(0).value(),
        SortingType::kNotSpecified);
    sub_path->AddSubKey(map_key);
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
        *sub_path, new_value, data.read_time, data.deadline,
        request_.query_id(), ttl, user_timestamp));
  } else if (collection_type == LIST) {
    // Use the table-level default TTL when one is configured, otherwise no expiry.
    MonoDelta default_ttl = MonoDelta::kMax;
    if (schema_->table_properties().HasDefaultTimeToLive()) {
      default_ttl = MonoDelta::FromMilliseconds(schema_->table_properties().DefaultTimeToLive());
    }

    int target_cql_index = column_value.subscript_args(0).value().int32_value();
    RETURN_NOT_OK(data.doc_write_batch->ReplaceCqlInList(
        *sub_path, target_cql_index, new_value, data.read_time, data.deadline,
        request_.query_id(), default_ttl, ttl));
  } else {
    LOG(ERROR) << "Unexpected type for setting subcolumn: "
               << column.type()->ToString();
  }
  return Status::OK();
}
774
775
// Writes a plain (non-JSON-operator, non-subscripted) column assignment to the write batch.
//
// The expression is evaluated against `existing_row` and dispatched on the write instruction
// derived from it: scalar/JSON inserts overwrite the sub-document, map/set operations extend
// it, and list prepend/append extend the list. When indexes are being updated, the evaluated
// value is also stored in `new_row` under `column_id`.
//
// Returns the first non-OK status from evaluation, the user-timestamp check or the write batch.
// An unsupported write instruction is fatal.
Status QLWriteOperation::ApplyForRegularColumns(const QLColumnValuePB& column_value,
                                                const QLTableRow& existing_row,
                                                const DocOperationApplyData& data,
                                                const DocPath& sub_path, const MonoDelta& ttl,
                                                const UserTimeMicros& user_timestamp,
                                                const ColumnSchema& column,
                                                const ColumnId& column_id,
                                                QLTableRow* new_row) {
  using yb::bfql::TSOpcode;

  // Typical case, setting a columns value
  QLExprResult evaluated;
  RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, evaluated.Writer()));
  const auto opcode = GetTSWriteInstruction(column_value.expr());
  ValueRef new_value(evaluated.Value(), column.sorting_type(), opcode);

  switch (opcode) {
    case TSOpcode::kListRemove:
      // TODO(akashnil or mihnea) this should call RemoveFromList once thats implemented
      // Currently list subtraction is computed in memory using builtin call so this
      // case should never be reached. Once it is implemented the corresponding case
      // from EvalQLExpressionPB should be uncommented to enable this optimization.
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      FALLTHROUGH_INTENDED;
    case TSOpcode::kToJson: FALLTHROUGH_INTENDED;
    case TSOpcode::kScalarInsert:
      RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
          sub_path, new_value, data.read_time, data.deadline,
          request_.query_id(), ttl, user_timestamp));
      break;

    case TSOpcode::kMapExtend: FALLTHROUGH_INTENDED;
    case TSOpcode::kSetExtend: FALLTHROUGH_INTENDED;
    case TSOpcode::kMapRemove: FALLTHROUGH_INTENDED;
    case TSOpcode::kSetRemove:
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
          sub_path, new_value, data.read_time, data.deadline, request_.query_id(), ttl));
      break;

    case TSOpcode::kListPrepend: FALLTHROUGH_INTENDED;
    case TSOpcode::kListAppend:
      // Prepend differs from append only in the extend order recorded on the value.
      if (opcode == TSOpcode::kListPrepend) {
        new_value.set_list_extend_order(ListExtendOrder::PREPEND_BLOCK);
      }
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      RETURN_NOT_OK(data.doc_write_batch->ExtendList(
          sub_path, new_value, data.read_time, data.deadline, request_.query_id(), ttl));
      break;

    default:
      LOG(FATAL) << "Unsupported operation: " << static_cast<int>(opcode);
      break;
  }

  // Index maintenance needs the value that was just written.
  if (update_indexes_) {
    new_row->AllocColumn(column_id, evaluated.Value());
  }
  return Status::OK();
}
833
834
4.61M
// Applies this QL write request (INSERT / UPDATE / DELETE) to `data.doc_write_batch`.
//
// Steps, in order:
//   1. If the request has an IF clause, read the referenced columns, evaluate the condition,
//      record the outcome in the response and populate the result row block. Returns early when
//      the condition is not satisfied (with an error status if `else_error` is set).
//      Otherwise, if expressions or `returns_status` need the current row, read it.
//   2. For a write into a unique index, report "not applied" if a duplicate value exists.
//   3. Build `new_row` from the row that was read, or from the primary key values of the request
//      when nothing was read.
//   4. Dispatch on the statement type and emit the DocDB writes; update secondary indexes when
//      `update_indexes_` is set.
//
// Returns the first non-OK status encountered; on success the response status is set to
// YQL_STATUS_OK.
Status QLWriteOperation::Apply(const DocOperationApplyData& data) {
  QLTableRow existing_row;
  if (request_.has_if_expr()) {
    // Check if the if-condition is satisfied.
    bool should_apply = true;
    Schema static_projection, non_static_projection;
    RETURN_NOT_OK(ReadColumns(data, &static_projection, &non_static_projection, &existing_row));
    RETURN_NOT_OK(EvalCondition(request_.if_expr().condition(), existing_row, &should_apply));
    // Set the response accordingly.
    response_->set_applied(should_apply);
    if (!should_apply && request_.else_error()) {
      return ql::ErrorStatus(ql::ErrorCode::CONDITION_NOT_SATISFIED); // QLError
    } else if (request_.returns_status()) {
      RETURN_NOT_OK(PopulateStatusRow(data, should_apply, existing_row, &rowblock_));
    } else {
      RETURN_NOT_OK(PopulateConditionalDmlRow(data,
          should_apply,
          existing_row,
          static_projection,
          non_static_projection,
          &rowblock_));
    }

    // If we do not need to apply we are already done.
    if (!should_apply) {
      response_->set_status(QLResponsePB::YQL_STATUS_OK);
      return Status::OK();
    }

    TEST_PAUSE_IF_FLAG(TEST_pause_write_apply_after_if);
  } else if (RequireReadForExpressions(request_) || request_.returns_status()) {
    RETURN_NOT_OK(ReadColumns(data, nullptr, nullptr, &existing_row));
    if (request_.returns_status()) {
      RETURN_NOT_OK(PopulateStatusRow(data, /* should_apply = */ true, existing_row, &rowblock_));
    }
  }

  VLOG(3) << "insert_into_unique_index_ is " << insert_into_unique_index_;
  if (insert_into_unique_index_ && VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) {
    VLOG(3) << "set_applied is set to " << false << " for over " << yb::ToString(existing_row);
    response_->set_applied(false);
    response_->set_status(QLResponsePB::YQL_STATUS_OK);
    return Status::OK();
  }

  const MonoDelta ttl = request_ttl();

  const UserTimeMicros user_timestamp = request_.has_user_timestamp_usec() ?
      request_.user_timestamp_usec() : ValueControlFields::kInvalidUserTimestamp;

  // Initialize the new row being written to either the existing row if read, or just populate
  // the primary key.
  QLTableRow new_row;
  if (!existing_row.IsEmpty()) {
    new_row = existing_row;
  } else {
    // Hashed key columns come first in the schema, followed by the range key columns.
    size_t idx = 0;
    for (const QLExpressionPB& expr : request_.hashed_column_values()) {
      new_row.AllocColumn(schema_->column_id(idx), expr.value());
      idx++;
    }
    for (const QLExpressionPB& expr : request_.range_column_values()) {
      new_row.AllocColumn(schema_->column_id(idx), expr.value());
      idx++;
    }
  }

  switch (request_.type()) {
    // QL insert == update (upsert) to be consistent with Cassandra's semantics. In either
    // INSERT or UPDATE, if non-key columns are specified, they will be inserted which will cause
    // the primary key to be inserted also when necessary. Otherwise, we should insert the
    // primary key at least.
    case QLWriteRequestPB::QL_STMT_INSERT:
    case QLWriteRequestPB::QL_STMT_UPDATE: {
      // Add the appropriate liveness column only for inserts.
      // We never use init markers for QL to ensure we perform writes without any reads to
      // ensure our write path is fast while complicating the read path a bit.
      auto is_insert = request_.type() == QLWriteRequestPB::QL_STMT_INSERT;
      if (is_insert && encoded_pk_doc_key_) {
        const DocPath sub_path(encoded_pk_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn);
        const auto control_fields = ValueControlFields {
          .ttl = ttl,
          .user_timestamp = user_timestamp,
        };
        RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
            sub_path, control_fields, ValueRef(ValueType::kNullLow),
            data.read_time, data.deadline, request_.query_id()));
      }

      // JSON-operator assignments are only grouped per column here and applied afterwards, so
      // that several assignments to the same jsonb column become one read-modify-write.
      std::unordered_map<ColumnIdRep, vector<int>> col_map;
      for (int idx = 0; idx < request_.column_values_size(); idx++) {
        const auto& column_value = request_.column_values(idx);
        if (!column_value.has_column_id()) {
          return STATUS_FORMAT(InvalidArgument, "column id missing: $0",
                               column_value.DebugString());
        }
        const ColumnId column_id(column_value.column_id());
        const auto maybe_column = schema_->column_by_id(column_id);
        RETURN_NOT_OK(maybe_column);
        const ColumnSchema& column = *maybe_column;

        // Static columns live under the hashed part of the key only.
        DocPath sub_path(
            column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
            PrimitiveValue(column_id));

        if (!column_value.json_args().empty()) {
          // Record the index of this assignment under its jsonb column id; operator[] creates
          // the (empty) entry on first use.
          col_map[column_value.column_id()].emplace_back(idx);
        } else if (!column_value.subscript_args().empty()) {
          RETURN_NOT_OK(ApplyForSubscriptArgs(column_value, existing_row, data, ttl,
                                              user_timestamp, column, &sub_path));
        } else {
          RETURN_NOT_OK(ApplyForRegularColumns(column_value, existing_row, data, sub_path, ttl,
                                               user_timestamp, column, column_id, &new_row));
        }
      }
      for (const auto& entry : col_map) {
        const ColumnId column_id(entry.first);
        const auto maybe_column = schema_->column_by_id(column_id);
        RETURN_NOT_OK(maybe_column);
        const ColumnSchema& column = *maybe_column;
        DocPath sub_path(
            column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
            PrimitiveValue(column_id));
        RETURN_NOT_OK(ApplyForJsonOperators(
            column, entry.first, col_map, data, sub_path, ttl, user_timestamp, &new_row,
            is_insert));
      }

      if (update_indexes_) {
        RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
      }
      break;
    }
    case QLWriteRequestPB::QL_STMT_DELETE: {
      // We have three cases:
      // 1. If non-key columns are specified, we delete only those columns.
      // 2. Otherwise, if range cols are missing, this must be a range delete.
      // 3. Otherwise, this is a normal delete.
      // Analyzer ensures these are the only cases before getting here (e.g. range deletes cannot
      // specify non-key columns).
      if (request_.column_values_size() > 0) {
        // Delete the referenced columns only.
        for (const auto& column_value : request_.column_values()) {
          CHECK(column_value.has_column_id())
              << "column id missing: " << column_value.DebugString();
          const ColumnId column_id(column_value.column_id());
          const auto& column = VERIFY_RESULT_REF(schema_->column_by_id(column_id));
          const DocPath sub_path(
              column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
              PrimitiveValue(column_id));
          RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(sub_path,
              data.read_time, data.deadline, request_.query_id(), user_timestamp));
          if (update_indexes_) {
            new_row.MarkTombstoned(column_id);
          }
        }
        if (update_indexes_) {
          RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
        }
      } else if (IsRangeOperation(request_, *schema_)) {
        // If the range columns are not specified, we read everything and delete all rows for
        // which the where condition matches.

        // Create the schema projection -- range deletes cannot reference non-primary key columns,
        // so the non-static projection is all we need, it should contain all referenced columns.
        Schema static_projection;
        Schema projection;
        RETURN_NOT_OK(CreateProjections(*schema_, request_.column_refs(),
            &static_projection, &projection));

        // Construct the scan spec basing on the WHERE condition.
        vector<PrimitiveValue> hashed_components;
        RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
            request_.hashed_column_values(), *schema_, 0,
            schema_->num_hash_key_columns(), &hashed_components));

        boost::optional<int32_t> hash_code = request_.has_hash_code()
                                             ? boost::make_optional<int32_t>(request_.hash_code())
                                             : boost::none;
        const auto range_covers_whole_partition_key = !request_.has_where_expr();
        const auto include_static_columns_in_scan = range_covers_whole_partition_key &&
                                                    schema_->has_statics();
        DocQLScanSpec spec(*schema_,
                           hash_code,
                           hash_code, // max hash code.
                           hashed_components,
                           request_.has_where_expr() ? &request_.where_expr().condition() : nullptr,
                           nullptr,
                           request_.query_id(),
                           true /* is_forward_scan */,
                           include_static_columns_in_scan);

        // Create iterator.
        DocRowwiseIterator iterator(
            projection, *schema_, txn_op_context_,
            data.doc_write_batch->doc_db(),
            data.deadline, data.read_time);
        RETURN_NOT_OK(iterator.Init(spec));

        // Iterate through rows and delete those that match the condition.
        // TODO(mihnea): We do not lock here, so other write transactions coming in might appear
        // partially applied if they happen in the middle of a ranged delete.
        while (VERIFY_RESULT(iterator.HasNext())) {
          existing_row.Clear();
          RETURN_NOT_OK(iterator.NextRow(&existing_row));

          // Match the row with the where condition before deleting it.
          bool match = false;
          RETURN_NOT_OK(spec.Match(existing_row, &match));
          if (match) {
            const DocPath row_path(iterator.row_key());
            RETURN_NOT_OK(DeleteRow(row_path, data.doc_write_batch, data.read_time, data.deadline));
            if (update_indexes_) {
              liveness_column_exists_ = iterator.LivenessColumnExists();
              RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
            }
          }
        }
        data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt());
      } else {
        // Otherwise, delete the referenced row (all columns).
        RETURN_NOT_OK(DeleteRow(DocPath(encoded_pk_doc_key_.as_slice()), data.doc_write_batch,
                                data.read_time, data.deadline));
        if (update_indexes_) {
          RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
        }
      }
      break;
    }
  }

  response_->set_status(QLResponsePB::YQL_STATUS_OK);

  return Status::OK();
}
1079
1080
// Deletes the row at `row_path` through `doc_write_batch`.
//
// Without a user-supplied timestamp the whole row is deleted with a single DeleteSubDoc call.
// With one, every non-key column and the liveness column are deleted individually, each carrying
// the request's query id and user timestamp.
//
// Returns the first non-OK status from the write batch.
Status QLWriteOperation::DeleteRow(const DocPath& row_path, DocWriteBatch* doc_write_batch,
                                   const ReadHybridTime& read_ht, const CoarseTimePoint deadline) {
  if (!request_.has_user_timestamp_usec()) {
    RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(row_path, read_ht, deadline));
    return Status::OK();
  }

  // If user_timestamp is provided, we need to add a tombstone for each individual
  // column in the schema since we don't want to analyze this on the read path.
  for (auto col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); col_idx++) {
    const DocPath column_path(row_path.encoded_doc_key(),
                              PrimitiveValue(schema_->column_id(col_idx)));
    RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(
        column_path, read_ht, deadline, request_.query_id(), request_.user_timestamp_usec()));
  }

  // Delete the liveness column as well.
  const DocPath liveness_path(row_path.encoded_doc_key(), PrimitiveValue::kLivenessColumn);
  RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(
      liveness_path, read_ht, deadline, request_.query_id(), request_.user_timestamp_usec()));

  return Status::OK();
}
1108
1109
namespace {
1110
1111
YB_DEFINE_ENUM(ValueState, (kNull)(kNotNull)(kMissing));
1112
1113
1.97k
ValueState GetValueState(const QLTableRow& row, const ColumnId column_id) {
1114
1.97k
  const auto value = row.GetValue(column_id);
1115
1.97k
  return !value ? 
ValueState::kMissing4
:
IsNull(*value)1.97k
?
ValueState::kNull1.52k
:
ValueState::kNotNull450
;
1116
1.97k
}
1117
1118
} // namespace
1119
1120
// Decides whether the row no longer exists once this DML has been applied.
//
// existing_row - the row as read before the write (may be empty if nothing was read).
// new_row      - the row as it will look after the write.
//
// Returns true for a DELETE without column values, and for an UPDATE/DELETE on a row that has no
// liveness column and is left without any non-null, non-static regular column. Returns false in
// every other case, including when `existing_row` is empty.
Result<bool> QLWriteOperation::IsRowDeleted(const QLTableRow& existing_row,
                                            const QLTableRow& new_row) const {
  const auto stmt_type = request_.type();

  // Delete the whole row?
  if (stmt_type == QLWriteRequestPB::QL_STMT_DELETE && request_.column_values().empty()) {
    return true;
  }

  // If the row doesn't exist, don't check further.
  if (existing_row.IsEmpty()) {
    return false;
  }

  // For update/delete, if there is no liveness column, the row will be deleted after the DML unless
  // a non-null column still remains.
  const bool is_update_or_delete = stmt_type == QLWriteRequestPB::QL_STMT_UPDATE ||
                                   stmt_type == QLWriteRequestPB::QL_STMT_DELETE;
  if (!is_update_or_delete || liveness_column_exists_) {
    return false;
  }

  for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); col_idx++) {
    if (schema_->column(col_idx).is_static()) {
      continue;
    }
    const ColumnId column_id = schema_->column_id(col_idx);
    // The new row takes precedence; fall back to the existing row only when the new row has no
    // entry for this column.
    auto state = GetValueState(new_row, column_id);
    if (state == ValueState::kMissing) {
      state = GetValueState(existing_row, column_id);
    }
    if (state == ValueState::kNotNull) {
      return false;
    }
  }

  #if DCHECK_IS_ON()
  // If (for all non_pk cols new_row has value NULL/kMissing i.e., the UPDATE statement only sets
  //     some/all cols to NULL)
  // then (existing_row should have a value read from docdb for all non_pk
  //       cols that are kMissing in new_row so that we can decide if the row is deleted or not).

  bool new_row_has_not_null = false;
  for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); col_idx++) {
    if (GetValueState(new_row, schema_->column_id(col_idx)) == ValueState::kNotNull) {
      new_row_has_not_null = true;
      break;
    }
  }

  if (!new_row_has_not_null) {
    for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); col_idx++) {
      const ColumnId column_id = schema_->column_id(col_idx);
      if (GetValueState(new_row, column_id) == ValueState::kMissing) {
        DCHECK(GetValueState(existing_row, column_id) != ValueState::kMissing);
      }
    }
  }
  #endif

  return true;
}
1180
1181
4.61M
// Returns the TTL carried by the request (interpreted as milliseconds), or
// ValueControlFields::kMaxTtl when the request does not specify one.
MonoDelta QLWriteOperation::request_ttl() const {
  if (request_.has_ttl()) {
    return MonoDelta::FromMilliseconds(request_.ttl());
  }
  return ValueControlFields::kMaxTtl;
}
1185
1186
namespace {
1187
1188
87.3k
// Appends a new, empty key-column expression to "request" and returns it. Positions below
// index.hash_column_count() are added to hashed_column_values, all others to
// range_column_values.
QLExpressionPB* NewKeyColumn(QLWriteRequestPB* request, const IndexInfo& index, const size_t idx) {
  if (idx < index.hash_column_count()) {
    return request->add_hashed_column_values();
  }
  return request->add_range_column_values();
}
1193
1194
// Appends a fresh (index, request) entry to "index_requests", stamps the request with the given
// statement type and returns a pointer to the request stored inside the vector.
QLWriteRequestPB* NewIndexRequest(
    const IndexInfo& index,
    QLWriteRequestPB::QLStmtType type,
    vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests) {
  index_requests->emplace_back(&index, QLWriteRequestPB());
  auto& new_request = index_requests->back().second;
  new_request.set_type(type);
  return &new_request;
}
1203
1204
} // namespace
1205
1206
27.2k
// Prepares the write requests that update the indexes listed in request_.update_index_ids().
// For each index at most two requests are appended to index_requests_: one insert (built by
// CreateAndSetupIndexInsertRequest) and one delete of the entry derived from existing_row.
// Returns the first error coming from the index lookup, the row-deleted check or expression /
// condition evaluation.
Status QLWriteOperation::UpdateIndexes(const QLTableRow& existing_row, const QLTableRow& new_row) {
  VLOG(2) << "Updating indexes";
  const auto& index_ids = request_.update_index_ids();
  // Room for one insert and one delete per index.
  index_requests_.reserve(index_ids.size() * 2);

  for (const TableId& index_id : index_ids) {
    const IndexInfo* index = VERIFY_RESULT(index_map_.FindIndex(index_id));
    const bool row_deleted = VERIFY_RESULT(IsRowDeleted(existing_row, new_row));
    const bool is_partial_index = static_cast<bool>(index->where_predicate_spec());

    bool key_changed = false;
    bool pred_holds_for_existing_row = true;
    bool pred_holds_for_new_row = true;

    if (is_partial_index) {
      RETURN_NOT_OK(EvalCondition(
        index->where_predicate_spec()->where_expr().condition(), existing_row,
        &pred_holds_for_existing_row));
    }

    if (!row_deleted) {
      VERIFY_RESULT(CreateAndSetupIndexInsertRequest(
          this, index->HasWritePermission(), existing_row, new_row, index,
          &index_requests_, &key_changed, &pred_holds_for_new_row, pred_holds_for_existing_row));
    } else if (is_partial_index && !pred_holds_for_existing_row) {
      // The row being deleted never satisfied the partial index predicate, so there is no index
      // entry to remove.
      VLOG(3) << "Skip index entry delete for index_id=" << index->table_id() <<
        " since predicate not satisfied";
      continue;
    } else {
      key_changed = true;
    }

    const bool pred_switched_to_false =
        is_partial_index && !existing_row.IsEmpty() &&
        pred_holds_for_existing_row && !pred_holds_for_new_row;

    // The current index entry is deleted only when the key changed or the predicate stopped
    // holding, and only if the index allows deletes.
    if (!(key_changed || pred_switched_to_false) || !index->HasDeletePermission()) {
      continue;
    }

    // 1. In case of a switch of predicate satisfiability to false, we surely have to delete the
    // row. (Even if there wasn't any index key change).
    // 2. But in case of an index key change without predicate satisfiability switch, if the
    // index predicate was already false for the existing row, we have to do nothing.
    // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing.
    if (!pred_switched_to_false && !pred_holds_for_existing_row) {
      VLOG(3) << "Skip index entry delete of existing row for index_id=" << index->table_id() <<
        " since predicate not satisfied";
      continue;
    }

    QLWriteRequestPB* const delete_request =
        NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_DELETE, &index_requests_);
    VLOG(3) << "Issue index entry delete of existing row for index_id=" << index->table_id() <<
      " since predicate was satisfied earlier AND (isn't satisfied now (OR) the key has changed)";

    for (size_t idx = 0; idx < index->key_column_count(); ++idx) {
      const auto& index_column = index->column(idx);
      QLExpressionPB* const key_column = NewKeyColumn(delete_request, *index, idx);

      // For old message expr_case() == NOT SET.
      // For new message expr_case == kColumnId when indexing expression is a column-ref.
      const auto expr_case = index_column.colexpr.expr_case();
      const bool is_plain_column_ref =
          expr_case == QLExpressionPB::ExprCase::EXPR_NOT_SET ||
          expr_case == QLExpressionPB::ExprCase::kColumnId;

      if (is_plain_column_ref) {
        auto existing_value = existing_row.GetValue(index_column.indexed_column_id);
        if (existing_value) {
          key_column->mutable_value()->CopyFrom(*existing_value);
        }
      } else {
        QLExprResult expr_value;
        RETURN_NOT_OK(EvalExpr(index_column.colexpr, existing_row, expr_value.Writer()));
        expr_value.MoveTo(key_column->mutable_value());
      }
    }
  }

  return Status::OK();
}
1288
1289
// Computes the index entry (key columns plus covering columns) that corresponds to new_row and,
// when an update of the index is required and permitted, appends an insert request for it to
// index_requests.
//
// Outputs (each may be null, in which case it is not written):
//   has_index_key_changed - set to whether any index key column value differs from existing_row.
//   index_pred_new_row    - for an index with a where-predicate, set to whether new_row satisfies
//                           that predicate.
//
// Returns the newly added request, or nullptr when no insert was issued (index updating skipped,
// no write permission, or the partial index predicate is false for new_row). Errors from
// expression / condition evaluation are propagated.
Result<QLWriteRequestPB*> CreateAndSetupIndexInsertRequest(
    QLExprExecutor* expr_executor,
    bool index_has_write_permission,
    const QLTableRow& existing_row,
    const QLTableRow& new_row,
    const IndexInfo* index,
    vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    bool* has_index_key_changed,
    bool* index_pred_new_row,
    bool index_pred_existing_row) {
  const bool has_existing_row = !existing_row.IsEmpty();
  bool key_changed = false;
  bool update_index = false;
  unordered_map<size_t, QLValuePB> values;

  // Prepare the new index key.
  for (size_t idx = 0; idx < index->key_column_count(); ++idx) {
    const auto& index_column = index->column(idx);
    bool column_unchanged = false;

    // Column_id should be used without executing "colexpr" for the following cases (we want
    // to avoid executing colexpr as it is less efficient).
    // - Old PROTO messages (expr_case() == NOT SET).
    // - When indexing expression is just a column-ref (expr_case == kColumnId)
    const auto expr_case = index_column.colexpr.expr_case();
    const bool is_plain_column_ref =
        expr_case == QLExpressionPB::ExprCase::EXPR_NOT_SET ||
        expr_case == QLExpressionPB::ExprCase::kColumnId;

    if (is_plain_column_ref) {
      auto column_value = new_row.GetValue(index_column.indexed_column_id);
      if (has_existing_row) {
        // For each column in the index key, if there is a new value, see if the value is
        // changed from the current value. Else, use the current value.
        if (!column_value) {
          // TODO(Piyush): This branch is possibly dead code. It can never happen that the new_row
          // doesn't have some column but the existing one does because we copy the existing one
          // into the new one before this function call.
          column_value = existing_row.GetValue(index_column.indexed_column_id);
        } else if (new_row.MatchColumn(index_column.indexed_column_id, existing_row)) {
          column_unchanged = true;
        } else {
          key_changed = true;
        }
      }
      if (column_value) {
        values[idx] = std::move(*column_value);
      }
    } else {
      // Evaluate the expression on new_row when there is no existing row or when the indexed
      // column is specified in new_row; otherwise fall back to the existing row.
      // TODO(Piyush): The fall-back to the existing row is possibly dead code.
      const bool evaluate_on_new_row =
          !has_existing_row || new_row.IsColumnSpecified(index_column.indexed_column_id);
      QLExprResult key_value;
      RETURN_NOT_OK(expr_executor->EvalExpr(
          index_column.colexpr, evaluate_on_new_row ? new_row : existing_row,
          key_value.Writer()));

      if (has_existing_row && evaluate_on_new_row) {
        // Compare new and existing results of the expression, if the results are equal
        // that means the key is NOT changed in fact even if the column value is changed.
        QLExprResult existing_key_value;
        RETURN_NOT_OK(expr_executor->EvalExpr(
            index_column.colexpr, existing_row, existing_key_value.Writer()));
        if (key_value.Value() == existing_key_value.Value()) {
          column_unchanged = true;
        } else {
          key_changed = true;
        }
      }

      key_value.MoveTo(&values[idx]);
    }

    if (!column_unchanged) {
      update_index = true;
    }
  }

  // Prepare the covering columns.
  for (size_t idx = index->key_column_count(); idx < index->columns().size(); ++idx) {
    const auto& index_column = index->column(idx);
    auto column_value = new_row.GetValue(index_column.indexed_column_id);
    bool column_unchanged = false;

    if (key_changed) {
      // If the index value is changed and there is no new covering column value set, use the
      // current value.
      if (!column_value) {
        // TODO(Piyush): This if is possibly dead code.
        column_value = existing_row.GetValue(index_column.indexed_column_id);
      }
    } else if (!FLAGS_ycql_disable_index_updating_optimization &&
               column_value &&
               new_row.MatchColumn(index_column.indexed_column_id, existing_row)) {
      column_unchanged = true;
    }

    if (column_value) {
      values[idx] = std::move(*column_value);
    }
    if (!column_unchanged) {
      update_index = true;
    }
  }

  if (has_index_key_changed) {
    *has_index_key_changed = key_changed;
  }

  bool pred_holds_for_new_row = true;
  if (index->where_predicate_spec()) {
    // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing.
    RETURN_NOT_OK(expr_executor->EvalCondition(
      index->where_predicate_spec()->where_expr().condition(), new_row,
      &pred_holds_for_new_row));
    if (index_pred_new_row) {
      *index_pred_new_row = pred_holds_for_new_row;
    }

    if (pred_holds_for_new_row && !index_pred_existing_row) {
      // In case the row is unchanged but the predicate switches to true (can happen if the
      // predicate involves no indexed/covering cols).
      if (!update_index) {
        VLOG(3) << "Indexed/covering cols unchanged but predicate switched to true for index_id=" <<
          index->table_id();
      }
      update_index = true;
    }
  }

  const bool should_write =
      index_has_write_permission &&
      (update_index || FLAGS_ycql_disable_index_updating_optimization);
  if (!should_write) {
    return nullptr; // The index updating was skipped.
  }

  // If this is a partial index and the index predicate is false for the new row, skip the insert.
  if (index->where_predicate_spec() && !pred_holds_for_new_row) {
    VLOG(3) << "Skip index entry write for index_id=" << index->table_id() <<
      " since predicate not satisfied";
    return nullptr;
  }

  QLWriteRequestPB* const index_request =
      NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_INSERT, index_requests);

  // Setup the key columns. A key column slot is always added; its value is filled in only when
  // one was computed above.
  for (size_t idx = 0; idx < index->key_column_count(); ++idx) {
    QLExpressionPB* const key_column = NewKeyColumn(index_request, *index, idx);
    const auto found = values.find(idx);
    if (found != values.end()) {
      *key_column->mutable_value() = std::move(found->second);
    }
  }

  // Setup the covering columns. Columns without a computed value are left out of the request.
  for (size_t idx = index->key_column_count(); idx < index->columns().size(); ++idx) {
    const auto found = values.find(idx);
    if (found == values.end()) {
      continue;
    }
    const auto& index_column = index->column(idx);
    QLColumnValuePB* const covering_column = index_request->add_column_values();
    covering_column->set_column_id(index_column.column_id);
    *covering_column->mutable_expr()->mutable_value() = std::move(found->second);
  }

  return index_request;
}
1455
1456
// Executes the read request: iterates over the rows produced by ql_storage, joins static and
// non-static rows, and adds matching rows to "resultset" (or folds them into the aggregate
// result) while honoring the request's limit and offset. Afterwards the paging state is filled
// in if needed and *restart_read_ht is set from the iterator.
// A request with limit == 0 returns OK immediately without reading anything.
Status QLReadOperation::Execute(const YQLStorageIf& ql_storage,
                                CoarseTimePoint deadline,
                                const ReadHybridTime& read_time,
                                const Schema& schema,
                                const Schema& projection,
                                QLResultSet* resultset,
                                HybridTime* restart_read_ht) {
  SimulateTimeoutIfTesting(&deadline);

  size_t num_rows_skipped = 0;
  const size_t offset = request_.has_offset() ? request_.offset() : 0;
  size_t row_count_limit = std::numeric_limits<std::size_t>::max();
  if (request_.has_limit()) {
    if (request_.limit() == 0) {
      return Status::OK();
    }
    row_count_limit = request_.limit();
  }

  // Create the projections of the non-key columns selected by the row block plus any referenced in
  // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this
  // projection only to scan sub-documents. The query schema is used to select only referenced
  // columns and key columns.
  Schema static_projection, non_static_projection;
  RETURN_NOT_OK(CreateProjections(schema, request_.column_refs(),
                                  &static_projection, &non_static_projection));
  const bool read_static_columns = !static_projection.columns().empty();
  const bool read_distinct_columns = request_.distinct();

  std::unique_ptr<YQLRowwiseIteratorIf> iter;
  std::unique_ptr<QLScanSpec> spec, static_row_spec;
  RETURN_NOT_OK(ql_storage.BuildYQLScanSpec(
      request_, read_time, schema, read_static_columns, static_projection, &spec,
      &static_row_spec));
  RETURN_NOT_OK(ql_storage.GetIterator(request_, projection, schema, txn_op_context_,
                                       deadline, read_time, *spec, &iter));
  VTRACE(1, "Initialized iterator");

  QLTableRow static_row;
  QLTableRow non_static_row;
  // Row handed to PopulateAggregate() at the end.
  QLTableRow& selected_row = read_distinct_columns ? static_row : non_static_row;

  // In case when we are continuing a select with a paging state, or when using a reverse scan,
  // the static columns for the next row to fetch are not included in the first iterator and we
  // need to fetch them with a separate spec and iterator before beginning the normal fetch below.
  if (static_row_spec != nullptr) {
    std::unique_ptr<YQLRowwiseIteratorIf> static_row_iter;
    RETURN_NOT_OK(ql_storage.GetIterator(
        request_, static_projection, schema, txn_op_context_, deadline, read_time,
        *static_row_spec, &static_row_iter));
    if (VERIFY_RESULT(static_row_iter->HasNext())) {
      RETURN_NOT_OK(static_row_iter->NextRow(&static_row));
    }
  }

  // Begin the normal fetch.
  int match_count = 0;
  // False while a static row has been read but not yet joined with / emitted for a regular row.
  bool static_dealt_with = true;
  while (resultset->rsrow_count() < row_count_limit && VERIFY_RESULT(iter->HasNext())) {
    const bool is_static_row = iter->IsNextStaticColumn();

    // Note that static columns are sorted before non-static columns in DocDB as follows. This is
    // because "<empty_range_components>" is empty and terminated by kGroupEnd which sorts before
    // all other ValueType characters in a non-empty range component.
    //   <hash_code><hash_components><empty_range_components><static_column_id> -> value;
    //   <hash_code><hash_components><range_components><non_static_column_id> -> value;
    //
    // TODO(omer): reading the regular row is quite inefficient if read_distinct_column. A better
    // way to do this would be to only read the first non-static column for each hash key, and
    // skip the rest
    QLTableRow* const row_to_fill = is_static_row ? &static_row : &non_static_row;
    const Schema& row_projection = is_static_row ? static_projection : non_static_projection;
    row_to_fill->Clear();
    RETURN_NOT_OK(iter->NextRow(row_projection, row_to_fill));

    // We have two possible cases: whether we use distinct or not
    // If we use distinct, then in general we only need to add the static rows
    // However, we might have to add non-static rows, if there is no static row corresponding to
    // it. Of course, we add one entry per hash key in non-static row.
    // If we do not use distinct, we are generally only adding non-static rows
    // However, if there is no non-static row for the static row, we have to add it.
    if (read_distinct_columns) {
      const bool join_successful =
          !is_static_row &&
          JoinNonStaticRow(schema, static_projection, non_static_row, &static_row);

      // If the join was not successful, it means that the non-static row we read has no
      // corresponding static row, so we have to add it to the result
      if (!join_successful) {
        RETURN_NOT_OK(AddRowToResult(
            spec, static_row, row_count_limit, offset, resultset, &match_count, &num_rows_skipped));
      }
    } else if (is_static_row) {
      // If the next row to be read is not static, deal with it later, as we do not know whether
      // the non-static row corresponds to this static row; if the non-static row doesn't
      // correspond to this static row, we will have to add it later, so set static_dealt_with to
      // false
      if (VERIFY_RESULT(iter->HasNext()) && !iter->IsNextStaticColumn()) {
        static_dealt_with = false;
        continue;
      }

      AddProjection(non_static_projection, &static_row);
      RETURN_NOT_OK(AddRowToResult(spec, static_row, row_count_limit, offset, resultset,
                                   &match_count, &num_rows_skipped));
    } else {
      // We also have to do the join if we are not reading any static columns, as Cassandra
      // reports nulls for static rows with no corresponding non-static row
      if (read_static_columns || !static_dealt_with) {
        const bool join_successful =
            JoinStaticRow(schema, static_projection, static_row, &non_static_row);
        // Add the static row if the join was not successful and it is the first time we are
        // dealing with this static row
        if (!join_successful && !static_dealt_with) {
          AddProjection(non_static_projection, &static_row);
          RETURN_NOT_OK(AddRowToResult(
              spec, static_row, row_count_limit, offset, resultset, &match_count,
              &num_rows_skipped));
        }
      }
      static_dealt_with = true;
      RETURN_NOT_OK(AddRowToResult(
          spec, non_static_row, row_count_limit, offset, resultset, &match_count,
          &num_rows_skipped));
    }
  }

  if (request_.is_aggregate() && match_count > 0) {
    RETURN_NOT_OK(PopulateAggregate(selected_row, resultset));
  }

  VTRACE(1, "Fetched $0 rows.", resultset->rsrow_count());

  RETURN_NOT_OK(SetPagingStateIfNecessary(
      iter.get(), resultset, row_count_limit, num_rows_skipped, read_time));

  // SetPagingStateIfNecessary could perform read, so we assign restart_read_ht after it.
  *restart_read_ht = iter->RestartReadHt();

  return Status::OK();
}
1606
1607
// Fills response_.paging_state when the read stopped because the row limit was reached (or the
// request carries an offset) and the request is not an aggregate. The read time is recorded in
// the paging state whenever one is present in the response.
Status QLReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter,
                                                  const QLResultSet* resultset,
                                                  const size_t row_count_limit,
                                                  const size_t num_rows_skipped,
                                                  const ReadHybridTime& read_time) {
  const bool limit_reached_or_has_offset =
      resultset->rsrow_count() >= row_count_limit || request_.has_offset();
  if (!limit_reached_or_has_offset || request_.is_aggregate()) {
    return Status::OK();
  }

  SubDocKey next_row_key;
  RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key));

  // When the "limit" number of rows are returned and we are asked to return the paging state,
  // return the partition key and row key of the next row to read in the paging state if there are
  // still more rows to read. Otherwise, leave the paging state empty which means we are done
  // reading from this tablet.
  if (request_.return_paging_state()) {
    const bool has_next_row = !next_row_key.doc_key().empty();
    if (has_next_row || request_.has_offset()) {
      QLPagingStatePB* const paging_state = response_.mutable_paging_state();
      if (has_next_row) {
        paging_state->set_next_partition_key(
            PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash()));
        paging_state->set_next_row_key(next_row_key.Encode().ToStringBuffer());
      }
      // With only an offset (no next row) just the running skipped-rows total is carried over.
      paging_state->set_total_rows_skipped(request_.paging_state().total_rows_skipped() +
          num_rows_skipped);
    }
  }

  if (!response_.has_paging_state()) {
    return Status::OK();
  }

  if (FLAGS_ycql_consistent_transactional_paging) {
    read_time.AddToPB(response_.mutable_paging_state());
  } else {
    // Using SingleTime will help avoid read restarts on second page and later but will
    // potentially produce stale results on those pages.
    const auto single_read_time = ReadHybridTime::SingleTime(read_time.read);
    single_read_time.AddToPB(response_.mutable_paging_state());
  }

  return Status::OK();
}
1648
1649
0
// Adds one read pair to "out" describing what this read touches: the key is the encoded DocKey
// built from the request's hash code and hashed column values (or a single kGroupEnd byte when
// there are no hashed components) and the value is a single kNullLow byte. Also sets the wait
// policy of "out" to WAIT_ERROR.
Status QLReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) {
  std::vector<PrimitiveValue> hashed_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      request_.hashed_column_values(), schema, 0, schema.num_hash_key_columns(),
      &hashed_components));

  auto read_pair = out->mutable_read_pairs()->Add();
  if (!hashed_components.empty()) {
    DocKey doc_key(request_.hash_code(), hashed_components);
    read_pair->set_key(doc_key.Encode().ToStringBuffer());
  } else {
    // Empty hashed components mean that we don't have primary key at all, but request
    // could still contain hash_code as part of tablet routing.
    // So we should ignore it.
    read_pair->set_key(std::string(1, ValueTypeAsChar::kGroupEnd));
  }
  read_pair->set_value(std::string(1, ValueTypeAsChar::kNullLow));

  // Wait policies make sense only for YSQL to support different modes like waiting, erroring out
  // or skipping on intent conflict. YCQL behaviour matches WAIT_ERROR (see proto for details).
  out->set_wait_policy(WAIT_ERROR);
  return Status::OK();
}
1670
1671
// Allocates a new row in "resultset" and appends one column per selected expression of the
// request, each evaluated against table_row using the scan spec's schema. Stops at and returns
// the first evaluation error.
Status QLReadOperation::PopulateResultSet(const std::unique_ptr<QLScanSpec>& spec,
                                          const QLTableRow& table_row,
                                          QLResultSet *resultset) {
  resultset->AllocateRow();

  const int num_selected_exprs = request_.selected_exprs().size();
  for (int rscol_index = 0; rscol_index < num_selected_exprs; ++rscol_index) {
    QLExprResult column_value;
    RETURN_NOT_OK(EvalExpr(
        request_.selected_exprs(rscol_index), table_row, column_value.Writer(), spec->schema()));
    resultset->AppendColumn(rscol_index, column_value.Value());
  }

  return Status::OK();
}
1685
1686
14.3k
// Evaluates every selected expression of the request against table_row, writing each result into
// the aggr_result_ slot with the same position. aggr_result_ is sized to the number of selected
// expressions the first time it is found empty.
Status QLReadOperation::EvalAggregate(const QLTableRow& table_row) {
  const int num_selected_exprs = request_.selected_exprs().size();
  if (aggr_result_.empty()) {
    aggr_result_.resize(num_selected_exprs);
  }

  for (int aggr_index = 0; aggr_index < num_selected_exprs; ++aggr_index) {
    RETURN_NOT_OK(EvalExpr(
        request_.selected_exprs(aggr_index), table_row, aggr_result_[aggr_index].Writer()));
  }
  return Status::OK();
}
1698
1699
163
// Allocates a new row in "resultset" and appends the values held in aggr_result_, one column per
// selected expression of the request. table_row is not read here.
Status QLReadOperation::PopulateAggregate(const QLTableRow& table_row, QLResultSet *resultset) {
  resultset->AllocateRow();

  const int num_columns = request_.selected_exprs().size();
  int column = 0;
  while (column < num_columns) {
    resultset->AppendColumn(column, aggr_result_[column].Value());
    ++column;
  }
  return Status::OK();
}
1707
1708
// Offers "row" to the result: if the row limit has not been reached and the row matches the scan
// spec, it is either counted as skipped (while fewer than "offset" matching rows have been
// skipped) or counted in *match_count and then aggregated / appended to "resultset" depending on
// whether the request is an aggregate.
Status QLReadOperation::AddRowToResult(const std::unique_ptr<QLScanSpec>& spec,
                                       const QLTableRow& row,
                                       const size_t row_count_limit,
                                       const size_t offset,
                                       QLResultSet* resultset,
                                       int* match_count,
                                       size_t *num_rows_skipped) {
  VLOG(3) << __FUNCTION__ << " : " << yb::ToString(row);

  if (resultset->rsrow_count() >= row_count_limit) {
    return Status::OK();
  }

  bool row_matches = false;
  RETURN_NOT_OK(spec->Match(row, &row_matches));
  if (!row_matches) {
    return Status::OK();
  }

  // Matching rows are consumed by the offset first.
  if (*num_rows_skipped < offset) {
    ++(*num_rows_skipped);
    return Status::OK();
  }

  ++(*match_count);
  if (request_.is_aggregate()) {
    RETURN_NOT_OK(EvalAggregate(row));
  } else {
    RETURN_NOT_OK(PopulateResultSet(spec, row, resultset));
  }
  return Status::OK();
}
1734
1735
}  // namespace docdb
1736
}  // namespace yb