YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/cql_operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/cql_operation.h"
15
16
#include <limits>
17
#include <memory>
18
#include <string>
19
#include <unordered_set>
20
#include <utility>
21
#include <vector>
22
23
#include "yb/bfpg/tserver_opcodes.h"
24
25
#include "yb/common/index.h"
26
#include "yb/common/index_column.h"
27
#include "yb/common/partition.h"
28
#include "yb/common/ql_protocol_util.h"
29
#include "yb/common/ql_resultset.h"
30
#include "yb/common/ql_rowblock.h"
31
#include "yb/common/ql_value.h"
32
33
#include "yb/docdb/doc_path.h"
34
#include "yb/docdb/doc_ql_scanspec.h"
35
#include "yb/docdb/doc_rowwise_iterator.h"
36
#include "yb/docdb/doc_write_batch.h"
37
#include "yb/docdb/docdb.pb.h"
38
#include "yb/docdb/docdb_debug.h"
39
#include "yb/docdb/docdb_rocksdb_util.h"
40
#include "yb/docdb/primitive_value_util.h"
41
#include "yb/docdb/ql_storage_interface.h"
42
43
#include "yb/util/debug-util.h"
44
#include "yb/util/flag_tags.h"
45
#include "yb/util/result.h"
46
#include "yb/util/status.h"
47
#include "yb/util/status_format.h"
48
#include "yb/util/trace.h"
49
50
#include "yb/yql/cql/ql/util/errcodes.h"
51
52
// Test-only flag: per its help text, pauses application of a QLWriteOperation after the IF
// condition has been evaluated.
DEFINE_test_flag(bool, pause_write_apply_after_if, false,
                 "Pause application of QLWriteOperation after evaluating if condition.");

// Trade-off between consistency and read-restart errors for the second and later pages of YCQL
// scans on transactional tables (see the help text below for the exact semantics).
DEFINE_bool(ycql_consistent_transactional_paging, false,
            "Whether to enforce consistency of data returned for second page and beyond for YCQL "
            "queries on transactional tables. If true, read restart errors could be returned to "
            "prevent inconsistency. If false, no read restart errors are returned but the data may "
            "be stale. The latter is preferable for long scans. The data returned for the first "
            "page of results is never stale regardless of this flag.");

// When true, secondary indexes are updated even when an update does not change the index data
// (per the help text).
DEFINE_bool(ycql_disable_index_updating_optimization, false,
            "If true all secondary indexes must be updated even if the update does not change "
            "the index data.");
TAG_FLAG(ycql_disable_index_updating_optimization, advanced);
66
67
namespace yb {
68
namespace docdb {
69
70
using std::pair;
71
using std::unordered_map;
72
using std::unordered_set;
73
using std::vector;
74
75
namespace {
76
77
// Append dummy entries in schema to table_row
78
// TODO(omer): this should most probably be added somewhere else
79
23
void AddProjection(const Schema& schema, QLTableRow* table_row) {
80
47
  for (size_t i = 0; i < schema.num_columns(); i++) {
81
24
    const auto& column_id = schema.column_id(i);
82
24
    table_row->AllocColumn(column_id);
83
24
  }
84
23
}
85
86
// Create projection schemas of static and non-static columns from a rowblock projection schema
87
// (for read) and a WHERE / IF condition (for read / write). "schema" is the full table schema
88
// and "rowblock_schema" is the selected columns from which we are splitting into static and
89
// non-static column portions.
90
CHECKED_STATUS CreateProjections(const Schema& schema, const QLReferencedColumnsPB& column_refs,
91
3.96M
                                 Schema* static_projection, Schema* non_static_projection) {
92
  // The projection schemas are used to scan docdb.
93
3.96M
  unordered_set<ColumnId> static_columns, non_static_columns;
94
95
  // Add regular columns.
96
13.2M
  for (int32_t id : column_refs.ids()) {
97
13.2M
    const ColumnId column_id(id);
98
13.2M
    if (!schema.is_key_column(column_id)) {
99
5.43M
      non_static_columns.insert(column_id);
100
5.43M
    }
101
13.2M
  }
102
103
  // Add static columns.
104
437
  for (int32_t id : column_refs.static_ids()) {
105
437
    const ColumnId column_id(id);
106
437
    static_columns.insert(column_id);
107
437
  }
108
109
3.96M
  RETURN_NOT_OK(
110
3.96M
      schema.CreateProjectionByIdsIgnoreMissing(
111
3.96M
          vector<ColumnId>(static_columns.begin(), static_columns.end()),
112
3.96M
          static_projection));
113
3.96M
  RETURN_NOT_OK(
114
3.96M
      schema.CreateProjectionByIdsIgnoreMissing(
115
3.96M
          vector<ColumnId>(non_static_columns.begin(), non_static_columns.end()),
116
3.96M
          non_static_projection));
117
118
3.96M
  return Status::OK();
119
3.96M
}
120
121
// Copies the values of schema columns [begin_idx, begin_idx + col_count) from `table_row` into
// consecutive columns of `row`, starting at column `*col_idx`. `*col_idx` is advanced once per
// column (before the value is copied), so successive calls append to the same row.
CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& schema,
                           const size_t begin_idx, const size_t col_count,
                           QLRow* row, size_t *col_idx) {
  const size_t end_idx = begin_idx + col_count;
  for (size_t schema_idx = begin_idx; schema_idx < end_idx; ++schema_idx) {
    auto* destination = row->mutable_column(*col_idx);
    ++*col_idx;
    RETURN_NOT_OK(table_row.GetValue(schema.column_id(schema_idx), destination));
  }
  return Status::OK();
}
129
130
// Copies every column of `projection`, in projection order, from `table_row` into `row`, starting
// at column `*col_idx` and advancing it past the columns written.
CHECKED_STATUS PopulateRow(const QLTableRow& table_row, const Schema& projection,
                           QLRow* row, size_t* col_idx) {
  const size_t all_columns = projection.num_columns();
  return PopulateRow(table_row, projection, /* begin_idx = */ 0, all_columns, row, col_idx);
}
134
135
// Outer join a static row with a non-static row.
136
// A join is successful if and only if for every hash key, the values in the static and the
137
// non-static row are either non-NULL and the same, or one of them is NULL. Therefore we say that
138
// a join is successful if the static row is empty, and in turn return true.
139
// Copies the entries from the static row into the non-static one.
140
bool JoinStaticRow(
141
    const Schema& schema, const Schema& static_projection, const QLTableRow& static_row,
142
353
    QLTableRow* non_static_row) {
143
  // The join is successful if the static row is empty
144
353
  if (static_row.IsEmpty()) {
145
8
    return true;
146
8
  }
147
148
  // Now we know that the static row is not empty. The non-static row cannot be empty, therefore
149
  // we know that both the static row and the non-static one have non-NULL entries for all
150
  // hash keys. Therefore if MatchColumn returns false, we know the join is unsuccessful.
151
  // TODO(neil)
152
  // - Need to assign TTL and WriteTime to their default values.
153
  // - Check if they should be compared and copied over. Most likely not needed as we don't allow
154
  //   selecting TTL and WriteTime for static columns.
155
  // - This copying function should be moved to QLTableRow class.
156
949
  for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
157
606
    if (!non_static_row->MatchColumn(schema.column_id(i), static_row)) {
158
2
      return false;
159
2
    }
160
606
  }
161
162
  // Join the static columns in the static row into the non-static row.
163
752
  for (size_t i = 0; i < static_projection.num_columns(); i++) {
164
409
    non_static_row->CopyColumn(static_projection.column_id(i), static_row);
165
409
  }
166
167
343
  return true;
168
345
}
169
170
// Join a non-static row with a static row.
171
// Returns true if the two rows match
172
bool JoinNonStaticRow(
173
    const Schema& schema, const Schema& static_projection, const QLTableRow& non_static_row,
174
78
    QLTableRow* static_row) {
175
78
  bool join_successful = true;
176
177
197
  for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
178
129
    if (!static_row->MatchColumn(schema.column_id(i), non_static_row)) {
179
10
      join_successful = false;
180
10
      break;
181
10
    }
182
129
  }
183
184
78
  if (!join_successful) {
185
10
    static_row->Clear();
186
16
    for (size_t i = 0; i < static_projection.num_columns(); i++) {
187
6
      static_row->AllocColumn(static_projection.column_id(i));
188
6
    }
189
190
20
    for (size_t i = 0; i < schema.num_hash_key_columns(); i++) {
191
10
      static_row->CopyColumn(schema.column_id(i), non_static_row);
192
10
    }
193
10
  }
194
78
  return join_successful;
195
78
}
196
197
// Resolves one step of a JSON path: the path element json_args(index) of `column_value`,
// applied to `*document`.
//
// - If `*document` is an array, the element is decoded as a comparable-encoded varint index.
//   On success `*valueit` points at that array element and `*last_elem_object` is false.
//   A negative or too-large index yields a QLError.
// - If `*document` is an object, the element is used as a member name. On success `*memberit`
//   points at that member and `*last_elem_object` is true. Unless `is_insert` is set, an element
//   that decodes as a varint is rejected, because an array index cannot address an object.
// - Any other JSON type yields a QLError.
//
// `*last_elem_object` is set to true before the member lookup. As a result, it is still true
// when a "Could not find member" error is returned; ApplyForJsonOperators uses that to decide
// whether a missing member may be added.
CHECKED_STATUS FindMemberForIndex(const QLColumnValuePB& column_value,
                                  int index,
                                  rapidjson::Value* document,
                                  rapidjson::Value::MemberIterator* memberit,
                                  rapidjson::Value::ValueIterator* valueit,
                                  bool* last_elem_object,
                                  bool is_insert) {
  *last_elem_object = false;
  const auto& path_element = column_value.json_args(index).operand().value();

  if (document->IsArray()) {
    util::VarInt varint;
    RETURN_NOT_OK(varint.DecodeFromComparable(path_element.varint_value()));
    const int64_t array_index = VERIFY_RESULT(varint.ToInt64());
    // Check the sign first so the size comparison only ever sees non-negative indexes.
    if (array_index < 0 ||
        array_index >= static_cast<int64_t>(document->GetArray().Size())) {
      // "$0" is required: strings::Substitute ignores arguments without a placeholder.
      return STATUS_SUBSTITUTE(QLError, "Array index out of bounds: $0", array_index);
    }
    *valueit = document->Begin();
    std::advance(*valueit, array_index);
    return Status::OK();
  }

  if (document->IsObject()) {
    if (!is_insert) {
      util::VarInt varint;
      auto status = varint.DecodeFromComparable(path_element.varint_value());
      if (status.ok()) {
        const int64_t array_index = VERIFY_RESULT(varint.ToInt64());
        return STATUS_SUBSTITUTE(QLError, "Cannot use array index $0 to access object",
                                 array_index);
      }
    }

    *last_elem_object = true;
    const char* member = path_element.string_value().c_str();
    *memberit = document->FindMember(member);
    if (*memberit == document->MemberEnd()) {
      return STATUS_SUBSTITUTE(QLError, "Could not find member: $0", member);
    }
    return Status::OK();
  }

  return STATUS_SUBSTITUTE(QLError, "JSON field is invalid: $0", column_value.ShortDebugString());
}
242
243
78
// Returns OK when no user timestamp was supplied (i.e. it equals Value::kInvalidUserTimestamp).
// Otherwise returns InvalidArgument; per the error text, a user-supplied timestamp is only
// allowed when replacing the whole collection.
CHECKED_STATUS CheckUserTimestampForCollections(const UserTimeMicros user_timestamp) {
  const bool has_user_timestamp = user_timestamp != Value::kInvalidUserTimestamp;
  if (!has_user_timestamp) {
    return Status::OK();
  }
  return STATUS(InvalidArgument, "User supplied timestamp is only allowed for "
      "replacing the whole collection");
}
250
251
} // namespace
252
253
// Captures the request, table schema, index map, optional unique-index key schema and transaction
// context. No other setup happens here; Init() derives the per-request state.
// `unique_index_key_schema` may be null (Init() compares it against nullptr).
QLWriteOperation::QLWriteOperation(std::reference_wrapper<const QLWriteRequestPB> request,
                                   std::shared_ptr<const Schema> schema,
                                   std::reference_wrapper<const IndexMap> index_map,
                                   const Schema* unique_index_key_schema,
                                   const TransactionOperationContext& txn_op_context)
    : DocOperationBase(request),
      schema_(std::move(schema)),
      index_map_(index_map),
      unique_index_key_schema_(unique_index_key_schema),
      txn_op_context_(txn_op_context) {
}

