YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/pgsql_operation.cc
Line  Count  Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/pgsql_operation.h"
15
16
#include <limits>
17
#include <string>
18
#include <unordered_set>
19
#include <vector>
20
21
#include <boost/optional/optional_io.hpp>
22
23
#include "yb/common/partition.h"
24
#include "yb/common/pg_system_attr.h"
25
#include "yb/common/ql_value.h"
26
27
#include "yb/docdb/doc_path.h"
28
#include "yb/docdb/doc_pg_expr.h"
29
#include "yb/docdb/doc_pgsql_scanspec.h"
30
#include "yb/docdb/doc_rowwise_iterator.h"
31
#include "yb/docdb/doc_write_batch.h"
32
#include "yb/docdb/docdb.pb.h"
33
#include "yb/docdb/docdb_debug.h"
34
#include "yb/docdb/docdb_pgapi.h"
35
#include "yb/docdb/docdb_rocksdb_util.h"
36
#include "yb/docdb/intent_aware_iterator.h"
37
#include "yb/docdb/primitive_value_util.h"
38
#include "yb/docdb/ql_storage_interface.h"
39
40
#include "yb/util/flag_tags.h"
41
#include "yb/util/result.h"
42
#include "yb/util/scope_exit.h"
43
#include "yb/util/status_format.h"
44
#include "yb/util/trace.h"
45
46
#include "yb/yql/pggate/util/pg_doc_data.h"
47
48
using namespace std::literals;
49
50
DECLARE_bool(ysql_disable_index_backfill);
51
52
DEFINE_double(ysql_scan_timeout_multiplier, 0.5,
53
              "DEPRECATED. Has no affect, use ysql_scan_deadline_margin_ms to control the client "
54
              "timeout");
55
56
DEFINE_uint64(ysql_scan_deadline_margin_ms, 1000,
57
              "Scan deadline is calculated by adding client timeout to the time when the request "
58
              "was received. It defines the moment in time when client has definitely timed out "
59
              "and if the request is yet in processing after the deadline, it can be canceled. "
60
              "Therefore to prevent client timeout, the request handler should return partial "
61
              "result and paging information some time before the deadline. That's what the "
62
              "ysql_scan_deadline_margin_ms is for. It should account for network and processing "
63
              "delays.");
64
65
DEFINE_bool(pgsql_consistent_transactional_paging, true,
66
            "Whether to enforce consistency of data returned for second page and beyond for YSQL "
67
            "queries on transactional tables. If true, read restart errors could be returned to "
68
            "prevent inconsistency. If false, no read restart errors are returned but the data may "
69
            "be stale. The latter is preferable for long scans. The data returned for the first "
70
            "page of results is never stale regardless of this flag.");
71
72
DEFINE_test_flag(int32, slowdown_pgsql_aggregate_read_ms, 0,
73
                 "If set > 0, slows down the response to pgsql aggregate read by this amount.");
74
75
namespace yb {
76
namespace docdb {
77
78
namespace {
79
80
// Compatibility: accept column references from legacy nodes as a list of column ids only.
81
CHECKED_STATUS CreateProjection(const Schema& schema,
82
                                const PgsqlColumnRefsPB& column_refs,
83
2.03M
                                Schema* projection) {
84
  // Create projection of non-primary key columns. Primary key columns are implicitly read by DocDB.
85
  // The projection also sorts the columns before scanning.
86
2.03M
  vector<ColumnId> column_ids;
87
2.03M
  column_ids.reserve(column_refs.ids_size());
88
403k
  for (int32_t id : column_refs.ids()) {
89
403k
    const ColumnId column_id(id);
90
403k
    if (!schema.is_key_column(column_id)) {
91
357k
      column_ids.emplace_back(column_id);
92
357k
    }
93
403k
  }
94
2.03M
  return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection);
95
2.03M
}
96
97
CHECKED_STATUS CreateProjection(
98
    const Schema& schema,
99
    const google::protobuf::RepeatedPtrField<PgsqlColRefPB> &column_refs,
100
1.49M
    Schema* projection) {
101
1.49M
  vector<ColumnId> column_ids;
102
1.49M
  column_ids.reserve(column_refs.size());
103
9.87M
  for (const PgsqlColRefPB& column_ref : column_refs) {
104
9.87M
    const ColumnId column_id(column_ref.column_id());
105
9.87M
    if (!schema.is_key_column(column_id)) {
106
7.40M
      column_ids.emplace_back(column_id);
107
7.40M
    }
108
9.87M
  }
109
1.49M
  return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection);
110
1.49M
}
111
112
1.43M
void AddIntent(const std::string& encoded_key, WaitPolicy wait_policy, KeyValueWriteBatchPB *out) {
113
1.43M
  auto pair = out->mutable_read_pairs()->Add();
114
1.43M
  pair->set_key(encoded_key);
115
1.43M
  pair->set_value(std::string(1, ValueTypeAsChar::kNullLow));
116
  // Since we don't batch read RPCs that lock rows, we can get away with using a singular
117
  // wait_policy field. Once we start batching read requests (issue #2495), we will need a repeated
118
  // wait policies field.
119
1.43M
  out->set_wait_policy(wait_policy);
120
1.43M
}
121
122
CHECKED_STATUS AddIntent(const PgsqlExpressionPB& ybctid, WaitPolicy wait_policy,
123
100k
                         KeyValueWriteBatchPB* out) {
124
100k
  const auto &val = ybctid.value().binary_value();
125
100k
  SCHECK(!val.empty(), InternalError, "empty ybctid");
126
100k
  AddIntent(val, wait_policy, out);
127
100k
  return Status::OK();
128
100k
}
129
130
template<class R, class Request, class DocKeyProcessor, class EncodedDocKeyProcessor>
131
Result<R> FetchDocKeyImpl(const Schema& schema,
132
                          const Request& req,
133
                          const DocKeyProcessor& dk_processor,
134
4.44M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
4.44M
  if (req.has_ybctid_column_value()) {
137
1.78M
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
1.78M
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
1.78M
    return edk_processor(ybctid);
140
2.65M
  } else {
141
2.65M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
2.65M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
2.65M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
2.65M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
2.65M
    return dk_processor(hashed_components.empty()
146
1.18M
        ? DocKey(schema, std::move(range_components))
147
1.46M
        : DocKey(
148
1.46M
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
2.65M
  }
150
4.44M
}
pgsql_operation.cc:_ZN2yb5docdb12_GLOBAL__N_115FetchDocKeyImplINS0_6DocKeyENS_19PgsqlWriteRequestPBEZNS1_11FetchDocKeyERKNS_6SchemaERKS4_E3$_4ZNS1_11FetchDocKeyES7_S9_E3$_5EENS_6ResultIT_EES7_RKT0_RKT1_RKT2_
Line  Count  Source
134
3.11M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
3.11M
  if (req.has_ybctid_column_value()) {
137
1.77M
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
1.77M
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
1.77M
    return edk_processor(ybctid);
140
1.33M
  } else {
141
1.33M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
1.33M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
1.33M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
1.33M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
1.33M
    return dk_processor(hashed_components.empty()
146
1.18M
        ? DocKey(schema, std::move(range_components))
147
146k
        : DocKey(
148
146k
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
1.33M
  }
150
3.11M
}
pgsql_operation.cc:_ZN2yb5docdb12_GLOBAL__N_115FetchDocKeyImplINSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEENS_18PgsqlReadRequestPBEZNS1_18FetchEncodedDocKeyERKNS_6SchemaERKSA_E3$_3ZNS1_18FetchEncodedDocKeyESD_SF_E3$_2EENS_6ResultIT_EESD_RKT0_RKT1_RKT2_
Line  Count  Source
134
1.33M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
1.33M
  if (req.has_ybctid_column_value()) {
137
6.84k
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
6.84k
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
6.84k
    return edk_processor(ybctid);
140
1.32M
  } else {
141
1.32M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
1.32M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
1.32M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
1.32M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
1.32M
    return dk_processor(hashed_components.empty()
146
705
        ? DocKey(schema, std::move(range_components))
147
1.32M
        : DocKey(
148
1.32M
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
1.32M
  }
150
1.33M
}
151
152
1.33M
Result<string> FetchEncodedDocKey(const Schema& schema, const PgsqlReadRequestPB& request) {
153
1.33M
  return FetchDocKeyImpl<string>(
154
1.33M
      schema, request,
155
1.32M
      [](const auto& doc_key) { return doc_key.Encode().ToStringBuffer(); },
156
6.84k
      [](const auto& encoded_doc_key) { return encoded_doc_key; });
157
1.33M
}
158
159
3.11M
Result<DocKey> FetchDocKey(const Schema& schema, const PgsqlWriteRequestPB& request) {
160
3.11M
  return FetchDocKeyImpl<DocKey>(
161
3.11M
      schema, request,
162
1.33M
      [](const auto& doc_key) { return doc_key; },
163
1.77M
      [&schema](const auto& encoded_doc_key) -> Result<DocKey> {
164
1.77M
        DocKey key(schema);
165
1.77M
        RETURN_NOT_OK(key.DecodeFrom(encoded_doc_key));
166
1.77M
        return key;
167
1.77M
      });
168
3.11M
}
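
FetchDocKeyImpl above is one template body specialized by two callables: one receives a DocKey built from partition/range components, the other an already-encoded ybctid; that is how FetchEncodedDocKey and FetchDocKey share the branching logic while producing different result types. A minimal standalone sketch of the same two-processor dispatch (the types and inputs are illustrative stand-ins, not YugabyteDB APIs):

#include <iostream>
#include <string>

struct Key { std::string bytes; };  // stand-in for DocKey

template <class R, class DocKeyProcessor, class EncodedDocKeyProcessor>
R FetchImpl(bool has_ybctid, const std::string& input,
            const DocKeyProcessor& dk, const EncodedDocKeyProcessor& edk) {
  // Mirror the ybctid-vs-components branch: the two processors decide what to
  // do with each representation of the key.
  return has_ybctid ? edk(input) : dk(Key{"built:" + input});
}

int main() {
  // Like FetchEncodedDocKey: both processors yield an encoded string.
  auto encoded = FetchImpl<std::string>(
      false, "k1",
      [](const Key& k) { return k.bytes; },
      [](const std::string& e) { return e; });
  std::cout << encoded << "\n";  // prints "built:k1"
}
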
169
170
Result<YQLRowwiseIteratorIf::UniPtr> CreateIterator(
171
    const YQLStorageIf& ql_storage,
172
    const PgsqlReadRequestPB& request,
173
    const Schema& projection,
174
    const Schema& schema,
175
    const TransactionOperationContext& txn_op_context,
176
    CoarseTimePoint deadline,
177
    const ReadHybridTime& read_time,
178
1.64M
    bool is_explicit_request_read_time) {
179
1.13k
  VLOG_IF(2, request.is_for_backfill()) << "Creating iterator for " << yb::ToString(request);
180
181
1.64M
  YQLRowwiseIteratorIf::UniPtr result;
182
  // TODO(neil) Remove the following IF block when it is completely obsolete.
183
  // The following IF block has not been used since the 2.1 release.
184
  // We keep it here only for rolling upgrade purposes.
185
1.64M
  if (request.has_ybctid_column_value()) {
186
4.73k
    SCHECK(!request.has_paging_state(),
187
4.73k
           InternalError,
188
4.73k
           "Each ybctid value identifies one row in the table while paging state "
189
4.73k
           "is only used for multi-row queries.");
190
4.73k
    RETURN_NOT_OK(ql_storage.GetIterator(
191
4.73k
        request.stmt_id(), projection, schema, txn_op_context,
192
4.73k
        deadline, read_time, request.ybctid_column_value().value(), &result));
193
1.64M
  } else {
194
1.64M
    SubDocKey start_sub_doc_key;
195
1.64M
    auto actual_read_time = read_time;
196
    // Decode the start SubDocKey from the paging state and set the scan start key.
197
1.64M
    if (request.has_paging_state() &&
198
54.9k
        request.paging_state().has_next_row_key() &&
199
23.6k
        !request.paging_state().next_row_key().empty()) {
200
23.6k
      KeyBytes start_key_bytes(request.paging_state().next_row_key());
201
23.6k
      RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
202
      // TODO(dmitry) Remove backward compatibility block when obsolete.
203
23.6k
      if (!is_explicit_request_read_time) {
204
0
        if (request.paging_state().has_read_time()) {
205
0
          actual_read_time = ReadHybridTime::FromPB(request.paging_state().read_time());
206
0
        } else {
207
0
          actual_read_time.read = start_sub_doc_key.hybrid_time();
208
0
        }
209
0
      }
210
1.61M
    } else if (request.is_for_backfill()) {
211
257
      RSTATUS_DCHECK(is_explicit_request_read_time, InvalidArgument,
212
257
                     "Backfill request should already be using explicit read times.");
213
257
      PgsqlBackfillSpecPB spec;
214
257
      spec.ParseFromString(a2b_hex(request.backfill_spec()));
215
257
      if (!spec.next_row_key().empty()) {
216
22
        KeyBytes start_key_bytes(spec.next_row_key());
217
22
        RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
218
22
      }
219
257
    }
220
1.64M
    RETURN_NOT_OK(ql_storage.GetIterator(
221
1.64M
        request, projection, schema, txn_op_context,
222
1.64M
        deadline, read_time, start_sub_doc_key.doc_key(), &result));
223
1.64M
  }
224
1.64M
  return std::move(result);
225
1.64M
}
226
227
class DocKeyColumnPathBuilder {
228
 public:
229
  explicit DocKeyColumnPathBuilder(const RefCntPrefix& doc_key)
230
4.83M
      : doc_key_(doc_key.as_slice()) {
231
4.83M
  }
232
233
7.86M
  RefCntPrefix Build(ColumnIdRep column_id) {
234
7.86M
    buffer_.Clear();
235
7.86M
    buffer_.AppendValueType(ValueType::kColumnId);
236
7.86M
    buffer_.AppendColumnId(ColumnId(column_id));
237
7.86M
    RefCntBuffer path(doc_key_.size() + buffer_.size());
238
7.86M
    doc_key_.CopyTo(path.data());
239
7.86M
    buffer_.AsSlice().CopyTo(path.data() + doc_key_.size());
240
7.86M
    return path;
241
7.86M
  }
242
243
 private:
244
  Slice doc_key_;
245
  KeyBytes buffer_;
246
};
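
Build concatenates the encoded doc key, a ValueType::kColumnId marker, and the encoded column id into one contiguous path. A simplified sketch of that layout using plain strings (the real AppendColumnId writes the id in a variable-length encoding; a single byte is used here purely for illustration):

#include <cstdint>
#include <iostream>
#include <string>

// path = <encoded doc key bytes> <column-id marker> <encoded column id>
std::string BuildColumnPath(const std::string& encoded_doc_key, uint8_t column_id) {
  std::string path = encoded_doc_key;
  path.push_back('!');                           // stand-in for ValueType::kColumnId
  path.push_back(static_cast<char>(column_id));  // real code: varint-style encoding
  return path;
}

int main() {
  const std::string path = BuildColumnPath("doc-key-bytes", 11);
  std::cout << "column path is " << path.size() << " bytes\n";  // doc key + 2
}
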
247
248
} // namespace
249
250
//--------------------------------------------------------------------------------------------------
251
252
3.11M
Status PgsqlWriteOperation::Init(PgsqlResponsePB* response) {
253
  // Initialize operation inputs.
254
3.11M
  response_ = response;
255
256
3.11M
  doc_key_ = VERIFY_RESULT(FetchDocKey(schema_, request_));
257
3.11M
  encoded_doc_key_ = doc_key_->EncodeAsRefCntPrefix();
258
259
3.11M
  return Status::OK();
260
3.11M
}
261
262
// Check whether a duplicate value is being inserted into a unique index.
263
5
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) {
264
0
  VLOG(3) << "Looking for collisions in\n"
265
0
          << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
266
  // We need to check backwards only for backfilled entries.
267
5
  bool ret =
268
5
      VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) ||
269
4
      (request_.is_backfill() &&
270
4
       VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward)));
271
5
  if (!ret) {
272
0
    VLOG(3) << "No collisions found";
273
4
  }
274
5
  return ret;
275
5
}
276
277
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(
278
9
    const DocOperationApplyData& data, Direction direction) {
279
0
  VLOG(2) << "Looking for collision while going " << yb::ToString(direction)
280
0
          << ". Trying to insert " << *doc_key_;
281
9
  auto requested_read_time = data.read_time;
282
9
  if (direction == Direction::kForward) {
283
5
    return HasDuplicateUniqueIndexValue(data, requested_read_time);
284
5
  }
285
286
4
  auto iter = CreateIntentAwareIterator(
287
4
      data.doc_write_batch->doc_db(),
288
4
      BloomFilterMode::USE_BLOOM_FILTER,
289
4
      doc_key_->Encode().AsSlice(),
290
4
      rocksdb::kDefaultQueryId,
291
4
      txn_op_context_,
292
4
      data.deadline,
293
4
      ReadHybridTime::Max());
294
295
4
  HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
296
4
      iter.get(), SubDocKey(*doc_key_), requested_read_time.read));
297
4
  const HybridTime oldest_past_min_ht_liveness =
298
4
      VERIFY_RESULT(FindOldestOverwrittenTimestamp(
299
4
          iter.get(),
300
4
          SubDocKey(*doc_key_, PrimitiveValue::kLivenessColumn),
301
4
          requested_read_time.read));
302
4
  oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness);
303
4
  if (!oldest_past_min_ht.is_valid()) {
304
4
    return false;
305
4
  }
306
0
  return HasDuplicateUniqueIndexValue(
307
0
      data, ReadHybridTime::SingleTime(oldest_past_min_ht));
308
0
}
309
310
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(
311
5
    const DocOperationApplyData& data, ReadHybridTime read_time) {
312
  // Set up the iterator to read the current primary key associated with the index key.
313
5
  DocPgsqlScanSpec spec(schema_, request_.stmt_id(), *doc_key_);
314
5
  DocRowwiseIterator iterator(schema_,
315
5
                              schema_,
316
5
                              txn_op_context_,
317
5
                              data.doc_write_batch->doc_db(),
318
5
                              data.deadline,
319
5
                              read_time);
320
5
  RETURN_NOT_OK(iterator.Init(spec));
321
322
  // It is a duplicate value if the index key exists already and the index value (corresponding to
323
  // the indexed table's primary key) is not the same.
324
5
  if (!VERIFY_RESULT(iterator.HasNext())) {
325
0
    VLOG(2) << "No collision found while checking at " << yb::ToString(read_time);
326
4
    return false;
327
4
  }
328
329
1
  QLTableRow table_row;
330
1
  RETURN_NOT_OK(iterator.NextRow(&table_row));
331
1
  for (const auto& column_value : request_.column_values()) {
332
    // Get the column.
333
1
    if (!column_value.has_column_id()) {
334
0
      return STATUS(InternalError, "column id missing", column_value.DebugString());
335
0
    }
336
1
    const ColumnId column_id(column_value.column_id());
337
338
    // Check column-write operator.
339
0
    CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
340
0
      << "Illegal write instruction";
341
342
    // Evaluate column value.
343
1
    QLExprResult expr_result;
344
1
    RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
345
346
1
    boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id);
347
1
    const QLValuePB& new_value = expr_result.Value();
348
1
    if (existing_value && *existing_value != new_value) {
349
0
      VLOG(2) << "Found collision while checking at " << yb::ToString(read_time)
350
0
              << "\nExisting: " << yb::ToString(*existing_value)
351
0
              << " vs New: " << yb::ToString(new_value)
352
0
              << "\nUsed read time as " << yb::ToString(data.read_time);
353
0
      DVLOG(3) << "DocDB is now:\n"
354
0
               << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
355
1
      return true;
356
1
    }
357
1
  }
358
359
0
  VLOG(2) << "No collision while checking at " << yb::ToString(read_time);
360
0
  return false;
361
1
}
362
363
Result<HybridTime> PgsqlWriteOperation::FindOldestOverwrittenTimestamp(
364
    IntentAwareIterator* iter,
365
    const SubDocKey& sub_doc_key,
366
8
    HybridTime min_read_time) {
367
8
  HybridTime result;
368
0
  VLOG(3) << "Doing iter->Seek " << *doc_key_;
369
8
  iter->Seek(*doc_key_);
370
8
  if (iter->valid()) {
371
2
    const KeyBytes bytes = sub_doc_key.EncodeWithoutHt();
372
2
    const Slice& sub_key_slice = bytes.AsSlice();
373
2
    result = VERIFY_RESULT(
374
2
        iter->FindOldestRecord(sub_key_slice, min_read_time));
375
0
    VLOG(2) << "iter->FindOldestRecord returned " << result << " for "
376
0
            << SubDocKey::DebugSliceToString(sub_key_slice);
377
6
  } else {
378
0
    VLOG(3) << "iter->Seek " << *doc_key_ << " turned out to be invalid";
379
6
  }
380
8
  return result;
381
8
}
382
383
3.09M
Status PgsqlWriteOperation::Apply(const DocOperationApplyData& data) {
384
218
  VLOG(4) << "Write, read time: " << data.read_time << ", txn: " << txn_op_context_;
385
386
3.09M
  auto scope_exit = ScopeExit([this] {
387
3.09M
    if (!result_buffer_.empty()) {
388
3.08M
      NetworkByteOrder::Store64(result_buffer_.data(), result_rows_);
389
3.08M
    }
390
3.09M
  });
391
392
3.09M
  switch (request_.stmt_type()) {
393
1.61M
    case PgsqlWriteRequestPB::PGSQL_INSERT:
394
1.61M
      return ApplyInsert(data, IsUpsert::kFalse);
395
396
154k
    case PgsqlWriteRequestPB::PGSQL_UPDATE:
397
154k
      return ApplyUpdate(data);
398
399
116k
    case PgsqlWriteRequestPB::PGSQL_DELETE:
400
116k
      return ApplyDelete(data, request_.is_delete_persist_needed());
401
402
1.20M
    case PgsqlWriteRequestPB::PGSQL_UPSERT: {
403
      // Upserts should not have column refs (i.e., they should not require a read).
404
1.20M
      RSTATUS_DCHECK(request_.col_refs().empty(),
405
1.20M
              IllegalState,
406
1.20M
              "Upsert operation should not have column references");
407
1.20M
      return ApplyInsert(data, IsUpsert::kTrue);
408
0
    }
409
410
19
    case PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED:
411
19
      return ApplyTruncateColocated(data);
412
0
  }
413
0
  return Status::OK();
414
0
}
415
416
2.82M
Status PgsqlWriteOperation::ApplyInsert(const DocOperationApplyData& data, IsUpsert is_upsert) {
417
2.82M
  QLTableRow table_row;
418
2.82M
  if (!is_upsert) {
419
1.61M
    if (request_.is_backfill()) {
420
5
      if (VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) {
421
        // Unique index value conflict found.
422
1
        response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR);
423
1
        response_->set_error_message("Duplicate key found in unique index");
424
1
        return Status::OK();
425
1
      }
426
1.61M
    } else {
427
      // Non-backfill requests shouldn't use HasDuplicateUniqueIndexValue because
428
      // - they should error even if the conflicting row matches
429
      // - retrieving and calculating whether the conflicting row matches is a waste
430
1.61M
      RETURN_NOT_OK(ReadColumns(data, &table_row));
431
1.61M
      if (!table_row.IsEmpty()) {
432
18.4E
        VLOG(4) << "Duplicate row: " << table_row.ToString();
433
        // Primary key or unique index value found.
434
1.02k
        response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR);
435
1.02k
        response_->set_error_message("Duplicate key found in primary key or unique index");
436
1.02k
        return Status::OK();
437
1.02k
      }
438
2.81M
    }
439
1.61M
  }
440
441
2.81M
  RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
442
2.81M
      DocPath(encoded_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn),
443
2.81M
      Value(PrimitiveValue()),
444
2.81M
      data.read_time, data.deadline, request_.stmt_id()));
445
446
10.0M
  for (const auto& column_value : request_.column_values()) {
447
    // Get the column.
448
10.0M
    if (!column_value.has_column_id()) {
449
0
      return STATUS(InternalError, "column id missing", column_value.DebugString());
450
0
    }
451
10.0M
    const ColumnId column_id(column_value.column_id());
452
10.0M
    const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
453
454
    // Check column-write operator.
455
432
    CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
456
432
      << "Illegal write instruction";
457
458
    // Evaluate column value.
459
10.0M
    QLExprResult expr_result;
460
10.0M
    RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
461
10.0M
    const SubDocument sub_doc =
462
10.0M
        SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type());
463
464
    // Inserting into specified column.
465
10.0M
    DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
466
10.0M
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
467
10.0M
        sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id()));
468
10.0M
  }
469
470
2.81M
  RETURN_NOT_OK(PopulateResultSet(table_row));
471
472
2.81M
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
473
2.81M
  return Status::OK();
474
2.81M
}
475
476
154k
Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) {
477
154k
  QLTableRow table_row;
478
154k
  RETURN_NOT_OK(ReadColumns(data, &table_row));
479
154k
  if (table_row.IsEmpty()) {
480
    // Row not found.
481
2
    response_->set_skipped(true);
482
2
    return Status::OK();
483
2
  }
484
154k
  QLTableRow returning_table_row;
485
154k
  if (request_.targets_size()) {
486
0
    returning_table_row = table_row;
487
0
  }
488
489
  // skipped is set to false if this operation produces some data to write.
490
154k
  bool skipped = true;
491
492
154k
  if (request_.has_ybctid_column_value()) {
493
154k
    DocPgExprExecutor expr_exec(&schema_);
494
154k
    std::vector<QLExprResult> results;
495
154k
    int num_exprs = 0;
496
154k
    int cur_expr = 0;
497
246k
    for (const auto& column_value : request_.column_new_values()) {
498
246k
      if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) {
499
5.74k
        RETURN_NOT_OK(expr_exec.AddTargetExpression(column_value.expr()));
500
0
        VLOG(1) << "Added target expression to the executor";
501
5.74k
        num_exprs++;
502
5.74k
      }
503
246k
    }
504
154k
    if (num_exprs > 0) {
505
3.92k
      bool match;
506
5.74k
      for (const PgsqlColRefPB& column_ref : request_.col_refs()) {
507
5.74k
        RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref));
508
0
        VLOG(1) << "Added column reference to the executor";
509
5.74k
      }
510
3.92k
      results.resize(num_exprs);
511
3.92k
      RETURN_NOT_OK(expr_exec.Exec(table_row, &results, &match));
512
3.92k
    }
513
246k
    for (const auto& column_value : request_.column_new_values()) {
514
      // Get the column.
515
246k
      if (!column_value.has_column_id()) {
516
0
        return STATUS(InternalError, "column id missing", column_value.DebugString());
517
0
      }
518
246k
      const ColumnId column_id(column_value.column_id());
519
246k
      const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
520
      // Evaluate column value.
521
246k
      QLExprResult expr_result;
522
523
246k
      if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) {
524
5.74k
        expr_result = std::move(results[cur_expr++]);
525
240k
      } else {
526
        // Check column-write operator.
527
240k
        SCHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert,
528
240k
               InternalError,
529
240k
               "Unsupported DocDB Expression");
530
531
240k
        RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer(), &schema_));
532
240k
      }
533
534
      // Update RETURNING values
535
246k
      if (request_.targets_size()) {
536
0
        returning_table_row.AllocColumn(column_id, expr_result.Value());
537
0
      }
538
539
      // Inserting into specified column.
540
246k
      const SubDocument sub_doc =
541
246k
          SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type());
542
543
246k
      DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
544
246k
      RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
545
246k
          sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id()));
546
246k
      skipped = false;
547
246k
    }
548
33
  } else {
549
    // This UPDATE is calling PGGATE directly without going through the PostgreSQL layer.
550
    // Keep it here as we might need it.
551
552
    // Very limited support for where expressions. Only used for updates to the sequences data
553
    // table.
554
33
    bool is_match = true;
555
33
    if (request_.has_where_expr()) {
556
33
      QLExprResult match;
557
33
      RETURN_NOT_OK(EvalExpr(request_.where_expr(), table_row, match.Writer()));
558
33
      is_match = match.Value().bool_value();
559
33
    }
560
561
33
    if (is_match) {
562
66
      for (const auto &column_value : request_.column_new_values()) {
563
        // Get the column.
564
66
        if (!column_value.has_column_id()) {
565
0
          return STATUS(InternalError, "column id missing", column_value.DebugString());
566
0
        }
567
66
        const ColumnId column_id(column_value.column_id());
568
66
        const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
569
570
        // Check column-write operator.
571
0
        CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
572
0
        << "Illegal write instruction";
573
574
        // Evaluate column value.
575
66
        QLExprResult expr_result;
576
66
        RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
577
578
66
        const SubDocument sub_doc =
579
66
            SubDocument::FromQLValuePB(expr_result.Value(), column.sorting_type());
580
581
        // Inserting into specified column.
582
66
        DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
583
66
        RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
584
66
            sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id()));
585
66
        skipped = false;
586
66
      }
587
33
    }
588
33
  }
589
590
154k
  if (request_.targets_size()) {
591
    // Returning the values after the update.
592
0
    RETURN_NOT_OK(PopulateResultSet(returning_table_row));
593
154k
  } else {
594
    // Returning the values before the update.
595
154k
    RETURN_NOT_OK(PopulateResultSet(table_row));
596
154k
  }
597
598
154k
  if (skipped) {
599
0
    response_->set_skipped(true);
600
0
  }
601
154k
  response_->set_rows_affected_count(1);
602
154k
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
603
154k
  return Status::OK();
604
154k
}
605
606
Status PgsqlWriteOperation::ApplyDelete(
607
    const DocOperationApplyData& data,
608
116k
    const bool is_persist_needed) {
609
116k
  int num_deleted = 1;
610
116k
  QLTableRow table_row;
611
116k
  RETURN_NOT_OK(ReadColumns(data, &table_row));
612
116k
  if (table_row.IsEmpty()) {
613
    // Row not found.
614
    // Return early unless we still want to apply the delete for backfill purposes.  Deletes to
615
    // nonexistent rows are expected to get written to the index when the index has the delete
616
    // permission during an online schema migration.  num_deleted should be 0 because we don't want
617
    // to report back to the user that we deleted 1 row; response_ should not set skipped because it
618
    // will prevent tombstone intents from getting applied.
619
15
    if (!is_persist_needed) {
620
15
      response_->set_skipped(true);
621
15
      return Status::OK();
622
15
    }
623
0
    num_deleted = 0;
624
0
  }
625
626
  // TODO(neil) Add support for WHERE clause.
627
4
  CHECK(request_.column_values_size() == 0) << "WHERE clause condition is not yet fully supported";
628
629
  // Otherwise, delete the referenced row (all columns).
630
116k
  RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath(
631
116k
      encoded_doc_key_.as_slice()), data.read_time, data.deadline));
632
633
116k
  RETURN_NOT_OK(PopulateResultSet(table_row));
634
635
116k
  response_->set_rows_affected_count(num_deleted);
636
116k
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
637
116k
  return Status::OK();
638
116k
}
639
640
19
Status PgsqlWriteOperation::ApplyTruncateColocated(const DocOperationApplyData& data) {
641
19
  RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath(
642
19
      encoded_doc_key_.as_slice()), data.read_time, data.deadline));
643
19
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
644
19
  return Status::OK();
645
19
}
646
647
Status PgsqlWriteOperation::ReadColumns(const DocOperationApplyData& data,
648
1.88M
                                        QLTableRow* table_row) {
649
  // Filter the columns using primary key.
650
1.88M
  if (doc_key_) {
651
1.88M
    Schema projection;
652
1.88M
    RETURN_NOT_OK(CreateProjection(schema_, request_.column_refs(), &projection));
653
1.88M
    DocPgsqlScanSpec spec(projection, request_.stmt_id(), *doc_key_);
654
1.88M
    DocRowwiseIterator iterator(projection,
655
1.88M
                                schema_,
656
1.88M
                                txn_op_context_,
657
1.88M
                                data.doc_write_batch->doc_db(),
658
1.88M
                                data.deadline,
659
1.88M
                                data.read_time);
660
1.88M
    RETURN_NOT_OK(iterator.Init(spec));
661
1.88M
    if (VERIFY_RESULT(iterator.HasNext())) {
662
271k
      RETURN_NOT_OK(iterator.NextRow(table_row));
663
1.61M
    } else {
664
1.61M
      table_row->Clear();
665
1.61M
    }
666
1.88M
    data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt());
667
1.88M
  }
668
669
1.88M
  return Status::OK();
670
1.88M
}
671
672
3.08M
Status PgsqlWriteOperation::PopulateResultSet(const QLTableRow& table_row) {
673
3.08M
  if (result_buffer_.empty()) {
674
    // Reserve space for num rows.
675
3.08M
    pggate::PgWire::WriteInt64(0, &result_buffer_);
676
3.08M
  }
677
3.08M
  ++result_rows_;
678
3.08M
  int rscol_index = 0;
679
0
  for (const PgsqlExpressionPB& expr : request_.targets()) {
680
0
    if (expr.has_column_id()) {
681
0
      QLExprResult value;
682
0
      if (expr.column_id() == static_cast<int>(PgSystemAttrNum::kYBTupleId)) {
683
        // Strip cotable id / pgtable id from the serialized DocKey before returning it as ybctid.
684
0
        Slice tuple_id = encoded_doc_key_.as_slice();
685
0
        if (tuple_id.starts_with(ValueTypeAsChar::kTableId)) {
686
0
          tuple_id.remove_prefix(1 + kUuidSize);
687
0
        } else if (tuple_id.starts_with(ValueTypeAsChar::kPgTableOid)) {
688
0
          tuple_id.remove_prefix(1 + sizeof(PgTableOid));
689
0
        }
690
0
        value.Writer().NewValue().set_binary_value(tuple_id.data(), tuple_id.size());
691
0
      } else {
692
0
        RETURN_NOT_OK(EvalExpr(expr, table_row, value.Writer()));
693
0
      }
694
0
      RETURN_NOT_OK(pggate::WriteColumn(value.Value(), &result_buffer_));
695
0
    }
696
0
    rscol_index++;
697
0
  }
698
3.08M
  return Status::OK();
699
3.08M
}
700
701
Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode,
702
                                        DocPathsToLock *paths,
703
5.27M
                                        IsolationLevel *level) const {
704
  // When this write operation requires a read, it requires a read snapshot so paths will be locked
705
  // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable
706
  // isolation so that they will serialize but do not conflict with one another.
707
  //
708
  // Currently, only keys that are being written are locked; no lock is taken on read at the
709
  // snapshot isolation level.
710
3.83M
  *level = RequireReadSnapshot() ? IsolationLevel::SNAPSHOT_ISOLATION
711
1.44M
                                 : IsolationLevel::SERIALIZABLE_ISOLATION;
712
713
5.27M
  switch (mode) {
714
3.14M
    case GetDocPathsMode::kLock: {
715
      // Weak intent is required to lock the row and prevent it from being removed.
716
      // For this purpose, the path for the row's SystemColumnIds::kLivenessColumn column is
717
      // returned. The caller code will create a strong intent for the returned path (the row's
718
      // column doc key) and weak intents for all its prefixes (including the row's doc key).
719
3.14M
      if (!encoded_doc_key_) {
720
0
        return Status::OK();
721
0
      }
722
3.14M
      if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
723
        // In case of UPDATE, some columns may have expressions instead of an exact value.
724
        // These expressions may read column values.
725
        // For example, the expression updating column v1 may read the value of column v2:
726
        //
727
        // UPDATE t SET v = v + 10 WHERE k = 1
728
        // UPDATE t SET v1 = v2 + 10 WHERE k = 1
729
        //
730
        // A strong intent for the whole row is required in this case, as it may be too expensive
731
        // to determine which exact columns are read by the expression.
732
733
257k
        for (const auto& column_value : request_.column_new_values()) {
734
257k
          if (!column_value.expr().has_value()) {
735
3.92k
            paths->push_back(encoded_doc_key_);
736
3.92k
            return Status::OK();
737
3.92k
          }
738
257k
        }
739
166k
      }
740
3.13M
      DocKeyColumnPathBuilder builder(encoded_doc_key_);
741
3.13M
      paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn)));
742
3.13M
      break;
743
3.14M
    }
744
2.12M
    case GetDocPathsMode::kIntents: {
745
2.12M
      const google::protobuf::RepeatedPtrField<PgsqlColumnValuePB>* column_values = nullptr;
746
2.12M
      if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT ||
747
1.84M
          request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) {
748
1.84M
        column_values = &request_.column_values();
749
282k
      } else if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
750
166k
        column_values = &request_.column_new_values();
751
166k
      }
752
753
2.12M
      if (column_values != nullptr && !column_values->empty()) {
754
1.69M
        DocKeyColumnPathBuilder builder(encoded_doc_key_);
755
4.72M
        for (const auto& column_value : *column_values) {
756
4.72M
          paths->push_back(builder.Build(column_value.column_id()));
757
4.72M
        }
758
436k
      } else if (encoded_doc_key_) {
759
436k
        paths->push_back(encoded_doc_key_);
760
436k
      }
761
2.12M
      break;
762
5.26M
    }
763
5.26M
  }
764
5.26M
  return Status::OK();
765
5.26M
}
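
To make the two modes concrete (column ids 11 and 12 are illustrative): for an INSERT writing columns 11 and 12, kLock mode yields the single path doc_key + kColumnId + kLivenessColumn, on which the caller places a strong intent with weak intents on its prefixes, while kIntents mode yields doc_key + kColumnId + 11 and doc_key + kColumnId + 12, one per written column. An UPDATE whose new values contain non-value expressions instead locks the whole row in kLock mode by pushing the encoded doc key itself.
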
766
767
//--------------------------------------------------------------------------------------------------
768
769
Result<size_t> PgsqlReadOperation::Execute(const YQLStorageIf& ql_storage,
770
                                           CoarseTimePoint deadline,
771
                                           const ReadHybridTime& read_time,
772
                                           bool is_explicit_request_read_time,
773
                                           const Schema& schema,
774
                                           const Schema *index_schema,
775
                                           faststring *result_buffer,
776
1.50M
                                           HybridTime *restart_read_ht) {
777
1.50M
  size_t fetched_rows = 0;
778
  // Reserve space for fetched rows count.
779
1.50M
  pggate::PgWire::WriteInt64(0, result_buffer);
780
1.49M
  auto se = ScopeExit([&fetched_rows, result_buffer] {
781
1.49M
    NetworkByteOrder::Store64(result_buffer->data(), fetched_rows);
782
1.49M
  });
783
405
  VLOG(4) << "Read, read time: " << read_time << ", txn: " << txn_op_context_;
784
785
  // Fetching data.
786
1.50M
  bool has_paging_state = false;
787
1.50M
  if (request_.batch_arguments_size() > 0) {
788
1.84k
    SCHECK(request_.has_ybctid_column_value(),
789
1.84k
           InternalError,
790
1.84k
           "ybctid arguments can be batched only");
791
1.84k
    fetched_rows = VERIFY_RESULT(ExecuteBatchYbctid(
792
1.84k
        ql_storage, deadline, read_time, schema,
793
1.84k
        result_buffer, restart_read_ht));
794
1.49M
  } else if (request_.has_sampling_state()) {
795
109
    fetched_rows = VERIFY_RESULT(ExecuteSample(
796
109
        ql_storage, deadline, read_time, is_explicit_request_read_time, schema,
797
109
        result_buffer, restart_read_ht, &has_paging_state));
798
1.49M
  } else {
799
1.49M
    fetched_rows = VERIFY_RESULT(ExecuteScalar(
800
1.49M
        ql_storage, deadline, read_time, is_explicit_request_read_time, schema, index_schema,
801
1.49M
        result_buffer, restart_read_ht, &has_paging_state));
802
1.49M
  }
803
804
1.50M
  VTRACE(1, "Fetched $0 rows. $1 paging state", fetched_rows, (has_paging_state ? "No" : "Has"));
805
1.50M
  *restart_read_ht = table_iter_->RestartReadHt();
806
1.50M
  return fetched_rows;
807
1.50M
}
808
809
Result<size_t> PgsqlReadOperation::ExecuteSample(const YQLStorageIf& ql_storage,
810
                                                 CoarseTimePoint deadline,
811
                                                 const ReadHybridTime& read_time,
812
                                                 bool is_explicit_request_read_time,
813
                                                 const Schema& schema,
814
                                                 faststring *result_buffer,
815
                                                 HybridTime *restart_read_ht,
816
109
                                                 bool *has_paging_state) {
817
109
  *has_paging_state = false;
818
109
  size_t scanned_rows = 0;
819
109
  PgsqlSamplingStatePB sampling_state = request_.sampling_state();
820
  // Requested total number of rows to collect
821
109
  int targrows = sampling_state.targrows();
822
  // Number of rows collected so far
823
109
  int numrows = sampling_state.numrows();
824
  // Total number of rows scanned
825
109
  double samplerows = sampling_state.samplerows();
826
  // Current number of rows to skip before collecting the next one for the sample
827
109
  double rowstoskip = sampling_state.rowstoskip();
828
  // Variables for the random number generator
829
109
  YbgPrepareMemoryContext();
830
109
  YbgReservoirState rstate = NULL;
831
109
  YbgSamplerCreate(sampling_state.rstate_w(), sampling_state.rand_state(), &rstate);
832
  // Buffer to hold selected row ids from the current page
833
109
  std::unique_ptr<QLValuePB[]> reservoir = std::make_unique<QLValuePB[]>(targrows);
834
  // Number of rows to scan for the current page.
835
  // A row count limit that is too low is inefficient, since we have to allocate and initialize a
836
  // reservoir capable of holding a potentially large number (targrows) of tuples. The row count
837
  // has to be at least targrows for a chance to fill up the reservoir. In fact, the algorithm
838
  // selects targrows tuples only for the very first page of the table; then it starts to skip
839
  // tuples, and the further it goes, the more it skips. For a large enough table it eventually
840
  // starts to select fewer than targrows per page, regardless of the row_count_limit.
841
  // Anyway, double targrows seems like a reasonable minimum for the row_count_limit.
842
109
  size_t row_count_limit = 2 * targrows;
843
109
  if (request_.has_limit() && request_.limit() > row_count_limit) {
844
0
    row_count_limit = request_.limit();
845
0
  }
846
  // The request is not supposed to contain any column refs; we just need the liveness column.
847
109
  Schema projection;
848
109
  RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
849
  // The request may carry paging state; CreateIterator takes care of positioning.
850
109
  table_iter_ = VERIFY_RESULT(CreateIterator(
851
109
      ql_storage, request_, projection, schema, txn_op_context_,
852
109
      deadline, read_time, is_explicit_request_read_time));
853
109
  bool scan_time_exceeded = false;
854
109
  CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms;
855
86.6k
  while (scanned_rows++ < row_count_limit &&
856
86.6k
         VERIFY_RESULT(table_iter_->HasNext()) &&
857
86.5k
         !scan_time_exceeded) {
858
86.5k
    if (numrows < targrows) {
859
      // Select the first targrows of the table. If the first partition(s) have fewer than that,
860
      // the next partition continues populating its reservoir starting from the numrows position:
861
      // numrows, as well as the other sampling state variables, is returned and copied over to
862
      // the next sampling request.
863
86.5k
      Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId());
864
86.5k
      reservoir[numrows++].set_binary_value(ybctid.data(), ybctid.size());
865
0
    } else {
866
      // At least targrows tuples have already been collected; now the algorithm skips an
867
      // increasing number of rows before taking the next one into the reservoir.
868
0
      if (rowstoskip <= 0) {
869
        // Take ybctid of the current row
870
0
        Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId());
871
        // Pick random tuple in the reservoir to replace
872
0
        double rvalue;
873
0
        int k;
874
0
        YbgSamplerRandomFract(rstate, &rvalue);
875
0
        k = static_cast<int>(targrows * rvalue);
876
        // Overwrite previous value with new one
877
0
        reservoir[k].set_binary_value(ybctid.data(), ybctid.size());
878
        // Choose next number of rows to skip
879
0
        YbgReservoirGetNextS(rstate, samplerows, targrows, &rowstoskip);
880
0
      } else {
881
0
        rowstoskip -= 1;
882
0
      }
883
0
    }
884
    // Taking the tuple ID does not advance the table iterator. Move it now.
885
86.5k
    table_iter_->SkipRow();
886
    // Check if we are running out of time
887
86.5k
    scan_time_exceeded = CoarseMonoClock::now() >= stop_scan;
888
86.5k
  }
889
  // Count live rows we have scanned. TODO: how to count dead rows?
890
109
  samplerows += (scanned_rows - 1);
891
  // Return collected tuples from the reservoir.
892
  // Tuples are returned as (index, ybctid) pairs, where index is in the [0..targrows-1] range.
893
  // As mentioned above, for large tables reservoirs become increasingly sparse from page to page,
894
  // so we hope to save by sending a variable number of index/ybctid pairs vs. exactly targrows
895
  // of nullable ybctids. It also helps in the case of an extremely small table or partition.
896
109
  int fetched_rows = 0;
897
153k
  for (int i = 0; i < numrows; i++) {
898
153k
    QLValuePB index;
899
153k
    if (reservoir[i].has_binary_value()) {
900
86.5k
      index.set_int32_value(i);
901
86.5k
      RETURN_NOT_OK(pggate::WriteColumn(index, result_buffer));
902
86.5k
      RETURN_NOT_OK(pggate::WriteColumn(reservoir[i], result_buffer));
903
86.5k
      fetched_rows++;
904
86.5k
    }
905
153k
  }
906
907
  // Return sampling state to continue with next page
908
109
  PgsqlSamplingStatePB *new_sampling_state = response_.mutable_sampling_state();
909
109
  new_sampling_state->set_numrows(numrows);
910
109
  new_sampling_state->set_targrows(targrows);
911
109
  new_sampling_state->set_samplerows(samplerows);
912
109
  new_sampling_state->set_rowstoskip(rowstoskip);
913
109
  uint64_t randstate = 0;
914
109
  double rstate_w = 0;
915
109
  YbgSamplerGetState(rstate, &rstate_w, &randstate);
916
109
  new_sampling_state->set_rstate_w(rstate_w);
917
109
  new_sampling_state->set_rand_state(randstate);
918
109
  YbgDeleteMemoryContext();
919
920
  // Return paging state if scan has not been completed
921
109
  RETURN_NOT_OK(SetPagingStateIfNecessary(table_iter_.get(), scanned_rows, row_count_limit,
922
109
                                          scan_time_exceeded, &schema, read_time,
923
109
                                          has_paging_state));
924
109
  return fetched_rows;
925
109
}
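
ExecuteSample is a paged reservoir sampler: the fill phase keeps the first targrows ybctids, after which rows are skipped in increasing runs and occasional rows displace random reservoir slots, with numrows/samplerows/rowstoskip carried across pages in PgsqlSamplingStatePB. A self-contained sketch of the underlying technique, classic Algorithm R, using std::mt19937 in place of the Ybg sampler state (an assumption for illustration; the production code computes skip lengths instead of drawing a random number per row):

#include <cstddef>
#include <iostream>
#include <random>
#include <string>
#include <vector>

// Classic Algorithm R: keep a uniform random sample of targrows ids from a stream.
std::vector<std::string> ReservoirSample(const std::vector<std::string>& rows,
                                         size_t targrows, std::mt19937& rng) {
  std::vector<std::string> reservoir;
  reservoir.reserve(targrows);
  size_t seen = 0;
  for (const auto& ybctid : rows) {
    if (reservoir.size() < targrows) {
      reservoir.push_back(ybctid);  // fill phase: take the first targrows rows
    } else {
      // Replace a random slot with probability targrows / (seen + 1).
      std::uniform_int_distribution<size_t> dist(0, seen);
      const size_t k = dist(rng);
      if (k < targrows) reservoir[k] = ybctid;
    }
    ++seen;
  }
  return reservoir;
}

int main() {
  std::mt19937 rng(42);
  std::vector<std::string> rows;
  for (int i = 0; i < 1000; ++i) rows.push_back("row-" + std::to_string(i));
  for (const auto& id : ReservoirSample(rows, 10, rng)) std::cout << id << "\n";
}
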
926
927
Result<size_t> PgsqlReadOperation::ExecuteScalar(const YQLStorageIf& ql_storage,
928
                                                 CoarseTimePoint deadline,
929
                                                 const ReadHybridTime& read_time,
930
                                                 bool is_explicit_request_read_time,
931
                                                 const Schema& schema,
932
                                                 const Schema *index_schema,
933
                                                 faststring *result_buffer,
934
                                                 HybridTime *restart_read_ht,
935
1.49M
                                                 bool *has_paging_state) {
936
1.49M
  *has_paging_state = false;
937
938
1.49M
  size_t fetched_rows = 0;
939
1.49M
  size_t row_count_limit = std::numeric_limits<std::size_t>::max();
940
1.49M
  if (request_.has_limit()) {
941
1.49M
    if (request_.limit() == 0) {
942
0
      return fetched_rows;
943
0
    }
944
1.49M
    row_count_limit = request_.limit();
945
1.49M
  }
946
947
  // Create the projection of regular columns selected by the row block plus any referenced in
948
  // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this
949
  // projection only to scan sub-documents. The query schema is used to select only referenced
950
  // columns and key columns.
951
1.49M
  Schema projection;
952
1.49M
  Schema index_projection;
953
1.49M
  YQLRowwiseIteratorIf *iter;
954
1.49M
  const Schema* scan_schema;
955
1.49M
  DocPgExprExecutor expr_exec(&schema);
956
957
1.49M
  if (!request_.col_refs().empty()) {
958
1.49M
    RETURN_NOT_OK(CreateProjection(schema, request_.col_refs(), &projection));
959
2.85k
  } else {
960
    // Compatibility: Either request indeed has no column refs, or it comes from a legacy node.
961
2.85k
    RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
962
2.85k
  }
963
1.49M
  table_iter_ = VERIFY_RESULT(CreateIterator(
964
1.49M
      ql_storage, request_, projection, schema, txn_op_context_,
965
1.49M
      deadline, read_time, is_explicit_request_read_time));
966
967
1.49M
  ColumnId ybbasectid_id;
968
1.49M
  if (request_.has_index_request()) {
969
151k
    const PgsqlReadRequestPB& index_request = request_.index_request();
970
151k
    RETURN_NOT_OK(CreateProjection(*index_schema, index_request.column_refs(), &index_projection));
971
151k
    index_iter_ = VERIFY_RESULT(CreateIterator(
972
151k
        ql_storage, index_request, index_projection, *index_schema, txn_op_context_,
973
151k
        deadline, read_time, is_explicit_request_read_time));
974
151k
    iter = index_iter_.get();
975
151k
    const auto idx = index_schema->find_column("ybidxbasectid");
976
151k
    SCHECK_NE(idx, Schema::kColumnNotFound, Corruption, "ybidxbasectid not found in index schema");
977
151k
    ybbasectid_id = index_schema->column_id(idx);
978
151k
    scan_schema = index_schema;
979
1.34M
  } else {
980
1.34M
    iter = table_iter_.get();
981
1.34M
    scan_schema = &schema;
982
8.02M
    for (const PgsqlColRefPB& column_ref : request_.col_refs()) {
983
8.02M
      RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref));
984
885
      VLOG(1) << "Added column reference to the executor";
985
8.02M
    }
986
1.34M
    for (const PgsqlExpressionPB& expr : request_.where_clauses()) {
987
368
      RETURN_NOT_OK(expr_exec.AddWhereExpression(expr));
988
0
      VLOG(1) << "Added where expression to the executor";
989
368
    }
990
1.34M
  }
991
992
1.45k
  VLOG(1) << "Started iterator";
993
994
  // Set scan start time.
995
1.49M
  bool scan_time_exceeded = false;
996
1.49M
  CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms;
997
998
  // Fetching data.
999
1.49M
  int match_count = 0;
1000
1.49M
  QLTableRow row;
1001
23.6M
  while (fetched_rows < row_count_limit && VERIFY_RESULT(iter->HasNext()) &&
1002
22.1M
         !scan_time_exceeded) {
1003
22.1M
    row.Clear();
1004
1005
    // If there is an index request, fetch ybbasectid from the index and use it as ybctid
1006
    // to fetch from the base table. Otherwise, fetch from the base table directly.
1007
22.1M
    if (request_.has_index_request()) {
1008
291k
      RETURN_NOT_OK(iter->NextRow(&row));
1009
291k
      const auto& tuple_id = row.GetValue(ybbasectid_id);
1010
291k
      SCHECK_NE(tuple_id, boost::none, Corruption, "ybbasectid not found in index row");
1011
291k
      if (!VERIFY_RESULT(table_iter_->SeekTuple(tuple_id->binary_value()))) {
1012
0
        DocKey doc_key;
1013
0
        RETURN_NOT_OK(doc_key.DecodeFrom(tuple_id->binary_value()));
1014
0
        return STATUS_FORMAT(Corruption, "ybctid $0 not found in indexed table", doc_key);
1015
291k
      }
1016
291k
      row.Clear();
1017
291k
      RETURN_NOT_OK(table_iter_->NextRow(projection, &row));
1018
21.8M
    } else {
1019
21.8M
      RETURN_NOT_OK(iter->NextRow(projection, &row));
1020
21.8M
    }
1021
1022
    // Match the row with the where condition before adding to the row block.
1023
22.1M
    bool is_match = true;
1024
22.1M
    RETURN_NOT_OK(expr_exec.Exec(row, nullptr, &is_match));
1025
22.1M
    if (is_match) {
1026
22.0M
      match_count++;
1027
22.0M
      if (request_.is_aggregate()) {
1028
1.71M
        RETURN_NOT_OK(EvalAggregate(row));
1029
20.3M
      } else {
1030
20.3M
        RETURN_NOT_OK(PopulateResultSet(row, result_buffer));
1031
20.3M
        ++fetched_rows;
1032
20.3M
      }
1033
22.0M
    }
1034
1035
    // Check if we are running out of time
1036
22.1M
    scan_time_exceeded = CoarseMonoClock::now() >= stop_scan;
1037
22.1M
  }
1038
1039
2.34k
  VLOG(1) << "Stopped iterator after " << match_count << " matches, "
1040
2.34k
          << fetched_rows << " rows fetched";
1041
1.71k
  VLOG(1) << "Deadline is " << (scan_time_exceeded ? "" : "not ") << "exceeded";
1042
1043
1.49M
  if (request_.is_aggregate() && match_count > 0) {
1044
345
    RETURN_NOT_OK(PopulateAggregate(row, result_buffer));
1045
345
    ++fetched_rows;
1046
345
  }
1047
1048
1.49M
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms > 0) && request_.is_aggregate()) {
1049
418
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_pgsql_aggregate_read_ms);
1050
418
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms));
1051
418
  }
1052
1053
1.49M
  RETURN_NOT_OK(SetPagingStateIfNecessary(
1054
1.49M
      iter, fetched_rows, row_count_limit, scan_time_exceeded, scan_schema,
1055
1.49M
      read_time, has_paging_state));
1056
1.49M
  return fetched_rows;
1057
1.49M
}
1058
1059
Result<size_t> PgsqlReadOperation::ExecuteBatchYbctid(const YQLStorageIf& ql_storage,
1060
                                                      CoarseTimePoint deadline,
1061
                                                      const ReadHybridTime& read_time,
1062
                                                      const Schema& schema,
1063
                                                      faststring *result_buffer,
1064
1.84k
                                                      HybridTime *restart_read_ht) {
1065
1.84k
  Schema projection;
1066
1.84k
  RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
1067
1068
1.84k
  QLTableRow row;
1069
1.84k
  size_t row_count = 0;
1070
796k
  for (const PgsqlBatchArgumentPB& batch_argument : request_.batch_arguments()) {
1071
    // Get the row.
1072
796k
    RETURN_NOT_OK(ql_storage.GetIterator(request_.stmt_id(), projection, schema, txn_op_context_,
1073
796k
                                         deadline, read_time, batch_argument.ybctid().value(),
1074
796k
                                         &table_iter_));
1075
1076
796k
    if (VERIFY_RESULT(table_iter_->HasNext())) {
1077
796k
      row.Clear();
1078
796k
      RETURN_NOT_OK(table_iter_->NextRow(projection, &row));
1079
1080
      // Populate result set.
1081
796k
      RETURN_NOT_OK(PopulateResultSet(row, result_buffer));
1082
796k
      response_.add_batch_orders(batch_argument.order());
1083
796k
      row_count++;
1084
796k
    }
1085
796k
  }
1086
1087
  // Set status for this batch.
1088
  // Mark all rows as processed, even if some of the ybctids were not found.
1089
1.84k
  response_.set_batch_arg_count(request_.batch_arguments_size());
1090
1091
1.84k
  return row_count;
1092
1.84k
}
1093
1094
Status PgsqlReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter,
1095
                                                     size_t fetched_rows,
1096
                                                     const size_t row_count_limit,
1097
                                                     const bool scan_time_exceeded,
1098
                                                     const Schema* schema,
1099
                                                     const ReadHybridTime& read_time,
1100
1.49M
                                                     bool *has_paging_state) {
1101
1.49M
  *has_paging_state = false;
1102
1.49M
  if (!request_.return_paging_state()) {
1103
69
    return Status::OK();
1104
69
  }
1105
1106
  // Set the paging state for next row.
1107
1.49M
  if (fetched_rows >= row_count_limit || scan_time_exceeded) {
1108
25.8k
    SubDocKey next_row_key;
1109
25.8k
    RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key));
1110
    // When the "limit" number of rows are returned and we are asked to return the paging state,
1111
    // return the partition key and row key of the next row to read in the paging state if there are
1112
    // still more rows to read. Otherwise, leave the paging state empty, which means we are done
1113
    // reading from this tablet.
1114
25.8k
    if (!next_row_key.doc_key().empty()) {
1115
23.7k
      const auto& keybytes = next_row_key.Encode();
1116
23.7k
      PgsqlPagingStatePB* paging_state = response_.mutable_paging_state();
1117
23.7k
      RSTATUS_DCHECK(schema != nullptr, IllegalState, "Missing schema");
1118
23.7k
      if (schema->num_hash_key_columns() > 0) {
1119
6.20k
        paging_state->set_next_partition_key(
1120
6.20k
           PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash()));
1121
17.5k
      } else {
1122
17.5k
        paging_state->set_next_partition_key(keybytes.ToStringBuffer());
1123
17.5k
      }
1124
23.7k
      paging_state->set_next_row_key(keybytes.ToStringBuffer());
1125
23.7k
      *has_paging_state = true;
1126
23.7k
    }
1127
25.8k
  }
1128
1.49M
  if (*has_paging_state) {
1129
23.7k
    if (FLAGS_pgsql_consistent_transactional_paging) {
1130
23.7k
      read_time.AddToPB(response_.mutable_paging_state());
1131
0
    } else {
1132
      // Using SingleTime will help avoid read restarts on the second page and later, but may
1133
      // produce stale results on those pages.
1134
0
      auto per_row_consistent_read_time = ReadHybridTime::SingleTime(read_time.read);
1135
0
      per_row_consistent_read_time.AddToPB(response_.mutable_paging_state());
1136
0
    }
1137
23.7k
  }
1138
1139
1.49M
  return Status::OK();
1140
1.49M
}
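
From the client's perspective, resuming a scan amounts to copying the returned paging state into the next read request: CreateIterator (earlier in this file) decodes next_row_key into the scan start key, and the embedded read time keeps later pages consistent when pgsql_consistent_transactional_paging is on. A hedged sketch assuming only the PgsqlReadRequestPB/PgsqlResponsePB message types used in this file (generated protobuf headers and RPC plumbing omitted):

// Sketch: carry the paging state from one response into the next request.
void PrepareNextPage(const PgsqlResponsePB& response, PgsqlReadRequestPB* next_request) {
  if (response.has_paging_state()) {
    // next_partition_key routes the request to the right tablet, next_row_key
    // becomes the scan start key, and the read time (if set) pins the snapshot.
    next_request->mutable_paging_state()->CopyFrom(response.paging_state());
    next_request->set_return_paging_state(true);
  }
}
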
1141
1142
Status PgsqlReadOperation::PopulateResultSet(const QLTableRow& table_row,
1143
21.1M
                                             faststring *result_buffer) {
1144
21.1M
  QLExprResult result;
1145
228M
  for (const PgsqlExpressionPB& expr : request_.targets()) {
1146
228M
    RETURN_NOT_OK(EvalExpr(expr, table_row, result.Writer()));
1147
228M
    RETURN_NOT_OK(pggate::WriteColumn(result.Value(), result_buffer));
1148
228M
  }
1149
21.1M
  return Status::OK();
1150
21.1M
}
1151
1152
15.2M
Status PgsqlReadOperation::GetTupleId(QLValue *result) const {
1153
  // Get row key and save to QLValue.
1154
  // TODO(neil) Check if we need to append a table_id and other info to TupleID. For example, we
1155
  // might need info to make sure the TupleId by itself is a valid reference to a specific row of
1156
  // a valid table.
1157
15.2M
  const Slice tuple_id = VERIFY_RESULT(table_iter_->GetTupleId());
1158
15.2M
  result->set_binary_value(tuple_id.data(), tuple_id.size());
1159
15.2M
  return Status::OK();
1160
15.2M
}
1161
1162
1.71M
Status PgsqlReadOperation::EvalAggregate(const QLTableRow& table_row) {
1163
1.71M
  if (aggr_result_.empty()) {
1164
341
    int column_count = request_.targets().size();
1165
341
    aggr_result_.resize(column_count);
1166
341
  }
1167
1168
1.71M
  int aggr_index = 0;
1169
1.71M
  for (const PgsqlExpressionPB& expr : request_.targets()) {
1170
1.71M
    RETURN_NOT_OK(EvalExpr(expr, table_row, aggr_result_[aggr_index++].Writer()));
1171
1.71M
  }
1172
1.71M
  return Status::OK();
1173
1.71M
}
1174
1175
Status PgsqlReadOperation::PopulateAggregate(const QLTableRow& table_row,
1176
345
                                             faststring *result_buffer) {
1177
345
  int column_count = request_.targets().size();
1178
755
  for (int rscol_index = 0; rscol_index < column_count; rscol_index++) {
1179
410
    RETURN_NOT_OK(pggate::WriteColumn(aggr_result_[rscol_index].Value(), result_buffer));
1180
410
  }
1181
345
  return Status::OK();
1182
345
}
1183
1184
1.33M
Status PgsqlReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) {
1185
1.33M
  if (request_.batch_arguments_size() > 0 && request_.has_ybctid_column_value()) {
1186
100k
    for (const auto& batch_argument : request_.batch_arguments()) {
1187
100k
      SCHECK(batch_argument.has_ybctid(), InternalError, "ybctid batch argument is expected");
1188
100k
      RETURN_NOT_OK(AddIntent(batch_argument.ybctid(), request_.wait_policy(), out));
1189
100k
    }
1190
1.33M
  } else {
1191
1.33M
    AddIntent(VERIFY_RESULT(FetchEncodedDocKey(schema, request_)), request_.wait_policy(), out);
1192
1.33M
  }
1193
1.33M
  return Status::OK();
1194
1.33M
}
1195
1196
}  // namespace docdb
1197
}  // namespace yb