// Compiler-generated destructor, defined out of line.
QLWriteOperation::~QLWriteOperation() = default;
266
267
3.38M
// Prepares the write for execution and remembers where to put the response.
//
// Derives the following from the request:
//   - insert_into_unique_index_: an INSERT with a unique-index key schema;
//   - require_read_: RequireRead(), a unique-index insert, or a non-empty index_map_;
//   - update_indexes_: the request lists index ids to update.
// It then scans the written columns to see whether static and/or non-static columns are
// written, and initializes the DocKeys accordingly:
//   - the hashed key is needed for static-column writes and for range operations;
//   - the primary key is needed for non-static writes, when range column values are given,
//     or when the table has no range key columns.
Status QLWriteOperation::Init(QLResponsePB* response) {
  response_ = response;

  const bool is_insert = request_.type() == QLWriteRequestPB::QL_STMT_INSERT;
  insert_into_unique_index_ = is_insert && unique_index_key_schema_ != nullptr;
  require_read_ =
      RequireRead(request_, *schema_) || insert_into_unique_index_ || !index_map_.empty();
  update_indexes_ = !request_.update_index_ids().empty();

  bool writes_static = false;
  bool writes_non_static = false;
  // TODO(Amit): Remove the DVLOGS after backfill features stabilize.
  DVLOG(4) << "Processing request " << yb::ToString(request_);
  for (const auto& column : request_.column_values()) {
    DVLOG(4) << "Looking at column : " << yb::ToString(column);
    auto schema_column = schema_->column_by_id(ColumnId(column.column_id()));
    DVLOG(4) << "schema column : " << yb::ToString(schema_column);
    RETURN_NOT_OK(schema_column);
    bool& seen_kind = schema_column->is_static() ? writes_static : writes_non_static;
    seen_kind = true;
    if (writes_static && writes_non_static) {
      // Both kinds have been seen; the remaining columns cannot change either flag.
      break;
    }
  }

  const bool is_range_operation = IsRangeOperation(request_, *schema_);
  const bool need_hashed_key = writes_static || is_range_operation;
  const bool need_primary_key = writes_non_static ||
                                !request_.range_column_values().empty() ||
                                schema_->num_range_key_columns() == 0;
  return InitializeKeys(need_hashed_key, need_primary_key);
}
305
306
3.41M
// Builds the hashed DocKey (when `hashed_key` is set) and/or the primary DocKey (when
// `primary_key` is set) from the request's hashed and range column values, in table-schema order.
// It also caches their encodings in encoded_hashed_doc_key_ / encoded_pk_doc_key_.
// A key that has already been built is left untouched, so the method can be called again
// (ReadColumns does so) to add a key that is still missing.
Status QLWriteOperation::InitializeKeys(const bool hashed_key, const bool primary_key) {
  const auto& hashed_values = request_.hashed_column_values();
  const auto& range_values = request_.range_column_values();
  const size_t num_hash_columns = schema_->num_hash_key_columns();

  std::vector<PrimitiveValue> hashed_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      hashed_values, *schema_, 0, num_hash_columns, &hashed_components));
  std::vector<PrimitiveValue> range_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      range_values, *schema_, num_hash_columns, schema_->num_range_key_columns(),
      &range_components));

  // Which keys still have to be constructed by this call.
  const bool build_pk = primary_key && !pk_doc_key_;
  const bool build_hashed_key = hashed_key && !hashed_doc_key_;

  if (build_hashed_key) {
    // The primary key below also consumes the hashed components, so they may only be moved
    // away when no primary key is going to be built.
    if (build_pk) {
      hashed_doc_key_.emplace(request_.hash_code(), hashed_components);
    } else {
      hashed_doc_key_.emplace(request_.hash_code(), std::move(hashed_components));
    }
    encoded_hashed_doc_key_ = hashed_doc_key_->EncodeAsRefCntPrefix();
  }

  if (!build_pk) {
    return Status::OK();
  }

  const bool has_hash_part = request_.has_hash_code() && !hashed_values.empty();
  if (has_hash_part) {
    pk_doc_key_.emplace(
        request_.hash_code(), std::move(hashed_components), std::move(range_components));
  } else {
    // Syscatalog tables have no hash components.
    pk_doc_key_.emplace(std::move(range_components));
  }
  encoded_pk_doc_key_ = pk_doc_key_->EncodeAsRefCntPrefix();
  return Status::OK();
}
347
348
// Appends the DocDB paths this write touches to `*paths` and reports the isolation level.
//
// The whole encoded hashed and/or primary keys (whichever exist) are reported in three cases:
// in kLock mode, when the request writes no columns, or when index_map_ is non-empty.
// Otherwise one path per written column is reported: the column's DocKey (the hashed key for
// static columns, the primary key for the rest) followed by the encoded column id.
Status QLWriteOperation::GetDocPaths(
    GetDocPathsMode mode, DocPathsToLock *paths, IsolationLevel *level) const {
  const bool whole_keys =
      mode == GetDocPathsMode::kLock || request_.column_values().empty() || !index_map_.empty();
  if (whole_keys) {
    if (encoded_hashed_doc_key_) {
      paths->push_back(encoded_hashed_doc_key_);
    }
    if (encoded_pk_doc_key_) {
      paths->push_back(encoded_pk_doc_key_);
    }
  } else {
    KeyBytes column_suffix;  // Reused for every column to avoid reallocations.
    for (const auto& column_value : request_.column_values()) {
      const ColumnId column_id(column_value.column_id());
      const ColumnSchema& column = VERIFY_RESULT(schema_->column_by_id(column_id));
      const Slice doc_key = column.is_static() ? encoded_hashed_doc_key_.as_slice()
                                               : encoded_pk_doc_key_.as_slice();
      column_suffix.Clear();
      column_suffix.AppendValueType(ValueType::kColumnId);
      column_suffix.AppendColumnId(column_id);
      const Slice suffix = column_suffix.AsSlice();

      RefCntBuffer path(doc_key.size() + suffix.size());
      memcpy(path.data(), doc_key.data(), doc_key.size());
      suffix.CopyTo(path.data() + doc_key.size());
      paths->push_back(RefCntPrefix(path));
    }
  }

  // A write that needs a read also needs a read snapshot, so its paths are locked at snapshot
  // isolation for consistency. Pure writes use serializable isolation so that they serialize
  // without conflicting with one another. Currently only the keys being written are locked; no
  // lock is taken on the read at the snapshot isolation level.
  if (require_read_) {
    *level = IsolationLevel::SNAPSHOT_ISOLATION;
  } else {
    *level = IsolationLevel::SERIALIZABLE_ISOLATION;
  }
  return Status::OK();
}
386
387
// Reads the columns referenced by the request (request_.column_refs()) into `*table_row`.
//
// The static / non-static projections are written to the caller's schemas when they are
// non-null; otherwise local scratch schemas are used. The hashed and/or primary DocKey is
// initialized according to which projections are non-empty. Static columns are then scanned
// under the hashed key and non-static columns under the primary key. If the primary-key scan
// finds no row, `*table_row` is cleared, which also discards any static columns already read.
// Each scan raises data.restart_read_ht to at least that iterator's restart read time.
Status QLWriteOperation::ReadColumns(const DocOperationApplyData& data,
                                     Schema *param_static_projection,
                                     Schema *param_non_static_projection,
                                     QLTableRow* table_row) {
  Schema local_static_projection;
  Schema local_non_static_projection;
  Schema* const static_projection =
      param_static_projection != nullptr ? param_static_projection : &local_static_projection;
  Schema* const non_static_projection = param_non_static_projection != nullptr
      ? param_non_static_projection
      : &local_non_static_projection;

  RETURN_NOT_OK(CreateProjections(*schema_, request_.column_refs(),
                                  static_projection, non_static_projection));

  // Build only the keys that the referenced (if-condition) columns need.
  RETURN_NOT_OK(InitializeKeys(
      /* hashed_key = */ !static_projection->columns().empty(),
      /* primary_key = */ !non_static_projection->columns().empty()));

  if (hashed_doc_key_) {
    DocQLScanSpec static_spec(*static_projection, *hashed_doc_key_, request_.query_id());
    DocRowwiseIterator static_iter(*static_projection, *schema_, txn_op_context_,
                                   data.doc_write_batch->doc_db(),
                                   data.deadline, data.read_time);
    RETURN_NOT_OK(static_iter.Init(static_spec));
    if (VERIFY_RESULT(static_iter.HasNext())) {
      RETURN_NOT_OK(static_iter.NextRow(table_row));
    }
    data.restart_read_ht->MakeAtLeast(static_iter.RestartReadHt());
  }

  if (pk_doc_key_) {
    DocQLScanSpec row_spec(*non_static_projection, *pk_doc_key_, request_.query_id());
    DocRowwiseIterator row_iter(*non_static_projection, *schema_, txn_op_context_,
                                data.doc_write_batch->doc_db(),
                                data.deadline, data.read_time);
    RETURN_NOT_OK(row_iter.Init(row_spec));
    if (!VERIFY_RESULT(row_iter.HasNext())) {
      // No non-static column found: the row does not exist, so clear the static columns too
      // to signal that.
      table_row->Clear();
    } else {
      RETURN_NOT_OK(row_iter.NextRow(table_row));
      const auto stmt_type = request_.type();
      const bool is_update_or_delete = stmt_type == QLWriteRequestPB::QL_STMT_UPDATE ||
                                       stmt_type == QLWriteRequestPB::QL_STMT_DELETE;
      // When indexes must be updated, whether the liveness column exists determines whether
      // the row still exists after this UPDATE/DELETE, and hence whether its key must be
      // removed from the indexes.
      if (update_indexes_ && is_update_or_delete) {
        liveness_column_exists_ = row_iter.LivenessColumnExists();
      }
    }
    data.restart_read_ht->MakeAtLeast(row_iter.RestartReadHt());
  }

  return Status::OK();
}
449
450
// Builds the one-row result of a conditional DML statement into `*rowblock`.
//
// Column 0 is "[applied]" (= `should_apply`). When the condition failed and the row exists
// (`table_row` is non-empty), the row also carries the existing values of:
//   - the key columns (the full primary key if pk_doc_key_ was built, otherwise only the hash
//     key columns);
//   - the columns of `static_projection`;
//   - the columns of `non_static_projection`;
// in that order. `data` is not used here.
Status QLWriteOperation::PopulateConditionalDmlRow(const DocOperationApplyData& data,
                                                   const bool should_apply,
                                                   const QLTableRow& table_row,
                                                   Schema static_projection,
                                                   Schema non_static_projection,
                                                   std::unique_ptr<QLRowBlock>* rowblock) {
  const bool return_present_values = !should_apply && !table_row.IsEmpty();
  const size_t num_key_columns =
      pk_doc_key_ ? schema_->num_key_columns() : schema_->num_hash_key_columns();

  std::vector<ColumnSchema> columns;
  columns.emplace_back(ColumnSchema("[applied]", BOOL));
  if (return_present_values) {
    const auto& table_columns = schema_->columns();
    columns.insert(columns.end(), table_columns.begin(), table_columns.begin() + num_key_columns);
    for (const Schema* projection : {&static_projection, &non_static_projection}) {
      columns.insert(columns.end(), projection->columns().begin(), projection->columns().end());
    }
  }

  *rowblock = std::make_unique<QLRowBlock>(Schema(columns, 0));
  QLRow& row = (*rowblock)->Extend();
  row.mutable_column(0)->set_bool_value(should_apply);
  if (!return_present_values) {
    return Status::OK();
  }

  size_t col_idx = 1;  // Column 0 holds "[applied]".
  RETURN_NOT_OK(PopulateRow(table_row, *schema_, 0, num_key_columns, &row, &col_idx));
  RETURN_NOT_OK(PopulateRow(table_row, static_projection, &row, &col_idx));
  RETURN_NOT_OK(PopulateRow(table_row, non_static_projection, &row, &col_idx));
  return Status::OK();
}
484
485
// Builds the one-row status result into `*rowblock`. The columns are "[applied]", "[message]"
// and then every column of the table schema. "[applied]" is `should_apply`. "[message]" is left
// unset here; it is only filled in (by the executor) on error. When the write was not applied,
// the existing values from `table_row` are reported, as for a regular IF clause. `data` is not
// used here.
Status QLWriteOperation::PopulateStatusRow(const DocOperationApplyData& data,
                                           const bool should_apply,
                                           const QLTableRow& table_row,
                                           std::unique_ptr<QLRowBlock>* rowblock) {
  // Index in the result row of the first table column.
  constexpr size_t kFirstTableColumn = 2;

  std::vector<ColumnSchema> columns;
  columns.emplace_back(ColumnSchema("[applied]", BOOL));
  columns.emplace_back(ColumnSchema("[message]", STRING));
  const auto& table_columns = schema_->columns();
  columns.insert(columns.end(), table_columns.begin(), table_columns.end());

  *rowblock = std::make_unique<QLRowBlock>(Schema(columns, 0));
  QLRow& row = (*rowblock)->Extend();
  row.mutable_column(0)->set_bool_value(should_apply);

  if (should_apply) {
    return Status::OK();
  }
  for (size_t i = 0; i < schema_->num_columns(); ++i) {
    boost::optional<const QLValuePB&> existing = table_row.GetValue(schema_->column_id(i));
    if (existing.is_initialized()) {
      *row.mutable_column(kFirstTableColumn + i) = *existing;
    }
  }
  return Status::OK();
}
511
512
// Check if a duplicate value is inserted into a unique index.
513
2.33k
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) {
514
2
  VLOG(3) << "Looking for collisions in\n"
515
2
          << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
516
  // We need to check backwards only for backfilled entries.
517
2.33k
  bool ret =
518
2.33k
      VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) ||
519
1.88k
      (request_.is_backfill() &&
520
1.88k
       VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward)));
521
2.33k
  if (!ret) {
522
18.4E
    VLOG(3) << "No collisions found";
523
1.88k
  }
524
2.33k
  return ret;
525
2.33k
}
526
527
// Direction-specific duplicate check.
// - Forward: checks at data.read_time.
// - Backward: uses FindOldestOverwrittenTimestamp, bounded by data.read_time.read, to find the
//   oldest overwritten timestamp for the primary key's row and for its liveness column. It takes
//   the smaller of the two. If that timestamp is valid, it checks for a collision at that single
//   time; otherwise it reports no duplicate.
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(
    const DocOperationApplyData& data, Direction direction) {
  VLOG(2) << "Looking for collision while going " << yb::ToString(direction)
          << ". Trying to insert " << *pk_doc_key_;
  const ReadHybridTime requested_read_time = data.read_time;
  if (direction == Direction::kForward) {
    return HasDuplicateUniqueIndexValue(data, requested_read_time);
  }

  auto iter = CreateIntentAwareIterator(
      data.doc_write_batch->doc_db(),
      BloomFilterMode::USE_BLOOM_FILTER,
      pk_doc_key_->Encode().AsSlice(),
      request_.query_id(),
      txn_op_context_,
      data.deadline,
      ReadHybridTime::Max());

  const SubDocKey row_key(*pk_doc_key_);
  const SubDocKey liveness_key(*pk_doc_key_, PrimitiveValue::kLivenessColumn);
  HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
      iter.get(), row_key, requested_read_time.read));
  const HybridTime oldest_past_min_ht_liveness = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
      iter.get(), liveness_key, requested_read_time.read));
  oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness);

  if (oldest_past_min_ht.is_valid()) {
    return HasDuplicateUniqueIndexValue(data, ReadHybridTime::SingleTime(oldest_past_min_ht));
  }
  return false;
}
559
560
// Checks for a unique-index collision as of `read_time`. The existing entry for this index key
// is read, and the check reports a duplicate when the entry exists and one of its key columns
// (per unique_index_key_schema_) holds a value different from the value this request writes.
// The index entry's values correspond to the indexed table's primary key.
Result<bool> QLWriteOperation::HasDuplicateUniqueIndexValue(
    const DocOperationApplyData& data, ReadHybridTime read_time) {
  // Iterator over the current primary key associated with the index key.
  DocQLScanSpec spec(*unique_index_key_schema_, *pk_doc_key_, request_.query_id(), true);
  DocRowwiseIterator iterator(
      *unique_index_key_schema_, *schema_, txn_op_context_, data.doc_write_batch->doc_db(),
      data.deadline, read_time);
  RETURN_NOT_OK(iterator.Init(spec));

  if (!VERIFY_RESULT(iterator.HasNext())) {
    VLOG(2) << "No collision found while checking at " << yb::ToString(read_time);
    return false;
  }

  QLTableRow existing_row;
  RETURN_NOT_OK(iterator.NextRow(&existing_row));
  const auto& key_ids = unique_index_key_schema_->column_ids();
  const std::unordered_set<ColumnId> key_column_ids(key_ids.begin(), key_ids.end());

  for (const auto& column_value : request_.column_values()) {
    const ColumnId column_id(column_value.column_id());
    if (key_column_ids.count(column_id) == 0) {
      continue;
    }
    boost::optional<const QLValuePB&> existing_value = existing_row.GetValue(column_id);
    const QLValuePB& new_value = column_value.expr().value();
    if (existing_value && *existing_value != new_value) {
      VLOG(2) << "Found collision while checking at " << yb::ToString(read_time)
              << "\nExisting: " << yb::ToString(*existing_value)
              << " vs New: " << yb::ToString(new_value)
              << "\nUsed read time as " << yb::ToString(data.read_time);
      DVLOG(3) << "DocDB is now:\n"
               << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
      return true;
    }
  }

  VLOG(2) << "No collision while checking at " << yb::ToString(read_time);
  return false;
}
603
604
// Seeks `iter` to the primary key. If the iterator is valid there, it returns
// iter->FindOldestRecord() for `sub_doc_key` (encoded without hybrid time) with `min_read_time`.
// Otherwise it returns a default-constructed HybridTime; callers test the result with is_valid().
Result<HybridTime> QLWriteOperation::FindOldestOverwrittenTimestamp(
    IntentAwareIterator* iter,
    const SubDocKey& sub_doc_key,
    HybridTime min_read_time) {
  VLOG(3) << "Doing iter->Seek " << *pk_doc_key_;
  iter->Seek(*pk_doc_key_);
  if (!iter->valid()) {
    VLOG(3) << "iter->Seek " << *pk_doc_key_ << " turned out to be invalid";
    return HybridTime();
  }

  const KeyBytes encoded_sub_key = sub_doc_key.EncodeWithoutHt();
  const Slice sub_key_slice = encoded_sub_key.AsSlice();
  const HybridTime oldest = VERIFY_RESULT(iter->FindOldestRecord(sub_key_slice, min_read_time));
  VLOG(2) << "iter->FindOldestRecord returned " << oldest << " for "
          << SubDocKey::DebugSliceToString(sub_key_slice);
  return oldest;
}
623
624
// Applies one JSON-path assignment, described by `column_value`, to the in-memory JSON document
// kept for that column in `*res_map`. The path is given by json_args() and the new value by
// expr().value().jsonb_value().
//
// The first time a column is touched, its current value is read from `*existing_row` into
// `*res_map`. Later operators on the same column therefore modify the same document
// (read-modify-write). A NULL column value is replaced by an empty object `{}`, which is also
// stored back into `*existing_row`. For non-insert requests on a NULL column, only paths of
// depth 1 are accepted.
//
// For inserts, any path lookup failure is returned. For updates, if the lookup fails on the last
// path element of an object, that member is added with the new value; any other failure is
// returned.
//
// `data`, `sub_path`, `ttl`, `user_timestamp` and `column` are not used by this function.
Status QLWriteOperation::ApplyForJsonOperators(
    std::unordered_map<ColumnIdRep, rapidjson::Document>* res_map,
    const QLColumnValuePB& column_value,
    const DocOperationApplyData& data,
    const DocPath& sub_path, const MonoDelta& ttl,
    const UserTimeMicros& user_timestamp,
    const ColumnSchema& column,
    QLTableRow* existing_row,
    bool is_insert) {
  using common::Jsonb;
  const ColumnIdRep col_id = column_value.column_id();

  // Read the json column value the first time this column is seen, for read-modify-write.
  bool is_null = false;
  if (res_map->find(col_id) == res_map->end()) {
    auto emplace_result = res_map->emplace(col_id, rapidjson::Document());
    QLExprResult temp;
    RETURN_NOT_OK(existing_row->ReadColumn(column_value.column_id(), temp.Writer()));
    const auto& ql_value = temp.Value();
    if (IsNull(ql_value)) {
      is_null = true;
    } else {
      // ql_value is const, so the serialized jsonb is copied (std::move would copy anyway).
      Jsonb jsonb(ql_value.jsonb_value());
      RETURN_NOT_OK(jsonb.ToRapidJson(&emplace_result.first->second));
    }
  }
  auto iter = res_map->find(col_id);

  if (is_null) {
    if (!is_insert && column_value.json_args_size() > 1) {
      // "$0" is required: strings::Substitute ignores arguments without a placeholder.
      return STATUS_SUBSTITUTE(QLError, "JSON path depth should be 1 for upsert: $0",
                               column_value.ShortDebugString());
    }
    common::Jsonb empty_jsonb;
    RETURN_NOT_OK(empty_jsonb.FromString("{}"));
    // Named json_column so it does not shadow the `column` parameter.
    QLTableColumn& json_column = existing_row->AllocColumn(column_value.column_id());
    json_column.value.set_jsonb_value(empty_jsonb.MoveSerializedJsonb());

    Jsonb jsonb(json_column.value.jsonb_value());
    RETURN_NOT_OK(jsonb.ToRapidJson(&iter->second));
  }

  // Deserialize the rhs into a document that shares the target document's allocator, so its
  // value can be moved into the target below.
  Jsonb rhs(column_value.expr().value().jsonb_value());
  rapidjson::Document rhs_doc(&iter->second.GetAllocator());
  RETURN_NOT_OK(rhs.ToRapidJson(&rhs_doc));

  // Walk the lhs path. When the walk stops, `i` is one past the index of the last path element
  // that was looked up, so `i == json_args_size()` means the walk reached (or failed at) the
  // last element.
  rapidjson::Value::MemberIterator memberit;
  rapidjson::Value::ValueIterator valueit;
  bool last_elem_object = false;
  rapidjson::Value* node = &iter->second;

  int i = 0;
  auto status = FindMemberForIndex(column_value, i, node, &memberit, &valueit,
                                   &last_elem_object, is_insert);
  for (i = 1; i < column_value.json_args_size() && status.ok(); i++) {
    node = last_elem_object ? &(memberit->value) : &(*valueit);
    status = FindMemberForIndex(column_value, i, node, &memberit, &valueit,
                                &last_elem_object, is_insert);
  }

  if (is_insert) {
    RETURN_NOT_OK(status);
  }
  const bool update_missing = !is_insert && !status.ok();

  if (update_missing) {
    // NOTE: lhs path cannot exceed by more than one hop
    if (!last_elem_object || i != column_value.json_args_size()) {
      RETURN_NOT_OK(status);
    }
    const auto& member_name = column_value.json_args(i - 1).operand().value().string_value();
    rapidjson::Value member_key(
        member_name.c_str(),
        narrow_cast<rapidjson::SizeType>(member_name.size()), iter->second.GetAllocator());
    node->AddMember(member_key, rhs_doc, iter->second.GetAllocator());
  } else if (last_elem_object) {
    memberit->value = rhs_doc.Move();
  } else {
    *valueit = rhs_doc.Move();
  }

  return Status::OK();
}
712
713
// Writes a single collection element addressed by a subscript, i.e. `map['key'] = v` or
// `list[index] = v`.
//
// The right-hand side is evaluated against `existing_row`, and the user timestamp is validated
// via CheckUserTimestampForCollections().
//   MAP:  the subscript value is appended to `sub_path` as a subkey, and the element is inserted
//         there. `sub_path` is modified.
//   LIST: the element at the CQL index is replaced via ReplaceCqlInList(). The table's default
//         TTL is passed along (MonoDelta::kMax if none is set).
//
// Any other column type is expected to be rejected by the semantic analyzer before reaching this
// point. If one does arrive, it is logged and reported as InvalidArgument rather than silently
// dropping the write while returning OK.
Status QLWriteOperation::ApplyForSubscriptArgs(const QLColumnValuePB& column_value,
                                               const QLTableRow& existing_row,
                                               const DocOperationApplyData& data,
                                               const MonoDelta& ttl,
                                               const UserTimeMicros& user_timestamp,
                                               const ColumnSchema& column,
                                               DocPath* sub_path) {
  QLExprResult expr_result;
  RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, expr_result.Writer()));
  const yb::bfql::TSOpcode write_instr = GetTSWriteInstruction(column_value.expr());
  const SubDocument& sub_doc =
      SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type(), write_instr);
  RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));

  // Setting the value for a sub-column
  // Currently we only support two cases here: `map['key'] = v` and `list[index] = v`)
  // Any other case should be rejected by the semantic analyser before getting here
  // Later when we support frozen or nested collections this code may need refactoring
  DCHECK_EQ(column_value.subscript_args().size(), 1);
  DCHECK(column_value.subscript_args(0).has_value()) << "An index must be a constant";
  switch (column.type()->main()) {
    case MAP: {
      const PrimitiveValue &pv = PrimitiveValue::FromQLValuePB(
          column_value.subscript_args(0).value(),
          SortingType::kNotSpecified);
      sub_path->AddSubKey(pv);
      RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
          *sub_path, sub_doc, data.read_time, data.deadline,
          request_.query_id(), ttl, user_timestamp));
      break;
    }
    case LIST: {
      MonoDelta default_ttl = schema_->table_properties().HasDefaultTimeToLive() ?
          MonoDelta::FromMilliseconds(schema_->table_properties().DefaultTimeToLive()) :
          MonoDelta::kMax;

      int target_cql_index = column_value.subscript_args(0).value().int32_value();
      RETURN_NOT_OK(data.doc_write_batch->ReplaceCqlInList(
          *sub_path, target_cql_index, sub_doc, data.read_time, data.deadline, request_.query_id(),
          default_ttl, ttl));
      break;
    }
    default: {
      LOG(ERROR) << "Unexpected type for setting subcolumn: "
                 << column.type()->ToString();
      // Report the failure instead of acknowledging a write that was never applied.
      return STATUS_FORMAT(InvalidArgument, "Unexpected type for setting subcolumn: $0",
                           column.type()->ToString());
    }
  }
  return Status::OK();
}
762
763
// Writes a plain (non-subscripted, non-JSON-path) column assignment `column = <expr>`.
//
// The expression is evaluated against `existing_row`, and the write-batch operation is chosen from
// the expression's TSOpcode:
//   kToJson / kScalarInsert         -> overwrite the column (InsertSubDocument).
//   kMap/SetExtend, kMap/SetRemove  -> ExtendSubDocument, after validating the user timestamp.
//   kListPrepend / kListAppend      -> ExtendList (prepend sets PREPEND_BLOCK order first).
//   kListRemove                     -> InsertSubDocument (see TODO below).
// Any other opcode is fatal.
//
// When indexes are maintained (`update_indexes_`), the evaluated value is also stored in `new_row`
// so that Apply() can hand both row images to UpdateIndexes().
Status QLWriteOperation::ApplyForRegularColumns(const QLColumnValuePB& column_value,
                                                const QLTableRow& existing_row,
                                                const DocOperationApplyData& data,
                                                const DocPath& sub_path, const MonoDelta& ttl,
                                                const UserTimeMicros& user_timestamp,
                                                const ColumnSchema& column,
                                                const ColumnId& column_id,
                                                QLTableRow* new_row) {
  using yb::bfql::TSOpcode;

  QLExprResult evaluated;
  RETURN_NOT_OK(EvalExpr(column_value.expr(), existing_row, evaluated.Writer()));
  const TSOpcode opcode = GetTSWriteInstruction(column_value.expr());
  const SubDocument& value_doc =
      SubDocument::FromQLValuePB(evaluated.Value(), column.sorting_type(), opcode);

  auto* const write_batch = data.doc_write_batch;
  switch (opcode) {
    case TSOpcode::kToJson: FALLTHROUGH_INTENDED;
    case TSOpcode::kScalarInsert:
      RETURN_NOT_OK(write_batch->InsertSubDocument(
          sub_path, value_doc, data.read_time, data.deadline,
          request_.query_id(), ttl, user_timestamp));
      break;

    case TSOpcode::kMapExtend:
    case TSOpcode::kSetExtend:
    case TSOpcode::kMapRemove:
    case TSOpcode::kSetRemove:
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      RETURN_NOT_OK(write_batch->ExtendSubDocument(
          sub_path, value_doc, data.read_time, data.deadline, request_.query_id(), ttl));
      break;

    case TSOpcode::kListPrepend:
      value_doc.SetExtendOrder(ListExtendOrder::PREPEND_BLOCK);
      FALLTHROUGH_INTENDED;
    case TSOpcode::kListAppend:
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      RETURN_NOT_OK(write_batch->ExtendList(
          sub_path, value_doc, data.read_time, data.deadline, request_.query_id(), ttl));
      break;

    case TSOpcode::kListRemove:
      // TODO(akashnil or mihnea) this should call RemoveFromList once thats implemented
      // Currently list subtraction is computed in memory using builtin call so this
      // case should never be reached. Once it is implemented the corresponding case
      // from EvalQLExpressionPB should be uncommented to enable this optimization.
      RETURN_NOT_OK(CheckUserTimestampForCollections(user_timestamp));
      RETURN_NOT_OK(write_batch->InsertSubDocument(
          sub_path, value_doc, data.read_time, data.deadline,
          request_.query_id(), ttl, user_timestamp));
      break;

    default:
      LOG(FATAL) << "Unsupported operation: " << static_cast<int>(opcode);
      break;
  }

  if (update_indexes_) {
    new_row->AllocColumn(column_id, evaluated.Value());
  }
  return Status::OK();
}
822
823
3.33M
// Applies this QL write request to `data.doc_write_batch`.
//
// Steps:
//  1. If the request has an IF clause: read the referenced columns and evaluate the condition,
//     then set `applied` in the response. If the condition fails and `else_error` is set, return
//     CONDITION_NOT_SATISFIED. Otherwise populate the returned status/conditional-DML row, and
//     stop early (with OK) when the condition is not satisfied.
//     Without an IF clause, the row is read only when expressions need it or the request asks
//     for a status row.
//  2. If inserting into a unique index and the value already exists, report `applied = false`
//     and stop.
//  3. INSERT/UPDATE: write the liveness column (INSERT only), then each column value (regular,
//     subscripted, or JSON-path; JSON updates are aggregated per column and written afterwards),
//     then update secondary indexes if required.
//     DELETE: delete the listed columns, or run a range delete over rows matching the WHERE
//     clause, or delete the whole row; then update secondary indexes if required.
//
// Returns InvalidArgument for a column value without a column id, and otherwise propagates any
// error from reads, evaluation, or the write batch.
Status QLWriteOperation::Apply(const DocOperationApplyData& data) {
  QLTableRow existing_row;
  if (request_.has_if_expr()) {
    // Check if the if-condition is satisfied.
    bool should_apply = true;
    Schema static_projection, non_static_projection;
    RETURN_NOT_OK(ReadColumns(data, &static_projection, &non_static_projection, &existing_row));
    RETURN_NOT_OK(EvalCondition(request_.if_expr().condition(), existing_row, &should_apply));
    // Set the response accordingly.
    response_->set_applied(should_apply);
    if (!should_apply && request_.else_error()) {
      return ql::ErrorStatus(ql::ErrorCode::CONDITION_NOT_SATISFIED); // QLError
    } else if (request_.returns_status()) {
      RETURN_NOT_OK(PopulateStatusRow(data, should_apply, existing_row, &rowblock_));
    } else {
      RETURN_NOT_OK(PopulateConditionalDmlRow(data,
          should_apply,
          existing_row,
          static_projection,
          non_static_projection,
          &rowblock_));
    }

    // If we do not need to apply we are already done.
    if (!should_apply) {
      response_->set_status(QLResponsePB::YQL_STATUS_OK);
      return Status::OK();
    }

    TEST_PAUSE_IF_FLAG(TEST_pause_write_apply_after_if);
  } else if (RequireReadForExpressions(request_) || request_.returns_status()) {
    RETURN_NOT_OK(ReadColumns(data, nullptr, nullptr, &existing_row));
    if (request_.returns_status()) {
      RETURN_NOT_OK(PopulateStatusRow(data, /* should_apply = */ true, existing_row, &rowblock_));
    }
  }

  VLOG(3) << "insert_into_unique_index_ is " << insert_into_unique_index_;
  if (insert_into_unique_index_ && VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) {
    VLOG(3) << "set_applied is set to " << false << " for over " << yb::ToString(existing_row);
    response_->set_applied(false);
    response_->set_status(QLResponsePB::YQL_STATUS_OK);
    return Status::OK();
  }

  const MonoDelta ttl = request_ttl();

  const UserTimeMicros user_timestamp = request_.has_user_timestamp_usec() ?
      request_.user_timestamp_usec() : Value::kInvalidUserTimestamp;

  // Initialize the new row being written to either the existing row if read, or just populate
  // the primary key.
  QLTableRow new_row;
  if (!existing_row.IsEmpty()) {
    new_row = existing_row;
  } else {
    size_t idx = 0;
    for (const QLExpressionPB& expr : request_.hashed_column_values()) {
      new_row.AllocColumn(schema_->column_id(idx), expr.value());
      idx++;
    }
    for (const QLExpressionPB& expr : request_.range_column_values()) {
      new_row.AllocColumn(schema_->column_id(idx), expr.value());
      idx++;
    }
  }

  switch (request_.type()) {
    // QL insert == update (upsert) to be consistent with Cassandra's semantics. In either
    // INSERT or UPDATE, if non-key columns are specified, they will be inserted which will cause
    // the primary key to be inserted also when necessary. Otherwise, we should insert the
    // primary key at least.
    case QLWriteRequestPB::QL_STMT_INSERT:
    case QLWriteRequestPB::QL_STMT_UPDATE: {
      // Add the appropriate liveness column only for inserts.
      // We never use init markers for QL to ensure we perform writes without any reads to
      // ensure our write path is fast while complicating the read path a bit.
      const bool is_insert = request_.type() == QLWriteRequestPB::QL_STMT_INSERT;
      if (is_insert && encoded_pk_doc_key_) {
        const DocPath sub_path(encoded_pk_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn);
        const auto value = Value(PrimitiveValue(), ttl, user_timestamp);
        RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
            sub_path, value, data.read_time, data.deadline, request_.query_id()));
      }

      // JSON-path updates are accumulated here per column and written once after the loop.
      std::unordered_map<ColumnIdRep, rapidjson::Document> res_map;
      for (const auto& column_value : request_.column_values()) {
        if (!column_value.has_column_id()) {
          return STATUS_FORMAT(InvalidArgument, "column id missing: $0",
                               column_value.DebugString());
        }
        const ColumnId column_id(column_value.column_id());
        const auto maybe_column = schema_->column_by_id(column_id);
        RETURN_NOT_OK(maybe_column);
        const ColumnSchema& column = *maybe_column;

        DocPath sub_path(
            column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
            PrimitiveValue(column_id));

        if (!column_value.json_args().empty()) {
          RETURN_NOT_OK(ApplyForJsonOperators(&res_map, column_value, data, sub_path, ttl,
                                              user_timestamp, column, &new_row, is_insert));
        } else if (!column_value.subscript_args().empty()) {
          RETURN_NOT_OK(ApplyForSubscriptArgs(column_value, existing_row, data, ttl,
                                              user_timestamp, column, &sub_path));
        } else {
          RETURN_NOT_OK(ApplyForRegularColumns(column_value, existing_row, data, sub_path, ttl,
                                               user_timestamp, column, column_id, &new_row));
        }
      }
      // go over the map and generate (aggregated) SubDocument
      for (auto& entry : res_map) {
        const ColumnId column_id(entry.first);
        const auto maybe_column = schema_->column_by_id(column_id);
        RETURN_NOT_OK(maybe_column);
        const ColumnSchema& column = *maybe_column;
        DocPath sub_path(
            column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
            PrimitiveValue(column_id));

        // Now write the new json value back.
        common::Jsonb jsonb_result;
        RETURN_NOT_OK(jsonb_result.FromRapidJson(entry.second));
        // Update the current row with the final value.
        QLValue val;
        *(val.mutable_jsonb_value()) = std::move(jsonb_result.MoveSerializedJsonb());

        const SubDocument& sub_doc =
            SubDocument::FromQLValuePB(val.value(), column.sorting_type(),
                                       yb::bfql::TSOpcode::kScalarInsert);
        RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
            sub_path, sub_doc, data.read_time, data.deadline,
            request_.query_id(), ttl, user_timestamp));

        new_row.AllocColumn(column_id).value = val.value();
      }

      if (update_indexes_) {
        RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
      }
      break;
    }
    case QLWriteRequestPB::QL_STMT_DELETE: {
      // We have three cases:
      // 1. If non-key columns are specified, we delete only those columns.
      // 2. Otherwise, if range cols are missing, this must be a range delete.
      // 3. Otherwise, this is a normal delete.
      // Analyzer ensures these are the only cases before getting here (e.g. range deletes cannot
      // specify non-key columns).
      if (request_.column_values_size() > 0) {
        // Delete the referenced columns only.
        for (const auto& column_value : request_.column_values()) {
          // Reject a malformed request with an error, as the INSERT/UPDATE path does, instead of
          // crashing the server.
          if (!column_value.has_column_id()) {
            return STATUS_FORMAT(InvalidArgument, "column id missing: $0",
                                 column_value.DebugString());
          }
          const ColumnId column_id(column_value.column_id());
          const auto& column = VERIFY_RESULT_REF(schema_->column_by_id(column_id));
          const DocPath sub_path(
              column.is_static() ?
                encoded_hashed_doc_key_.as_slice() : encoded_pk_doc_key_.as_slice(),
              PrimitiveValue(column_id));
          RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(sub_path,
              data.read_time, data.deadline, request_.query_id(), user_timestamp));
          if (update_indexes_) {
            new_row.MarkTombstoned(column_id);
          }
        }
        if (update_indexes_) {
          RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
        }
      } else if (IsRangeOperation(request_, *schema_)) {
        // If the range columns are not specified, we read everything and delete all rows for
        // which the where condition matches.

        // Create the schema projection -- range deletes cannot reference non-primary key columns,
        // so the non-static projection is all we need, it should contain all referenced columns.
        Schema static_projection;
        Schema projection;
        RETURN_NOT_OK(CreateProjections(*schema_, request_.column_refs(),
            &static_projection, &projection));

        // Construct the scan spec basing on the WHERE condition.
        vector<PrimitiveValue> hashed_components;
        RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
            request_.hashed_column_values(), *schema_, 0,
            schema_->num_hash_key_columns(), &hashed_components));

        boost::optional<int32_t> hash_code = request_.has_hash_code()
                                             ? boost::make_optional<int32_t>(request_.hash_code())
                                             : boost::none;
        const auto range_covers_whole_partition_key = !request_.has_where_expr();
        const auto include_static_columns_in_scan = range_covers_whole_partition_key &&
                                                    schema_->has_statics();
        DocQLScanSpec spec(*schema_,
                           hash_code,
                           hash_code, // max hash code.
                           hashed_components,
                           request_.has_where_expr() ? &request_.where_expr().condition() : nullptr,
                           nullptr,
                           request_.query_id(),
                           true /* is_forward_scan */,
                           include_static_columns_in_scan);

        // Create iterator.
        DocRowwiseIterator iterator(
            projection, *schema_, txn_op_context_,
            data.doc_write_batch->doc_db(),
            data.deadline, data.read_time);
        RETURN_NOT_OK(iterator.Init(spec));

        // Iterate through rows and delete those that match the condition.
        // TODO(mihnea): We do not lock here, so other write transactions coming in might appear
        // partially applied if they happen in the middle of a ranged delete.
        while (VERIFY_RESULT(iterator.HasNext())) {
          existing_row.Clear();
          RETURN_NOT_OK(iterator.NextRow(&existing_row));

          // Match the row with the where condition before deleting it.
          bool match = false;
          RETURN_NOT_OK(spec.Match(existing_row, &match));
          if (match) {
            const DocPath row_path(iterator.row_key());
            RETURN_NOT_OK(DeleteRow(row_path, data.doc_write_batch, data.read_time, data.deadline));
            if (update_indexes_) {
              liveness_column_exists_ = iterator.LivenessColumnExists();
              RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
            }
          }
        }
        data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt());
      } else {
        // Otherwise, delete the referenced row (all columns).
        RETURN_NOT_OK(DeleteRow(DocPath(encoded_pk_doc_key_.as_slice()), data.doc_write_batch,
                                data.read_time, data.deadline));
        if (update_indexes_) {
          RETURN_NOT_OK(UpdateIndexes(existing_row, new_row));
        }
      }
      break;
    }
  }

  response_->set_status(QLResponsePB::YQL_STATUS_OK);

  return Status::OK();
}
1072
1073
// Adds tombstones that delete the row at `row_path` to `doc_write_batch`.
//
// Without a user timestamp, a single tombstone on the row key is written. With a user timestamp,
// every non-key column of the schema is tombstoned individually with that timestamp, followed by
// the liveness column, so the read path does not need to reconcile a row-level tombstone against
// per-column user timestamps.
Status QLWriteOperation::DeleteRow(const DocPath& row_path, DocWriteBatch* doc_write_batch,
                                   const ReadHybridTime& read_ht, const CoarseTimePoint deadline) {
  if (!request_.has_user_timestamp_usec()) {
    RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(row_path, read_ht, deadline));
    return Status::OK();
  }

  const auto row_key = row_path.encoded_doc_key();
  for (auto col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); ++col_idx) {
    const DocPath column_path(row_key, PrimitiveValue(schema_->column_id(col_idx)));
    RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(column_path,
                                                read_ht,
                                                deadline,
                                                request_.query_id(),
                                                request_.user_timestamp_usec()));
  }

  // The liveness column has to go as well.
  const DocPath liveness_path(row_key, PrimitiveValue::kLivenessColumn);
  RETURN_NOT_OK(doc_write_batch->DeleteSubDoc(liveness_path,
                                              read_ht,
                                              deadline,
                                              request_.query_id(),
                                              request_.user_timestamp_usec()));
  return Status::OK();
}
1101
1102
namespace {
1103
1104
YB_DEFINE_ENUM(ValueState, (kNull)(kNotNull)(kMissing));
1105
1106
1.97k
ValueState GetValueState(const QLTableRow& row, const ColumnId column_id) {
1107
1.97k
  const auto value = row.GetValue(column_id);
1108
1.97k
  return !value ? ValueState::kMissing : IsNull(*value) ? ValueState::kNull : ValueState::kNotNull;
1109
1.97k
}
1110
1111
} // namespace
1112
1113
// Decides whether this write leaves the row deleted, given the row before the write
// (`existing_row`) and the row as it will look afterwards (`new_row`).
//
//  - A DELETE without explicit columns always deletes the row.
//  - If the row did not exist before, it is not considered deleted.
//  - For UPDATE/DELETE on a row without a liveness column, the row is deleted unless some non-key,
//    non-static column still holds a non-null value. A column's value is taken from `new_row`
//    when present there, and otherwise from `existing_row`.
//  - In every other case the row is not deleted.
Result<bool> QLWriteOperation::IsRowDeleted(const QLTableRow& existing_row,
                                            const QLTableRow& new_row) const {
  const auto stmt_type = request_.type();

  if (stmt_type == QLWriteRequestPB::QL_STMT_DELETE && request_.column_values().empty()) {
    return true;
  }

  if (existing_row.IsEmpty()) {
    return false;
  }

  const bool is_update_or_delete = stmt_type == QLWriteRequestPB::QL_STMT_UPDATE ||
                                   stmt_type == QLWriteRequestPB::QL_STMT_DELETE;
  if (!is_update_or_delete || liveness_column_exists_) {
    return false;
  }

  for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); ++col_idx) {
    if (schema_->column(col_idx).is_static()) {
      continue;
    }
    const ColumnId column_id = schema_->column_id(col_idx);
    const ValueState new_state = GetValueState(new_row, column_id);
    if (new_state == ValueState::kNotNull) {
      return false;
    }
    // A column the statement did not touch keeps whatever value it had before.
    if (new_state == ValueState::kMissing &&
        GetValueState(existing_row, column_id) == ValueState::kNotNull) {
      return false;
    }
  }

#if DCHECK_IS_ON()
  // Debug-only sanity check: when the statement sets no non-key column to a non-null value, every
  // non-key column absent from new_row must have been read into existing_row. Otherwise the
  // decision above was made on incomplete data.
  bool sets_non_null_value = false;
  for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); ++col_idx) {
    if (GetValueState(new_row, schema_->column_id(col_idx)) == ValueState::kNotNull) {
      sets_non_null_value = true;
      break;
    }
  }

  if (!sets_non_null_value) {
    for (size_t col_idx = schema_->num_key_columns(); col_idx < schema_->num_columns(); ++col_idx) {
      const ColumnId column_id = schema_->column_id(col_idx);
      if (GetValueState(new_row, column_id) == ValueState::kMissing) {
        DCHECK(GetValueState(existing_row, column_id) != ValueState::kMissing);
      }
    }
  }
#endif

  return true;
}
1173
1174
3.33M
// TTL to apply to this write: the request's TTL (milliseconds) when set, otherwise Value::kMaxTtl.
MonoDelta QLWriteOperation::request_ttl() const {
  if (!request_.has_ttl()) {
    return Value::kMaxTtl;
  }
  return MonoDelta::FromMilliseconds(request_.ttl());
}
1177
1178
namespace {
1179
1180
100k
QLExpressionPB* NewKeyColumn(QLWriteRequestPB* request, const IndexInfo& index, const size_t idx) {
1181
100k
  return (idx < index.hash_column_count()
1182
48.1k
          ? request->add_hashed_column_values()
1183
52.5k
          : request->add_range_column_values());
1184
100k
}
1185
1186
QLWriteRequestPB* NewIndexRequest(
1187
    const IndexInfo& index,
1188
    QLWriteRequestPB::QLStmtType type,
1189
48.0k
    vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests) {
1190
48.0k
  index_requests->emplace_back(&index, QLWriteRequestPB());
1191
48.0k
  QLWriteRequestPB* const request = &index_requests->back().second;
1192
48.0k
  request->set_type(type);
1193
48.0k
  return request;
1194
48.0k
}
1195
1196
} // namespace
1197
1198
27.5k
// Appends to index_requests_ the index write requests needed to keep every index listed in
// request_.update_index_ids() consistent with this write.
//
// For each index there are at most two requests:
// - an insert of the new index entry, built by CreateAndSetupIndexInsertRequest();
// - a delete of the existing index entry, issued when the row is deleted, the index key changed,
//   or (for partial indexes) the WHERE predicate switched from true to false.
//
// existing_row: the row as currently stored (may be empty).
// new_row: the row as it will look after this write.
// Returns a non-OK status if an index cannot be found or an expression/condition fails to evaluate.
Status QLWriteOperation::UpdateIndexes(const QLTableRow& existing_row, const QLTableRow& new_row) {
  // Prepare the write requests to update the indexes. There should be at most 2 requests for each
  // index (one insert and one delete).
  VLOG(2) << "Updating indexes";
  const auto& index_ids = request_.update_index_ids();
  if (index_ids.empty()) {
    return Status::OK();
  }
  index_requests_.reserve(index_ids.size() * 2);

  // IsRowDeleted() takes no per-index input, so evaluate it once rather than once per index
  // (in debug builds it also scans all non-key columns for its DCHECKs).
  const bool is_row_deleted = VERIFY_RESULT(IsRowDeleted(existing_row, new_row));

  for (const TableId& index_id : index_ids) {
    const IndexInfo* index = VERIFY_RESULT(index_map_.FindIndex(index_id));
    bool index_key_changed = false;
    bool index_pred_existing_row = true;
    bool index_pred_new_row = true;

    if (index->where_predicate_spec()) {
      RETURN_NOT_OK(EvalCondition(
        index->where_predicate_spec()->where_expr().condition(), existing_row,
        &index_pred_existing_row));
    }

    if (is_row_deleted) {
      // If it is a partial index and predicate wasn't satisfied for the existing row
      // which is being deleted, we need to do nothing.
      if (index->where_predicate_spec() && !index_pred_existing_row) {
        VLOG(3) << "Skip index entry delete for index_id=" << index->table_id() <<
          " since predicate not satisfied";
        continue;
      }
      index_key_changed = true;
    } else {
      VERIFY_RESULT(CreateAndSetupIndexInsertRequest(
          this, index->HasWritePermission(), existing_row, new_row, index,
          &index_requests_, &index_key_changed, &index_pred_new_row, index_pred_existing_row));
    }

    // True when the stored row satisfied the partial-index predicate but the new row does not.
    const bool index_pred_switched_to_false =
        index->where_predicate_spec() &&
        !existing_row.IsEmpty() && index_pred_existing_row && !index_pred_new_row;

    // If the index key is changed, delete the current key.
    if ((index_key_changed || index_pred_switched_to_false) && index->HasDeletePermission()) {
      if (!index_pred_switched_to_false) {
        // 1. In case of a switch of predicate satisfiability to false, we surely have to delete the
        // row. (Even if there wasn't any index key change).
        // 2. But in case of an index key change without predicate satisfiability switch, if the
        // index predicate was already false for the existing row, we have to do nothing.
        // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing.
        if (!index_pred_existing_row) {
          VLOG(3) << "Skip index entry delete of existing row for index_id=" << index->table_id() <<
            " since predicate not satisfied";
          continue;
        }
      }

      QLWriteRequestPB* const index_request =
          NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_DELETE, &index_requests_);
      VLOG(3) << "Issue index entry delete of existing row for index_id=" << index->table_id() <<
        " since predicate was satisfied earlier AND (isn't satisfied now (OR) the key has changed)";

      // The delete is keyed by the index key computed from the existing row.
      for (size_t idx = 0; idx < index->key_column_count(); idx++) {
        const auto& index_column = index->column(idx);
        QLExpressionPB* key_column = NewKeyColumn(index_request, *index, idx);

        // For old message expr_case() == NOT SET.
        // For new message expr_case == kColumnId when indexing expression is a column-ref.
        if (index_column.colexpr.expr_case() != QLExpressionPB::ExprCase::EXPR_NOT_SET &&
            index_column.colexpr.expr_case() != QLExpressionPB::ExprCase::kColumnId) {
          QLExprResult result;
          RETURN_NOT_OK(EvalExpr(index_column.colexpr, existing_row, result.Writer()));
          result.MoveTo(key_column->mutable_value());
        } else {
          auto result = existing_row.GetValue(index_column.indexed_column_id);
          if (result) {
            key_column->mutable_value()->CopyFrom(*result);
          }
        }
      }
    }
  }

  return Status::OK();
}
1280
1281
// Computes the index row corresponding to `new_row`. When the index must be written, it appends
// a QL_STMT_INSERT request for that row to `index_requests`.
//
// expr_executor: evaluates index column expressions and the partial-index WHERE predicate.
// index_has_write_permission: when false, no insert request is created.
// existing_row: the currently stored row (may be empty).
// new_row: the row after the write.
// index_requests: output list that receives the insert request, if one is created.
// has_index_key_changed: optional out-param (may be null); receives whether any index key column
//   differs from existing_row.
// index_pred_new_row: optional out-param (may be null); receives the WHERE predicate evaluated on
//   new_row. It is written only when the index has a WHERE predicate.
// index_pred_existing_row: the predicate value the caller computed for existing_row.
//
// Returns the appended request, or nullptr when the insert is skipped. The insert is skipped when
// nothing relevant changed, when there is no write permission, or when new_row fails the
// partial-index predicate.
Result<QLWriteRequestPB*> CreateAndSetupIndexInsertRequest(
    QLExprExecutor* expr_executor,
    bool index_has_write_permission,
    const QLTableRow& existing_row,
    const QLTableRow& new_row,
    const IndexInfo* index,
    vector<pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    bool* has_index_key_changed,
    bool* index_pred_new_row,
    bool index_pred_existing_row) {
  // A column's value is used directly, without evaluating `colexpr` (which is less efficient), when:
  // - the message is in the old PROTO format (no expression set), or
  // - the indexing expression is just a column reference.
  const auto uses_column_value_directly = [](const auto& colexpr) {
    const auto expr_case = colexpr.expr_case();
    return expr_case == QLExpressionPB::ExprCase::EXPR_NOT_SET ||
           expr_case == QLExpressionPB::ExprCase::kColumnId;
  };

  const bool existing_row_present = !existing_row.IsEmpty();
  bool key_changed = false;
  bool needs_index_write = false;
  // Index column position -> value to write for that column.
  unordered_map<size_t, QLValuePB> column_values;

  // Build the new index key, one key column at a time.
  for (size_t col = 0; col < index->key_column_count(); ++col) {
    const auto& index_column = index->column(col);
    const auto& column_id = index_column.indexed_column_id;
    bool unchanged = false;

    if (uses_column_value_directly(index_column.colexpr)) {
      auto value = new_row.GetValue(column_id);
      if (existing_row_present) {
        if (!value) {
          // Fall back to the stored value.
          // TODO(Piyush): Possibly dead code. new_row should always have the column here, because
          // the existing row is copied into the new one before this function is called.
          value = existing_row.GetValue(column_id);
        } else if (new_row.MatchColumn(column_id, existing_row)) {
          unchanged = true;
        } else {
          key_changed = true;
        }
      }
      if (value) {
        column_values[col] = std::move(*value);
      }
    } else {
      QLExprResult evaluated;
      if (!existing_row_present) {
        RETURN_NOT_OK(expr_executor->EvalExpr(index_column.colexpr, new_row, evaluated.Writer()));
      } else if (new_row.IsColumnSpecified(column_id)) {
        RETURN_NOT_OK(expr_executor->EvalExpr(index_column.colexpr, new_row, evaluated.Writer()));
        // Equal expression results mean the key did not change, even if the underlying column
        // value did.
        QLExprResult previous;
        RETURN_NOT_OK(expr_executor->EvalExpr(
            index_column.colexpr, existing_row, previous.Writer()));
        if (evaluated.Value() == previous.Value()) {
          unchanged = true;
        } else {
          key_changed = true;
        }
      } else {
        // The write does not set this column, so evaluate on the stored row.
        // TODO(Piyush): This else is possibly dead code.
        RETURN_NOT_OK(expr_executor->EvalExpr(
            index_column.colexpr, existing_row, evaluated.Writer()));
      }
      evaluated.MoveTo(&column_values[col]);
    }

    if (!unchanged) {
      needs_index_write = true;
    }
  }

  // Collect the covering (non-key) columns.
  for (size_t col = index->key_column_count(); col < index->columns().size(); ++col) {
    const auto& index_column = index->column(col);
    const auto& column_id = index_column.indexed_column_id;
    auto value = new_row.GetValue(column_id);
    bool unchanged = false;

    if (key_changed) {
      // A new index entry is written under the new key. If the write does not set this covering
      // column, carry over the stored value.
      if (!value) {
        // TODO(Piyush): This if is possibly dead code.
        value = existing_row.GetValue(column_id);
      }
    } else if (!FLAGS_ycql_disable_index_updating_optimization && value &&
               new_row.MatchColumn(column_id, existing_row)) {
      unchanged = true;
    }

    if (value) {
      column_values[col] = std::move(*value);
    }
    if (!unchanged) {
      needs_index_write = true;
    }
  }

  if (has_index_key_changed != nullptr) {
    *has_index_key_changed = key_changed;
  }

  bool new_row_satisfies_idx_pred = true;
  if (index->where_predicate_spec()) {
    // TODO(Piyush): Ensure EvalCondition returns an error if some column is missing.
    RETURN_NOT_OK(expr_executor->EvalCondition(
        index->where_predicate_spec()->where_expr().condition(), new_row,
        &new_row_satisfies_idx_pred));
    if (index_pred_new_row != nullptr) {
      *index_pred_new_row = new_row_satisfies_idx_pred;
    }

    // The predicate can switch to true even when no indexed/covering column changed, for example
    // when it involves none of those columns. The row must then be written to the index.
    if (new_row_satisfies_idx_pred && !index_pred_existing_row) {
      if (!needs_index_write) {
        VLOG(3) << "Indexed/covering cols unchanged but predicate switched to true for index_id=" <<
          index->table_id();
      }
      needs_index_write = true;
    }
  }

  const bool should_write = needs_index_write || FLAGS_ycql_disable_index_updating_optimization;
  if (!index_has_write_permission || !should_write) {
    return nullptr; // The index updating was skipped.
  }

  // For a partial index, a new row that fails the predicate gets no index entry.
  if (index->where_predicate_spec() && !new_row_satisfies_idx_pred) {
    VLOG(3) << "Skip index entry write for index_id=" << index->table_id() <<
      " since predicate not satisfied";
    return nullptr;
  }

  QLWriteRequestPB* const index_request =
      NewIndexRequest(*index, QLWriteRequestPB::QL_STMT_INSERT, index_requests);

  // Key columns: one expression per key column, with a value when one was computed.
  for (size_t col = 0; col < index->key_column_count(); ++col) {
    QLExpressionPB* const key_column = NewKeyColumn(index_request, *index, col);
    auto found = column_values.find(col);
    if (found != column_values.end()) {
      *key_column->mutable_value() = std::move(found->second);
    }
  }

  // Covering columns: added only when a value is available.
  for (size_t col = index->key_column_count(); col < index->columns().size(); ++col) {
    auto found = column_values.find(col);
    if (found == column_values.end()) {
      continue;
    }
    QLColumnValuePB* const covering_column = index_request->add_column_values();
    covering_column->set_column_id(index->column(col).column_id);
    *covering_column->mutable_expr()->mutable_value() = std::move(found->second);
  }

  return index_request;
}
1447
1448
// Runs this CQL read against `ql_storage` at `read_time` and appends the results to `resultset`.
// The results are either the matching rows or, for aggregate requests, one aggregate row.
//
// The request's LIMIT caps the number of result rows. Its OFFSET makes AddRowToResult() skip that
// many matching rows. Static columns are read with their own projection and joined onto the
// regular rows. SetPagingStateIfNecessary() records where a later page should resume.
// `restart_read_ht` receives the iterator's restart read time on success. It is left untouched
// when the request has LIMIT 0.
Status QLReadOperation::Execute(const YQLStorageIf& ql_storage,
                                CoarseTimePoint deadline,
                                const ReadHybridTime& read_time,
                                const Schema& schema,
                                const Schema& projection,
                                QLResultSet* resultset,
                                HybridTime* restart_read_ht) {
  SimulateTimeoutIfTesting(&deadline);

  // LIMIT 0 can never produce a row, so skip the scan entirely.
  if (request_.has_limit() && request_.limit() == 0) {
    return Status::OK();
  }
  const size_t row_count_limit =
      request_.has_limit() ? request_.limit() : std::numeric_limits<std::size_t>::max();
  const size_t offset = request_.has_offset() ? request_.offset() : 0;
  size_t num_rows_skipped = 0;

  // Build separate projections for the static and non-static columns. The columns are those
  // selected by the row block plus any referenced in the WHERE condition.
  // DocRowwiseIterator::NextRow() uses these projections only to scan sub-documents. The query
  // schema selects the referenced columns and the key columns.
  Schema static_projection;
  Schema non_static_projection;
  RETURN_NOT_OK(CreateProjections(schema, request_.column_refs(),
                                  &static_projection, &non_static_projection));
  const bool read_static_columns = !static_projection.columns().empty();
  const bool read_distinct_columns = request_.distinct();

  std::unique_ptr<YQLRowwiseIteratorIf> iter;
  std::unique_ptr<QLScanSpec> spec;
  std::unique_ptr<QLScanSpec> static_row_spec;
  RETURN_NOT_OK(ql_storage.BuildYQLScanSpec(
      request_, read_time, schema, read_static_columns, static_projection, &spec,
      &static_row_spec));
  RETURN_NOT_OK(ql_storage.GetIterator(request_, projection, schema, txn_op_context_,
                                       deadline, read_time, *spec, &iter));
  VTRACE(1, "Initialized iterator");

  QLTableRow static_row;
  QLTableRow non_static_row;
  // The row handed to PopulateAggregate() once the scan finishes.
  QLTableRow& selected_row = read_distinct_columns ? static_row : non_static_row;

  // The main iterator does not include the static columns of the next row to fetch in two cases:
  // when continuing from a paging state, and during a reverse scan. Fetch them first, using a
  // separate spec and iterator.
  if (static_row_spec != nullptr) {
    std::unique_ptr<YQLRowwiseIteratorIf> static_row_iter;
    RETURN_NOT_OK(ql_storage.GetIterator(
        request_, static_projection, schema, txn_op_context_, deadline, read_time,
        *static_row_spec, &static_row_iter));
    if (VERIFY_RESULT(static_row_iter->HasNext())) {
      RETURN_NOT_OK(static_row_iter->NextRow(&static_row));
    }
  }

  int match_count = 0;
  // False while a static row has been read but not yet emitted or joined to a regular row.
  bool static_dealt_with = true;
  while (resultset->rsrow_count() < row_count_limit) {
    const bool has_next = VERIFY_RESULT(iter->HasNext());
    if (!has_next) {
      break;
    }

    // In DocDB, static columns sort before the non-static columns of the same hash key:
    //   <hash_code><hash_components><empty_range_components><static_column_id> -> value;
    //   <hash_code><hash_components><range_components><non_static_column_id> -> value;
    // The empty range components end with kGroupEnd, which sorts before every other ValueType
    // character that can begin a non-empty range component.
    const bool last_read_static = iter->IsNextStaticColumn();
    QLTableRow& target_row = last_read_static ? static_row : non_static_row;
    const Schema& target_projection =
        last_read_static ? static_projection : non_static_projection;
    // TODO(omer): Reading whole regular rows is quite inefficient for read_distinct_columns. It
    // would be better to read only the first non-static column of each hash key and skip the rest.
    target_row.Clear();
    RETURN_NOT_OK(iter->NextRow(target_projection, &target_row));

    if (read_distinct_columns) {
      // DISTINCT normally emits the static rows only. A regular row is emitted when no static row
      // corresponds to it, once per hash key.
      const bool joined = !last_read_static &&
          JoinNonStaticRow(schema, static_projection, non_static_row, &static_row);
      if (!joined) {
        RETURN_NOT_OK(AddRowToResult(
            spec, static_row, row_count_limit, offset, resultset, &match_count, &num_rows_skipped));
      }
    } else if (last_read_static) {
      // A static row was just read and a regular row follows. Defer the static row, because that
      // regular row may belong to it.
      if (VERIFY_RESULT(iter->HasNext()) && !iter->IsNextStaticColumn()) {
        static_dealt_with = false;
        continue;
      }
      // No regular row follows, so the static row is emitted on its own.
      AddProjection(non_static_projection, &static_row);
      RETURN_NOT_OK(AddRowToResult(spec, static_row, row_count_limit, offset, resultset,
                                   &match_count, &num_rows_skipped));
    } else {
      // Regular row. Join the static columns onto it when static columns are projected or a static
      // row is still pending. (Cassandra reports nulls for static rows with no corresponding
      // non-static row.)
      if (read_static_columns || !static_dealt_with) {
        const bool joined = JoinStaticRow(schema, static_projection, static_row, &non_static_row);
        // A pending static row that did not join is emitted on its own, the first time only.
        if (!joined && !static_dealt_with) {
          AddProjection(non_static_projection, &static_row);
          RETURN_NOT_OK(AddRowToResult(
              spec, static_row, row_count_limit, offset, resultset, &match_count,
              &num_rows_skipped));
        }
      }
      static_dealt_with = true;
      RETURN_NOT_OK(AddRowToResult(
          spec, non_static_row, row_count_limit, offset, resultset, &match_count,
          &num_rows_skipped));
    }
  }

  if (request_.is_aggregate() && match_count > 0) {
    RETURN_NOT_OK(PopulateAggregate(selected_row, resultset));
  }

  VTRACE(1, "Fetched $0 rows.", resultset->rsrow_count());

  RETURN_NOT_OK(SetPagingStateIfNecessary(
      iter.get(), resultset, row_count_limit, num_rows_skipped, read_time));

  // Read the restart time only after SetPagingStateIfNecessary(), which may itself perform a read.
  *restart_read_ht = iter->RestartReadHt();
  return Status::OK();
}
1598
1599
// Fills response_'s paging state for a non-aggregate read that either reached its row limit or
// carries an OFFSET.
//
// When the client asked for a paging state:
// - if the iterator has a next row, the paging state records that row's partition key and
//   encoded row key;
// - the paging state records the cumulative count of skipped rows (this is also done when no next
//   row remains but the request has an OFFSET).
// Whenever a paging state is present, the read time to use for the next page is attached to it.
Status QLReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter,
                                                  const QLResultSet* resultset,
                                                  const size_t row_count_limit,
                                                  const size_t num_rows_skipped,
                                                  const ReadHybridTime& read_time) {
  const bool limit_reached_or_offset =
      resultset->rsrow_count() >= row_count_limit || request_.has_offset();
  if (!limit_reached_or_offset || request_.is_aggregate()) {
    return Status::OK();
  }

  SubDocKey next_row_key;
  RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key));

  // An empty paging state tells the client that this tablet has been read completely.
  if (request_.return_paging_state()) {
    const bool has_next_row = !next_row_key.doc_key().empty();
    if (has_next_row || request_.has_offset()) {
      QLPagingStatePB* paging_state = response_.mutable_paging_state();
      if (has_next_row) {
        paging_state->set_next_partition_key(
            PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash()));
        paging_state->set_next_row_key(next_row_key.Encode().ToStringBuffer());
      }
      paging_state->set_total_rows_skipped(request_.paging_state().total_rows_skipped() +
          num_rows_skipped);
    }
  }

  if (!response_.has_paging_state()) {
    return Status::OK();
  }

  if (FLAGS_ycql_consistent_transactional_paging) {
    read_time.AddToPB(response_.mutable_paging_state());
  } else {
    // A single-time read point avoids read restarts on the second and later pages. The cost is
    // that those pages may return stale results.
    auto single_time_read = ReadHybridTime::SingleTime(read_time.read);
    single_time_read.AddToPB(response_.mutable_paging_state());
  }
  return Status::OK();
}
1640
1641
0
// Records in `out` one read intent for this request's hashed key, plus a WAIT_ERROR wait policy.
// The intent's key is the encoded DocKey of the hashed components. When there are no hashed
// components it is a single kGroupEnd byte instead. The intent's value is a single kNullLow byte.
Status QLReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) {
  std::vector<PrimitiveValue> hashed_components;
  RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues(
      request_.hashed_column_values(), schema, 0, schema.num_hash_key_columns(),
      &hashed_components));

  auto* read_pair = out->mutable_read_pairs()->Add();
  // No hashed components means there is no primary key at all. The request may still contain a
  // hash_code for tablet routing, and that hash_code is ignored here.
  if (!hashed_components.empty()) {
    const DocKey doc_key(request_.hash_code(), hashed_components);
    read_pair->set_key(doc_key.Encode().ToStringBuffer());
  } else {
    read_pair->set_key(std::string(1, ValueTypeAsChar::kGroupEnd));
  }
  read_pair->set_value(std::string(1, ValueTypeAsChar::kNullLow));

  // Wait policies only matter for YSQL, which can wait, error out, or skip on an intent conflict.
  // YCQL always behaves like WAIT_ERROR.
  out->set_wait_policy(WAIT_ERROR);
  return Status::OK();
}
1662
1663
// Adds one row to `resultset`. Each selected expression of the request is evaluated against
// `table_row`, using the schema of `spec`, and the result becomes the next column of that row.
Status QLReadOperation::PopulateResultSet(const std::unique_ptr<QLScanSpec>& spec,
                                          const QLTableRow& table_row,
                                          QLResultSet *resultset) {
  resultset->AllocateRow();
  const auto& selected_exprs = request_.selected_exprs();
  for (int column = 0; column < selected_exprs.size(); ++column) {
    QLExprResult evaluated;
    RETURN_NOT_OK(EvalExpr(selected_exprs[column], table_row, evaluated.Writer(), spec->schema()));
    resultset->AppendColumn(column, evaluated.Value());
  }
  return Status::OK();
}
1677
1678
15.8k
// Folds `table_row` into the running aggregate results. aggr_result_ holds one accumulator per
// selected expression and is sized the first time this method is called.
Status QLReadOperation::EvalAggregate(const QLTableRow& table_row) {
  const auto& selected_exprs = request_.selected_exprs();
  if (aggr_result_.empty()) {
    aggr_result_.resize(selected_exprs.size());
  }

  for (int slot = 0; slot < selected_exprs.size(); ++slot) {
    RETURN_NOT_OK(EvalExpr(selected_exprs[slot], table_row, aggr_result_[slot].Writer()));
  }
  return Status::OK();
}
1690
1691
161
// Emits the accumulated aggregate values as a single result row. The row has one column per
// selected expression. `table_row` is not read.
// NOTE(review): assumes EvalAggregate() ran at least once, so aggr_result_ is sized. Execute()
// only calls this method when match_count > 0.
Status QLReadOperation::PopulateAggregate(const QLTableRow& table_row, QLResultSet *resultset) {
  resultset->AllocateRow();
  const int column_count = request_.selected_exprs().size();
  int rscol_index = 0;
  while (rscol_index < column_count) {
    resultset->AppendColumn(rscol_index, aggr_result_[rscol_index].Value());
    ++rscol_index;
  }
  return Status::OK();
}
1699
1700
// Offers `row` to the result set. The row is ignored when the result set already holds
// `row_count_limit` rows or when `row` does not match `spec`.
//
// The first `offset` matching rows are only counted in *num_rows_skipped. Every later match
// increments *match_count and is then either folded into the aggregate (aggregate requests) or
// appended as a result row.
Status QLReadOperation::AddRowToResult(const std::unique_ptr<QLScanSpec>& spec,
                                       const QLTableRow& row,
                                       const size_t row_count_limit,
                                       const size_t offset,
                                       QLResultSet* resultset,
                                       int* match_count,
                                       size_t *num_rows_skipped) {
  VLOG(3) << __FUNCTION__ << " : " << yb::ToString(row);
  if (resultset->rsrow_count() >= row_count_limit) {
    return Status::OK();
  }

  bool match = false;
  RETURN_NOT_OK(spec->Match(row, &match));
  if (!match) {
    return Status::OK();
  }

  // Still inside the OFFSET window: count the row as skipped and drop it.
  if (*num_rows_skipped < offset) {
    ++*num_rows_skipped;
    return Status::OK();
  }

  ++*match_count;
  if (request_.is_aggregate()) {
    return EvalAggregate(row);
  }
  return PopulateResultSet(spec, row, resultset);
}
1726
1727
}  // namespace docdb
1728
}  // namespace yb