YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/pgsql_operation.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/pgsql_operation.h"
15
16
#include <limits>
17
#include <string>
18
#include <unordered_set>
19
#include <vector>
20
21
#include <boost/optional/optional_io.hpp>
22
23
#include "yb/common/partition.h"
24
#include "yb/common/pg_system_attr.h"
25
#include "yb/common/ql_value.h"
26
27
#include "yb/docdb/doc_path.h"
28
#include "yb/docdb/doc_pg_expr.h"
29
#include "yb/docdb/doc_pgsql_scanspec.h"
30
#include "yb/docdb/doc_rowwise_iterator.h"
31
#include "yb/docdb/doc_write_batch.h"
32
#include "yb/docdb/docdb.pb.h"
33
#include "yb/docdb/docdb_debug.h"
34
#include "yb/docdb/docdb_pgapi.h"
35
#include "yb/docdb/docdb_rocksdb_util.h"
36
#include "yb/docdb/intent_aware_iterator.h"
37
#include "yb/docdb/primitive_value_util.h"
38
#include "yb/docdb/ql_storage_interface.h"
39
40
#include "yb/util/flag_tags.h"
41
#include "yb/util/result.h"
42
#include "yb/util/scope_exit.h"
43
#include "yb/util/status_format.h"
44
#include "yb/util/trace.h"
45
46
#include "yb/yql/pggate/util/pg_doc_data.h"
47
48
using namespace std::literals;
49
50
DECLARE_bool(ysql_disable_index_backfill);
51
52
DEFINE_double(ysql_scan_timeout_multiplier, 0.5,
53
              "DEPRECATED. Has no affect, use ysql_scan_deadline_margin_ms to control the client "
54
              "timeout");
55
56
DEFINE_uint64(ysql_scan_deadline_margin_ms, 1000,
57
              "Scan deadline is calculated by adding client timeout to the time when the request "
58
              "was received. It defines the moment in time when client has definitely timed out "
59
              "and if the request is yet in processing after the deadline, it can be canceled. "
60
              "Therefore to prevent client timeout, the request handler should return partial "
61
              "result and paging information some time before the deadline. That's what the "
62
              "ysql_scan_deadline_margin_ms is for. It should account for network and processing "
63
              "delays.");
64
65
DEFINE_bool(pgsql_consistent_transactional_paging, true,
66
            "Whether to enforce consistency of data returned for second page and beyond for YSQL "
67
            "queries on transactional tables. If true, read restart errors could be returned to "
68
            "prevent inconsistency. If false, no read restart errors are returned but the data may "
69
            "be stale. The latter is preferable for long scans. The data returned for the first "
70
            "page of results is never stale regardless of this flag.");
71
72
DEFINE_test_flag(int32, slowdown_pgsql_aggregate_read_ms, 0,
73
                 "If set > 0, slows down the response to pgsql aggregate read by this amount.");
74
75
namespace yb {
76
namespace docdb {
77
78
namespace {
79
80
// Compatibility: accept column references from legacy nodes as a list of column ids only.
81
CHECKED_STATUS CreateProjection(const Schema& schema,
82
                                const PgsqlColumnRefsPB& column_refs,
83
6.70M
                                Schema* projection) {
84
  // Create projection of non-primary key columns. Primary key columns are implicitly read by DocDB.
85
  // It will also sort the columns before scanning.
86
6.70M
  vector<ColumnId> column_ids;
87
6.70M
  column_ids.reserve(column_refs.ids_size());
88
6.70M
  for (int32_t id : column_refs.ids()) {
89
1.45M
    const ColumnId column_id(id);
90
1.45M
    if (!schema.is_key_column(column_id)) {
91
1.30M
      column_ids.emplace_back(column_id);
92
1.30M
    }
93
1.45M
  }
94
6.70M
  return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection);
95
6.70M
}
96
97
CHECKED_STATUS CreateProjection(
98
    const Schema& schema,
99
    const google::protobuf::RepeatedPtrField<PgsqlColRefPB> &column_refs,
100
3.44M
    Schema* projection) {
101
3.44M
  vector<ColumnId> column_ids;
102
3.44M
  column_ids.reserve(column_refs.size());
103
26.4M
  for (const PgsqlColRefPB& column_ref : column_refs) {
104
26.4M
    const ColumnId column_id(column_ref.column_id());
105
26.4M
    if (!schema.is_key_column(column_id)) {
106
20.9M
      column_ids.emplace_back(column_id);
107
20.9M
    }
108
26.4M
  }
109
3.44M
  return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection);
110
3.44M
}
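// A standalone sketch of the projection rule both CreateProjection overloads above
// implement: primary key columns are dropped because DocDB reads them implicitly, and only
// the remaining referenced columns go into the projection. Types here are simplified
// stand-ins for the real schema classes.
#include <unordered_set>
#include <vector>

std::vector<int> NonKeyColumns(const std::unordered_set<int>& key_column_ids,
                               const std::vector<int>& referenced_ids) {
  std::vector<int> result;
  result.reserve(referenced_ids.size());
  for (int id : referenced_ids) {
    if (key_column_ids.count(id) == 0) {  // skip primary key columns
      result.push_back(id);
    }
  }
  return result;
}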
111
112
2.86M
void AddIntent(const std::string& encoded_key, WaitPolicy wait_policy, KeyValueWriteBatchPB *out) {
113
2.86M
  auto pair = out->mutable_read_pairs()->Add();
114
2.86M
  pair->set_key(encoded_key);
115
2.86M
  pair->set_value(std::string(1, ValueTypeAsChar::kNullLow));
116
  // Since we don't batch read RPCs that lock rows, we can get away with using a singular
117
  // wait_policy field. Once we start batching read requests (issue #2495), we will need a repeated
118
  // wait policies field.
119
2.86M
  out->set_wait_policy(wait_policy);
120
2.86M
}
121
122
CHECKED_STATUS AddIntent(const PgsqlExpressionPB& ybctid, WaitPolicy wait_policy,
123
101k
                         KeyValueWriteBatchPB* out) {
124
101k
  const auto &val = ybctid.value().binary_value();
125
101k
  SCHECK(!val.empty(), InternalError, "empty ybctid");
126
101k
  AddIntent(val, wait_policy, out);
127
101k
  return Status::OK();
128
101k
}
129
130
template<class R, class Request, class DocKeyProcessor, class EncodedDocKeyProcessor>
131
Result<R> FetchDocKeyImpl(const Schema& schema,
132
                          const Request& req,
133
                          const DocKeyProcessor& dk_processor,
134
15.6M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
15.6M
  if (req.has_ybctid_column_value()) {
137
5.86M
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
5.86M
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
5.86M
    return edk_processor(ybctid);
140
9.76M
  } else {
141
9.76M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
9.76M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
9.76M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
9.76M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
9.76M
    return dk_processor(hashed_components.empty()
146
9.76M
        ? DocKey(schema, std::move(range_components))
147
9.76M
        : DocKey(
148
3.25M
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
9.76M
  }
150
15.6M
}
pgsql_operation.cc:yb::Result<yb::docdb::DocKey> yb::docdb::(anonymous namespace)::FetchDocKeyImpl<yb::docdb::DocKey, yb::PgsqlWriteRequestPB, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_4, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_5>(yb::Schema const&, yb::PgsqlWriteRequestPB const&, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_4 const&, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_5 const&)
Line
Count
Source
134
12.8M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
12.8M
  if (req.has_ybctid_column_value()) {
137
5.86M
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
5.86M
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
5.86M
    return edk_processor(ybctid);
140
7.01M
  } else {
141
7.01M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
7.01M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
7.01M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
7.01M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
7.01M
    return dk_processor(hashed_components.empty()
146
7.01M
        ? DocKey(schema, std::move(range_components))
147
7.01M
        : DocKey(
148
514k
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
7.01M
  }
150
12.8M
}
pgsql_operation.cc:yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > yb::docdb::(anonymous namespace)::FetchDocKeyImpl<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::PgsqlReadRequestPB, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_3, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_2>(yb::Schema const&, yb::PgsqlReadRequestPB const&, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_3 const&, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_2 const&)
Line
Count
Source
134
2.76M
                          const EncodedDocKeyProcessor& edk_processor) {
135
  // Init DocDB key using either ybctid or partition and range values.
136
2.76M
  if (req.has_ybctid_column_value()) {
137
5.33k
    const auto& ybctid = req.ybctid_column_value().value().binary_value();
138
5.33k
    SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139
5.33k
    return edk_processor(ybctid);
140
2.75M
  } else {
141
2.75M
    auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142
2.75M
        req.partition_column_values(), schema, 0 /* start_idx */));
143
2.75M
    auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144
2.75M
        req.range_column_values(), schema, schema.num_hash_key_columns()));
145
2.75M
    return dk_processor(hashed_components.empty()
146
2.75M
        ? DocKey(schema, std::move(range_components))
147
2.75M
        : DocKey(
148
2.73M
            schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149
2.75M
  }
150
2.76M
}
151
152
2.76M
Result<string> FetchEncodedDocKey(const Schema& schema, const PgsqlReadRequestPB& request) {
153
2.76M
  return FetchDocKeyImpl<string>(
154
2.76M
      schema, request,
155
2.76M
      [](const auto& doc_key) { return doc_key.Encode().ToStringBuffer(); },
156
2.76M
      [](const auto& encoded_doc_key) { return encoded_doc_key; });
157
2.76M
}
158
159
12.8M
Result<DocKey> FetchDocKey(const Schema& schema, const PgsqlWriteRequestPB& request) {
160
12.8M
  return FetchDocKeyImpl<DocKey>(
161
12.8M
      schema, request,
162
12.8M
      [](const auto& doc_key) { return doc_key; },
163
12.8M
      [&schema](const auto& encoded_doc_key) -> Result<DocKey> {
164
5.86M
        DocKey key(schema);
165
5.86M
        RETURN_NOT_OK(key.DecodeFrom(encoded_doc_key));
166
5.86M
        return key;
167
5.86M
      });
168
12.8M
}
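// A standalone sketch of the choice FetchDocKeyImpl makes above: a request either carries
// an opaque, already-encoded row id (ybctid) that is decoded directly, or the key is
// assembled from hash and range components. SketchKey is a simplified stand-in for DocKey.
#include <cstdint>
#include <string>
#include <vector>

struct SketchKey {
  bool has_hash = false;
  uint16_t hash_code = 0;
  std::vector<std::string> hashed_components;
  std::vector<std::string> range_components;
};

SketchKey BuildKey(uint16_t hash_code,
                   std::vector<std::string> hashed,
                   std::vector<std::string> range) {
  SketchKey key;
  // Range-sharded tables have no hash components; hash-sharded tables do.
  if (!hashed.empty()) {
    key.has_hash = true;
    key.hash_code = hash_code;
    key.hashed_components = std::move(hashed);
  }
  key.range_components = std::move(range);
  return key;
}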
169
170
Result<YQLRowwiseIteratorIf::UniPtr> CreateIterator(
171
    const YQLStorageIf& ql_storage,
172
    const PgsqlReadRequestPB& request,
173
    const Schema& projection,
174
    const Schema& schema,
175
    const TransactionOperationContext& txn_op_context,
176
    CoarseTimePoint deadline,
177
    const ReadHybridTime& read_time,
178
3.91M
    bool is_explicit_request_read_time) {
179
3.91M
  VLOG_IF(2, request.is_for_backfill()) << "Creating iterator for " << yb::ToString(request);
180
181
3.91M
  YQLRowwiseIteratorIf::UniPtr result;
182
  // TODO(neil) Remove the following IF block when it is completely obsolete.
183
  // The following IF block has not been used since 2.1 release.
184
  // We keep it here only for rolling upgrade purpose.
185
3.91M
  if (request.has_ybctid_column_value()) {
186
9.94k
    SCHECK(!request.has_paging_state(),
187
9.94k
           InternalError,
188
9.94k
           "Each ybctid value identifies one row in the table while paging state "
189
9.94k
           "is only used for multi-row queries.");
190
9.94k
    RETURN_NOT_OK(ql_storage.GetIterator(
191
9.94k
        request.stmt_id(), projection, schema, txn_op_context,
192
9.94k
        deadline, read_time, request.ybctid_column_value().value(), &result));
193
3.90M
  } else {
194
3.90M
    SubDocKey start_sub_doc_key;
195
3.90M
    auto actual_read_time = read_time;
196
    // Decode the start SubDocKey from the paging state and set scan start key.
197
3.90M
    if (request.has_paging_state() &&
198
3.90M
        
        request.paging_state().has_next_row_key() &&
199
3.90M
        
        !request.paging_state().next_row_key().empty()) {
200
44.6k
      KeyBytes start_key_bytes(request.paging_state().next_row_key());
201
44.6k
      RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
202
      // TODO(dmitry) Remove backward compatibility block when obsolete.
203
44.6k
      if (!is_explicit_request_read_time) {
204
0
        if (request.paging_state().has_read_time()) {
205
0
          actual_read_time = ReadHybridTime::FromPB(request.paging_state().read_time());
206
0
        } else {
207
0
          actual_read_time.read = start_sub_doc_key.hybrid_time();
208
0
        }
209
0
      }
210
3.86M
    } else if (request.is_for_backfill()) {
211
2.25k
      RSTATUS_DCHECK(is_explicit_request_read_time, InvalidArgument,
212
2.25k
                     "Backfill request should already be using explicit read times.");
213
2.25k
      PgsqlBackfillSpecPB spec;
214
2.25k
      spec.ParseFromString(a2b_hex(request.backfill_spec()));
215
2.25k
      if (!spec.next_row_key().empty()) {
216
390
        KeyBytes start_key_bytes(spec.next_row_key());
217
390
        RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
218
390
      }
219
2.25k
    }
220
3.90M
    RETURN_NOT_OK(ql_storage.GetIterator(
221
3.90M
        request, projection, schema, txn_op_context,
222
3.90M
        deadline, read_time, start_sub_doc_key.doc_key(), &result));
223
3.90M
  }
224
3.91M
  return std::move(result);
225
3.91M
}
226
227
class DocKeyColumnPathBuilder {
228
 public:
229
  explicit DocKeyColumnPathBuilder(const RefCntPrefix& doc_key)
230
18.1M
      : doc_key_(doc_key.as_slice()) {
231
18.1M
  }
232
233
28.0M
  RefCntPrefix Build(ColumnIdRep column_id) {
234
28.0M
    buffer_.Clear();
235
28.0M
    buffer_.AppendValueType(ValueType::kColumnId);
236
28.0M
    buffer_.AppendColumnId(ColumnId(column_id));
237
28.0M
    RefCntBuffer path(doc_key_.size() + buffer_.size());
238
28.0M
    doc_key_.CopyTo(path.data());
239
28.0M
    buffer_.AsSlice().CopyTo(path.data() + doc_key_.size());
240
28.0M
    return path;
241
28.0M
  }
242
243
 private:
244
  Slice doc_key_;
245
  KeyBytes buffer_;
246
};
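// A standalone sketch of what DocKeyColumnPathBuilder::Build does: reuse the encoded row
// key as a fixed prefix and append a small column-id marker, yielding one lock/intent path
// per column. The marker byte and encoding below are illustrative, not the real format.
#include <string>

std::string BuildColumnPath(const std::string& encoded_doc_key, int column_id) {
  std::string path = encoded_doc_key;      // shared row-key prefix
  path.push_back('!');                     // stand-in for ValueType::kColumnId
  path.append(std::to_string(column_id));  // stand-in for the encoded column id
  return path;
}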
247
248
} // namespace
249
250
//--------------------------------------------------------------------------------------------------
251
252
12.8M
Status PgsqlWriteOperation::Init(PgsqlResponsePB* response) {
253
  // Initialize operation inputs.
254
12.8M
  response_ = response;
255
256
12.8M
  doc_key_ = VERIFY_RESULT(FetchDocKey(schema_, request_));
257
0
  encoded_doc_key_ = doc_key_->EncodeAsRefCntPrefix();
258
259
12.8M
  return Status::OK();
260
12.8M
}
261
262
// Check if a duplicate value is inserted into a unique index.
263
602
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) {
264
602
  VLOG(3) << "Looking for collisions in\n"
265
0
          << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
266
  // We need to check backwards only for backfilled entries.
267
602
  bool ret =
268
602
      VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) ||
269
602
      
      (request_.is_backfill() &&
270
595
       VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward)));
271
602
  if (!ret) {
272
594
    VLOG(3) << "No collisions found";
273
594
  }
274
602
  return ret;
275
602
}
276
277
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(
278
1.19k
    const DocOperationApplyData& data, Direction direction) {
279
1.19k
  VLOG(2) << "Looking for collision while going " << yb::ToString(direction)
280
0
          << ". Trying to insert " << *doc_key_;
281
1.19k
  auto requested_read_time = data.read_time;
282
1.19k
  if (direction == Direction::kForward) {
283
602
    return HasDuplicateUniqueIndexValue(data, requested_read_time);
284
602
  }
285
286
595
  auto iter = CreateIntentAwareIterator(
287
595
      data.doc_write_batch->doc_db(),
288
595
      BloomFilterMode::USE_BLOOM_FILTER,
289
595
      doc_key_->Encode().AsSlice(),
290
595
      rocksdb::kDefaultQueryId,
291
595
      txn_op_context_,
292
595
      data.deadline,
293
595
      ReadHybridTime::Max());
294
295
595
  HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp(
296
595
      iter.get(), SubDocKey(*doc_key_), requested_read_time.read));
297
0
  const HybridTime oldest_past_min_ht_liveness =
298
595
      VERIFY_RESULT(FindOldestOverwrittenTimestamp(
299
595
          iter.get(),
300
595
          SubDocKey(*doc_key_, PrimitiveValue::kLivenessColumn),
301
595
          requested_read_time.read));
302
0
  oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness);
303
595
  if (!oldest_past_min_ht.is_valid()) {
304
593
    return false;
305
593
  }
306
2
  return HasDuplicateUniqueIndexValue(
307
2
      data, ReadHybridTime::SingleTime(oldest_past_min_ht));
308
595
}
309
310
Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(
311
604
    const DocOperationApplyData& data, ReadHybridTime read_time) {
312
  // Set up the iterator to read the current primary key associated with the index key.
313
604
  DocPgsqlScanSpec spec(schema_, request_.stmt_id(), *doc_key_);
314
604
  DocRowwiseIterator iterator(schema_,
315
604
                              schema_,
316
604
                              txn_op_context_,
317
604
                              data.doc_write_batch->doc_db(),
318
604
                              data.deadline,
319
604
                              read_time);
320
604
  RETURN_NOT_OK(iterator.Init(spec));
321
322
  // It is a duplicate value if the index key exists already and the index value (corresponding to
323
  // the indexed table's primary key) is not the same.
324
604
  if (!VERIFY_RESULT(iterator.HasNext())) {
325
505
    VLOG(2) << "No collision found while checking at " << yb::ToString(read_time);
326
505
    return false;
327
505
  }
328
329
99
  QLTableRow table_row;
330
99
  RETURN_NOT_OK(iterator.NextRow(&table_row));
331
99
  for (const auto& column_value : request_.column_values()) {
332
    // Get the column.
333
99
    if (!column_value.has_column_id()) {
334
0
      return STATUS(InternalError, "column id missing", column_value.DebugString());
335
0
    }
336
99
    const ColumnId column_id(column_value.column_id());
337
338
    // Check column-write operator.
339
99
    CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
340
0
      << "Illegal write instruction";
341
342
    // Evaluate column value.
343
99
    QLExprResult expr_result;
344
99
    RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
345
346
99
    boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id);
347
99
    const QLValuePB& new_value = expr_result.Value();
348
99
    if (existing_value && *existing_value != new_value) {
349
8
      VLOG(2) << "Found collision while checking at " << yb::ToString(read_time)
350
0
              << "\nExisting: " << yb::ToString(*existing_value)
351
0
              << " vs New: " << yb::ToString(new_value)
352
0
              << "\nUsed read time as " << yb::ToString(data.read_time);
353
8
      DVLOG(3) << "DocDB is now:\n"
354
0
               << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db());
355
8
      return true;
356
8
    }
357
99
  }
358
359
91
  VLOG(2) << "No collision while checking at " << yb::ToString(read_time);
360
91
  return false;
361
99
}
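// A standalone model of the collision rule the overloads above implement: writing an
// existing unique-index key is a duplicate only if the stored value (the indexed table's
// primary key) differs from the one being written, so a backfill re-inserting the same
// row is not a conflict. The map is a simplified stand-in for the index.
#include <map>
#include <string>

bool IsDuplicate(const std::map<std::string, std::string>& unique_index,
                 const std::string& index_key, const std::string& base_table_pk) {
  auto it = unique_index.find(index_key);
  // Same key with the same base-table primary key is an idempotent re-insert.
  return it != unique_index.end() && it->second != base_table_pk;
}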
362
363
Result<HybridTime> PgsqlWriteOperation::FindOldestOverwrittenTimestamp(
364
    IntentAwareIterator* iter,
365
    const SubDocKey& sub_doc_key,
366
1.19k
    HybridTime min_read_time) {
367
1.19k
  HybridTime result;
368
1.19k
  VLOG(3) << "Doing iter->Seek " << *doc_key_;
369
1.19k
  iter->Seek(*doc_key_);
370
1.19k
  if (iter->valid()) {
371
994
    const KeyBytes bytes = sub_doc_key.EncodeWithoutHt();
372
994
    const Slice& sub_key_slice = bytes.AsSlice();
373
994
    result = VERIFY_RESULT(
374
0
        iter->FindOldestRecord(sub_key_slice, min_read_time));
375
0
    VLOG(2) << "iter->FindOldestRecord returned " << result << " for "
376
0
            << SubDocKey::DebugSliceToString(sub_key_slice);
377
994
  } else {
378
196
    VLOG(3) << "iter->Seek " << *doc_key_ << " turned out to be invalid";
379
196
  }
380
1.19k
  return result;
381
1.19k
}
382
383
12.8M
Status PgsqlWriteOperation::Apply(const DocOperationApplyData& data) {
384
12.8M
  VLOG(4) << "Write, read time: " << data.read_time << ", txn: " << txn_op_context_;
385
386
12.8M
  auto scope_exit = ScopeExit([this] {
387
12.8M
    if (!result_buffer_.empty()) {
388
12.8M
      NetworkByteOrder::Store64(result_buffer_.data(), result_rows_);
389
12.8M
    }
390
12.8M
  });
391
392
12.8M
  switch (request_.stmt_type()) {
393
4.61M
    case PgsqlWriteRequestPB::PGSQL_INSERT:
394
4.61M
      return ApplyInsert(data, IsUpsert::kFalse);
395
396
695k
    case PgsqlWriteRequestPB::PGSQL_UPDATE:
397
695k
      return ApplyUpdate(data);
398
399
907k
    case PgsqlWriteRequestPB::PGSQL_DELETE:
400
907k
      return ApplyDelete(data, request_.is_delete_persist_needed());
401
402
6.61M
    case PgsqlWriteRequestPB::PGSQL_UPSERT: {
403
      // Upserts should not have column refs (i.e. require read).
404
6.61M
      RSTATUS_DCHECK(request_.col_refs().empty(),
405
6.61M
              IllegalState,
406
6.61M
              "Upsert operation should not have column references");
407
6.61M
      return ApplyInsert(data, IsUpsert::kTrue);
408
0
    }
409
410
90
    case PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED:
411
90
      return ApplyTruncateColocated(data);
412
12.8M
  }
413
0
  return Status::OK();
414
12.8M
}
415
416
11.2M
Status PgsqlWriteOperation::ApplyInsert(const DocOperationApplyData& data, IsUpsert is_upsert) {
417
11.2M
  QLTableRow table_row;
418
11.2M
  if (!is_upsert) {
419
4.61M
    if (request_.is_backfill()) {
420
602
      if (VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) {
421
        // Unique index value conflict found.
422
8
        response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR);
423
8
        response_->set_error_message("Duplicate key found in unique index");
424
8
        return Status::OK();
425
8
      }
426
4.61M
    } else {
427
      // Non-backfill requests shouldn't use HasDuplicateUniqueIndexValue because
428
      // - they should error even if the conflicting row matches
429
      // - retrieving and calculating whether the conflicting row matches is a waste
430
4.61M
      RETURN_NOT_OK(ReadColumns(data, &table_row));
431
4.61M
      if (!table_row.IsEmpty()) {
432
11.1k
        VLOG(4) << "Duplicate row: " << table_row.ToString();
433
        // Primary key or unique index value found.
434
11.1k
        response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR);
435
11.1k
        response_->set_error_message("Duplicate key found in primary key or unique index");
436
11.1k
        return Status::OK();
437
11.1k
      }
438
4.61M
    }
439
4.61M
  }
440
441
11.2M
  RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
442
11.2M
      DocPath(encoded_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn),
443
11.2M
      ValueControlFields(), ValueRef(ValueType::kNullLow), data.read_time, data.deadline,
444
11.2M
      request_.stmt_id()));
445
446
46.8M
  
  for (const auto& column_value : request_.column_values()) {
447
    // Get the column.
448
46.8M
    if (!column_value.has_column_id()) {
449
0
      return STATUS(InternalError, "column id missing", column_value.DebugString());
450
0
    }
451
46.8M
    const ColumnId column_id(column_value.column_id());
452
46.8M
    const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
453
454
    // Check column-write operator.
455
1.35k
    CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
456
1.35k
      << "Illegal write instruction";
457
458
    // Evaluate column value.
459
46.8M
    QLExprResult expr_result;
460
46.8M
    RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
461
462
    // Inserting into specified column.
463
46.8M
    DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
464
46.8M
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
465
46.8M
        sub_path, ValueRef(expr_result.Value(), column.sorting_type()),
466
46.8M
        data.read_time, data.deadline, request_.stmt_id()));
467
46.8M
  }
468
469
11.2M
  RETURN_NOT_OK(PopulateResultSet(table_row));
470
471
11.2M
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
472
11.2M
  return Status::OK();
473
11.2M
}
474
475
695k
Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) {
476
695k
  QLTableRow table_row;
477
695k
  RETURN_NOT_OK(ReadColumns(data, &table_row));
478
695k
  if (table_row.IsEmpty()) {
479
    // Row not found.
480
12
    response_->set_skipped(true);
481
12
    return Status::OK();
482
12
  }
483
695k
  QLTableRow returning_table_row;
484
695k
  if (request_.targets_size()) {
485
6
    returning_table_row = table_row;
486
6
  }
487
488
  // skipped is set to false if this operation produces some data to write.
489
695k
  bool skipped = true;
490
491
695k
  if (request_.has_ybctid_column_value()) {
492
692k
    DocPgExprExecutor expr_exec(&schema_);
493
692k
    std::vector<QLExprResult> results;
494
692k
    int num_exprs = 0;
495
692k
    int cur_expr = 0;
496
971k
    for (const auto& column_value : request_.column_new_values()) {
497
971k
      if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) {
498
16.7k
        RETURN_NOT_OK(expr_exec.AddTargetExpression(column_value.expr()));
499
16.7k
        VLOG(1) << "Added target expression to the executor";
500
16.7k
        num_exprs++;
501
16.7k
      }
502
971k
    }
503
692k
    if (num_exprs > 0) {
504
11.8k
      bool match;
505
16.7k
      for (const PgsqlColRefPB& column_ref : request_.col_refs()) {
506
16.7k
        RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref));
507
16.7k
        VLOG(1) << "Added column reference to the executor";
508
16.7k
      }
509
11.8k
      results.resize(num_exprs);
510
11.8k
      RETURN_NOT_OK(expr_exec.Exec(table_row, &results, &match));
511
11.8k
    }
512
971k
    
    for (const auto& column_value : request_.column_new_values()) {
513
      // Get the column.
514
971k
      if (!column_value.has_column_id()) {
515
0
        return STATUS(InternalError, "column id missing", column_value.DebugString());
516
0
      }
517
971k
      const ColumnId column_id(column_value.column_id());
518
971k
      const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
519
      // Evaluate column value.
520
0
      QLExprResult expr_result;
521
522
971k
      if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) {
523
16.7k
        expr_result = std::move(results[cur_expr++]);
524
954k
      } else {
525
        // Check column-write operator.
526
954k
        SCHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert,
527
954k
               InternalError,
528
954k
               "Unsupported DocDB Expression");
529
530
954k
        RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer(), &schema_));
531
954k
      }
532
533
      // Update RETURNING values
534
971k
      if (request_.targets_size()) {
535
6
        returning_table_row.AllocColumn(column_id, expr_result.Value());
536
6
      }
537
538
      // Inserting into specified column.
539
971k
      DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
540
971k
      RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
541
971k
          sub_path, ValueRef(expr_result.Value(), column.sorting_type()), data.read_time,
542
971k
          data.deadline, request_.stmt_id()));
543
971k
      skipped = false;
544
971k
    }
545
692k
  } else {
546
    // This UPDATE is calling PGGATE directly without going through the PostgreSQL layer.
547
    // Keep it here as we might need it.
548
549
    // Very limited support for where expressions. Only used for updates to the sequences data
550
    // table.
551
3.02k
    bool is_match = true;
552
3.02k
    if (request_.has_where_expr()) {
553
2.97k
      QLExprResult match;
554
2.97k
      RETURN_NOT_OK(EvalExpr(request_.where_expr(), table_row, match.Writer()));
555
2.97k
      is_match = match.Value().bool_value();
556
2.97k
    }
557
558
3.02k
    if (is_match) {
559
5.99k
      for (const auto &column_value : request_.column_new_values()) {
560
        // Get the column.
561
5.99k
        if (!column_value.has_column_id()) {
562
0
          return STATUS(InternalError, "column id missing", column_value.DebugString());
563
0
        }
564
5.99k
        const ColumnId column_id(column_value.column_id());
565
5.99k
        const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id));
566
567
        // Check column-write operator.
568
0
        CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert)
569
0
        << "Illegal write instruction";
570
571
        // Evaluate column value.
572
5.99k
        QLExprResult expr_result;
573
5.99k
        RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer()));
574
575
        // Inserting into specified column.
576
5.99k
        DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id));
577
5.99k
        RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
578
5.99k
            sub_path, ValueRef(expr_result.Value(), column.sorting_type()), data.read_time,
579
5.99k
            data.deadline, request_.stmt_id()));
580
5.99k
        skipped = false;
581
5.99k
      }
582
2.98k
    }
583
3.02k
  }
584
585
695k
  if (request_.targets_size()) {
586
    // Returning the values after the update.
587
6
    RETURN_NOT_OK(PopulateResultSet(returning_table_row));
588
695k
  } else {
589
    // Returning the values before the update.
590
695k
    RETURN_NOT_OK(PopulateResultSet(table_row));
591
695k
  }
592
593
695k
  if (skipped) {
594
0
    response_->set_skipped(true);
595
0
  }
596
695k
  response_->set_rows_affected_count(1);
597
695k
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
598
695k
  return Status::OK();
599
695k
}
600
601
Status PgsqlWriteOperation::ApplyDelete(
602
    const DocOperationApplyData& data,
603
907k
    const bool is_persist_needed) {
604
907k
  int num_deleted = 1;
605
907k
  QLTableRow table_row;
606
907k
  RETURN_NOT_OK(ReadColumns(data, &table_row));
607
907k
  if (table_row.IsEmpty()) {
608
    // Row not found.
609
    // Return early unless we still want to apply the delete for backfill purposes.  Deletes to
610
    // nonexistent rows are expected to get written to the index when the index has the delete
611
    // permission during an online schema migration.  num_deleted should be 0 because we don't want
612
    // to report back to the user that we deleted 1 row; response_ should not set skipped because it
613
    // will prevent tombstone intents from getting applied.
614
42
    if (!is_persist_needed) {
615
37
      response_->set_skipped(true);
616
37
      return Status::OK();
617
37
    }
618
5
    num_deleted = 0;
619
5
  }
620
621
  // TODO(neil) Add support for WHERE clause.
622
907k
  CHECK(request_.column_values_size() == 0) << "WHERE clause condition is not yet fully supported";
623
624
  // Otherwise, delete the referenced row (all columns).
625
907k
  RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath(
626
907k
      encoded_doc_key_.as_slice()), data.read_time, data.deadline));
627
628
907k
  RETURN_NOT_OK(PopulateResultSet(table_row));
629
630
907k
  response_->set_rows_affected_count(num_deleted);
631
907k
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
632
907k
  return Status::OK();
633
907k
}
634
635
90
Status PgsqlWriteOperation::ApplyTruncateColocated(const DocOperationApplyData& data) {
636
90
  RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath(
637
90
      encoded_doc_key_.as_slice()), data.read_time, data.deadline));
638
90
  response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK);
639
90
  return Status::OK();
640
90
}
641
642
Status PgsqlWriteOperation::ReadColumns(const DocOperationApplyData& data,
643
6.21M
                                        QLTableRow* table_row) {
644
  // Filter the columns using primary key.
645
6.21M
  if (doc_key_) {
646
6.21M
    Schema projection;
647
6.21M
    RETURN_NOT_OK(CreateProjection(schema_, request_.column_refs(), &projection));
648
6.21M
    DocPgsqlScanSpec spec(projection, request_.stmt_id(), *doc_key_);
649
6.21M
    DocRowwiseIterator iterator(projection,
650
6.21M
                                schema_,
651
6.21M
                                txn_op_context_,
652
6.21M
                                data.doc_write_batch->doc_db(),
653
6.21M
                                data.deadline,
654
6.21M
                                data.read_time);
655
6.21M
    RETURN_NOT_OK(iterator.Init(spec));
656
6.21M
    if (VERIFY_RESULT(iterator.HasNext())) {
657
1.61M
      RETURN_NOT_OK(iterator.NextRow(table_row));
658
4.60M
    } else {
659
4.60M
      table_row->Clear();
660
4.60M
    }
661
6.21M
    data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt());
662
6.21M
  }
663
664
6.21M
  return Status::OK();
665
6.21M
}
666
667
12.8M
Status PgsqlWriteOperation::PopulateResultSet(const QLTableRow& table_row) {
668
12.8M
  if (result_buffer_.empty()) {
669
    // Reserve space for num rows.
670
12.8M
    pggate::PgWire::WriteInt64(0, &result_buffer_);
671
12.8M
  }
672
12.8M
  ++result_rows_;
673
12.8M
  int rscol_index = 0;
674
12.8M
  for (const PgsqlExpressionPB& expr : request_.targets()) {
675
23
    if (expr.has_column_id()) {
676
23
      QLExprResult value;
677
23
      if (expr.column_id() == static_cast<int>(PgSystemAttrNum::kYBTupleId)) {
678
        // Strip cotable ID / colocation ID from the serialized DocKey before returning it
679
        // as ybctid.
680
0
        Slice tuple_id = encoded_doc_key_.as_slice();
681
0
        if (tuple_id.starts_with(ValueTypeAsChar::kTableId)) {
682
0
          tuple_id.remove_prefix(1 + kUuidSize);
683
0
        } else if (tuple_id.starts_with(ValueTypeAsChar::kColocationId)) {
684
0
          tuple_id.remove_prefix(1 + sizeof(ColocationId));
685
0
        }
686
0
        value.Writer().NewValue().set_binary_value(tuple_id.data(), tuple_id.size());
687
23
      } else {
688
23
        RETURN_NOT_OK(EvalExpr(expr, table_row, value.Writer()));
689
23
      }
690
23
      RETURN_NOT_OK(pggate::WriteColumn(value.Value(), &result_buffer_));
691
23
    }
692
23
    rscol_index++;
693
23
  }
694
12.8M
  return Status::OK();
695
12.8M
}
696
697
Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode,
698
                                        DocPathsToLock *paths,
699
20.0M
                                        IsolationLevel *level) const {
700
  // When this write operation requires a read, it requires a read snapshot so paths will be locked
701
  // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable
702
  // isolation so that they will serialize but do not conflict with one another.
703
  //
704
  // Currently, only keys that are being written are locked, no lock is taken on read at the
705
  // snapshot isolation level.
706
20.0M
  *level = RequireReadSnapshot() ? IsolationLevel::SNAPSHOT_ISOLATION
707
20.0M
                                 : IsolationLevel::SERIALIZABLE_ISOLATION;
708
709
20.0M
  switch (mode) {
710
12.9M
    case GetDocPathsMode::kLock: {
711
      // Weak intent is required to lock the row and prevent it from being removed.
712
      // For this purpose, the path for the row's SystemColumnIds::kLivenessColumn is returned.
714
      // The caller code will create a strong intent for the returned path (the row's column doc key)
714
      // and weak intents for all its prefixes (including row's doc key).
715
12.9M
      if (!encoded_doc_key_) {
716
0
        return Status::OK();
717
0
      }
718
12.9M
      if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
719
        // In case of UPDATE some columns may have expressions instead of exact value.
720
        // These expressions may read column value.
721
        // Potentially expression for updating column v1 may read value of column v2.
722
        //
723
        // UPDATE t SET v = v + 10 WHERE k = 1
724
        // UPDATE t SET v1 = v2 + 10 WHERE k = 1
725
        //
726
        // Strong intent for the whole row is required in this case as it may be too expensive to
727
        // determine what exact columns are read by the expression.
728
729
995k
        for (const auto& column_value : request_.column_new_values()) {
730
995k
          if (!column_value.expr().has_value()) {
731
11.9k
            paths->push_back(encoded_doc_key_);
732
11.9k
            return Status::OK();
733
11.9k
          }
734
995k
        }
735
718k
      }
736
12.9M
      DocKeyColumnPathBuilder builder(encoded_doc_key_);
737
12.9M
      paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn)));
738
12.9M
      break;
739
12.9M
    }
740
7.10M
    case GetDocPathsMode::kIntents: {
741
7.10M
      const google::protobuf::RepeatedPtrField<PgsqlColumnValuePB>* column_values = nullptr;
742
7.10M
      if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT ||
743
7.10M
          
          request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) {
744
5.49M
        column_values = &request_.column_values();
745
5.49M
      } else if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
746
702k
        column_values = &request_.column_new_values();
747
702k
      }
748
749
7.10M
      if (column_values != nullptr && !column_values->empty()) {
750
5.17M
        DocKeyColumnPathBuilder builder(encoded_doc_key_);
751
15.1M
        for (const auto& column_value : *column_values) {
752
15.1M
          paths->push_back(builder.Build(column_value.column_id()));
753
15.1M
        }
754
5.17M
      } else if (encoded_doc_key_) {
755
1.92M
        paths->push_back(encoded_doc_key_);
756
1.92M
      }
757
7.10M
      break;
758
12.9M
    }
759
20.0M
  }
760
20.0M
  return Status::OK();
761
20.0M
}
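// A standalone sketch of the two GetDocPathsMode cases above: kLock returns the row's
// liveness-column path (strong intent on it, weak intents on its prefixes), while
// kIntents returns one path per written column, or the whole row key when no columns are
// listed. The string paths are simplified stand-ins for the real encoded paths.
#include <string>
#include <vector>

std::vector<std::string> SketchDocPaths(bool lock_mode, const std::string& row_key,
                                        const std::vector<int>& written_columns) {
  std::vector<std::string> paths;
  if (lock_mode) {
    paths.push_back(row_key + "/liveness");  // lock the row's existence
  } else if (written_columns.empty()) {
    paths.push_back(row_key);                // whole-row intent
  } else {
    for (int column_id : written_columns) {  // one intent path per written column
      paths.push_back(row_key + "/col" + std::to_string(column_id));
    }
  }
  return paths;
}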
762
763
//--------------------------------------------------------------------------------------------------
764
765
Result<size_t> PgsqlReadOperation::Execute(const YQLStorageIf& ql_storage,
766
                                           CoarseTimePoint deadline,
767
                                           const ReadHybridTime& read_time,
768
                                           bool is_explicit_request_read_time,
769
                                           const Schema& schema,
770
                                           const Schema *index_schema,
771
                                           faststring *result_buffer,
772
3.47M
                                           HybridTime *restart_read_ht) {
773
3.47M
  size_t fetched_rows = 0;
774
  // Reserve space for fetched rows count.
775
3.47M
  pggate::PgWire::WriteInt64(0, result_buffer);
776
3.47M
  auto se = ScopeExit([&fetched_rows, result_buffer] {
777
3.47M
    NetworkByteOrder::Store64(result_buffer->data(), fetched_rows);
778
3.47M
  });
779
3.47M
  VLOG(4) << "Read, read time: " << read_time << ", txn: " << txn_op_context_;
780
781
  // Fetching data.
782
3.47M
  bool has_paging_state = false;
783
3.47M
  if (request_.batch_arguments_size() > 0) {
784
6.35k
    SCHECK(request_.has_ybctid_column_value(),
785
6.35k
           InternalError,
786
6.35k
           "ybctid arguments can be batched only");
787
6.35k
    fetched_rows = VERIFY_RESULT(ExecuteBatchYbctid(
788
6.35k
        ql_storage, deadline, read_time, schema,
789
6.35k
        result_buffer, restart_read_ht));
790
3.47M
  } else if (request_.has_sampling_state()) {
791
406
    fetched_rows = VERIFY_RESULT(ExecuteSample(
792
406
        ql_storage, deadline, read_time, is_explicit_request_read_time, schema,
793
406
        result_buffer, restart_read_ht, &has_paging_state));
794
3.46M
  } else {
795
3.46M
    fetched_rows = VERIFY_RESULT(ExecuteScalar(
796
3.46M
        ql_storage, deadline, read_time, is_explicit_request_read_time, schema, index_schema,
797
3.46M
        result_buffer, restart_read_ht, &has_paging_state));
798
3.46M
  }
799
800
3.47M
  VTRACE(1, "Fetched $0 rows. $1 paging state", fetched_rows, (has_paging_state ? "Has" : "No"));
801
3.47M
  *restart_read_ht = table_iter_->RestartReadHt();
802
3.47M
  return fetched_rows;
803
3.47M
}
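// A standalone sketch of the wire-format pattern Execute uses above (and
// PgsqlWriteOperation::Apply uses earlier): reserve 8 bytes up front for the row count,
// append rows, then patch the count in big-endian order at the end. StoreBigEndian64 is
// an illustrative stand-in for NetworkByteOrder::Store64.
#include <cstdint>
#include <string>
#include <vector>

void StoreBigEndian64(char* dst, uint64_t value) {
  for (int i = 7; i >= 0; --i) {
    dst[i] = static_cast<char>(value & 0xff);
    value >>= 8;
  }
}

std::string SerializeRows(const std::vector<std::string>& rows) {
  std::string buffer(8, '\0');                // placeholder for the row count
  for (const auto& row : rows) {
    buffer += row;                            // rows follow the count
  }
  StoreBigEndian64(&buffer[0], rows.size());  // patch the count afterwards
  return buffer;
}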
804
805
Result<size_t> PgsqlReadOperation::ExecuteSample(const YQLStorageIf& ql_storage,
806
                                                 CoarseTimePoint deadline,
807
                                                 const ReadHybridTime& read_time,
808
                                                 bool is_explicit_request_read_time,
809
                                                 const Schema& schema,
810
                                                 faststring *result_buffer,
811
                                                 HybridTime *restart_read_ht,
812
406
                                                 bool *has_paging_state) {
813
406
  *has_paging_state = false;
814
406
  size_t scanned_rows = 0;
815
406
  PgsqlSamplingStatePB sampling_state = request_.sampling_state();
816
  // Requested total number of rows to collect
817
406
  int targrows = sampling_state.targrows();
818
  // Number of rows collected so far
819
406
  int numrows = sampling_state.numrows();
820
  // Total number of rows scanned
821
406
  double samplerows = sampling_state.samplerows();
822
  // Current number of rows to skip before collecting next one for sample
823
406
  double rowstoskip = sampling_state.rowstoskip();
824
  // Variables for the random numbers generator
825
406
  YbgPrepareMemoryContext();
826
406
  YbgReservoirState rstate = NULL;
827
406
  YbgSamplerCreate(sampling_state.rstate_w(), sampling_state.rand_state(), &rstate);
828
  // Buffer to hold selected row ids from the current page
829
406
  std::unique_ptr<QLValuePB[]> reservoir = std::make_unique<QLValuePB[]>(targrows);
830
  // Number of rows to scan for the current page.
831
  // Too low a row count limit is inefficient, since we have to allocate and initialize a reservoir
832
  // capable of holding a potentially large (targrows) number of tuples. The row count has to be at least
833
  // targrows for a chance to fill up the reservoir. Actually, the algorithm selects targrows only
834
  // for very first page of the table, then it starts to skip tuples, the further it goes, the more
835
  // it skips. For a large enough table it eventually starts to select less than targrows per page,
836
  // regardless of the row_count_limit.
837
  // Anyway, double targrows seems like a reasonable minimum for the row_count_limit.
838
406
  size_t row_count_limit = 2 * targrows;
839
406
  if (request_.has_limit() && request_.limit() > row_count_limit) {
840
0
    row_count_limit = request_.limit();
841
0
  }
842
  // The request is not supposed to contain any column refs; we just need the liveness column.
843
406
  Schema projection;
844
406
  RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
845
  // The request may carry paging state; CreateIterator takes care of positioning.
846
406
  table_iter_ = VERIFY_RESULT(CreateIterator(
847
0
      ql_storage, request_, projection, schema, txn_op_context_,
848
0
      deadline, read_time, is_explicit_request_read_time));
849
0
  bool scan_time_exceeded = false;
850
406
  CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms;
851
162k
  while (scanned_rows++ < row_count_limit &&
852
162k
         VERIFY_RESULT(table_iter_->HasNext()) &&
853
162k
         
         !scan_time_exceeded) {
854
161k
    if (numrows < targrows) {
855
      // Select first targrows of the table. If first partition(s) have less than that, next
856
      // partition continues populating its reservoir starting from the numrows position:
857
      // the numrows, as well as the other sampling state variables, are returned and copied over to the
858
      // next sampling request
859
161k
      Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId());
860
0
      reservoir[numrows++].set_binary_value(ybctid.data(), ybctid.size());
861
161k
    } else {
862
      // At least targrows tuples have already been collected; now the algorithm skips an increasing number
863
      // of rows before taking the next one into the reservoir.
864
0
      if (rowstoskip <= 0) {
865
        // Take ybctid of the current row
866
0
        Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId());
867
        // Pick random tuple in the reservoir to replace
868
0
        double rvalue;
869
0
        int k;
870
0
        YbgSamplerRandomFract(rstate, &rvalue);
871
0
        k = static_cast<int>(targrows * rvalue);
872
        // Overwrite previous value with new one
873
0
        reservoir[k].set_binary_value(ybctid.data(), ybctid.size());
874
        // Choose next number of rows to skip
875
0
        YbgReservoirGetNextS(rstate, samplerows, targrows, &rowstoskip);
876
0
      } else {
877
0
        rowstoskip -= 1;
878
0
      }
879
0
    }
880
    // Taking tuple ID does not advance the table iterator. Move it now.
881
161k
    table_iter_->SkipRow();
882
    // Check if we are running out of time
883
161k
    scan_time_exceeded = CoarseMonoClock::now() >= stop_scan;
884
161k
  }
885
  // Count live rows we have scanned. TODO: how to count dead rows?
886
406
  samplerows += (scanned_rows - 1);
887
  // Return collected tuples from the reservoir.
888
  // Tuples are returned as (index, ybctid) pairs, where index is in [0..targrows-1] range.
889
  // As mentioned above, for large tables reservoirs become increasingly sparse from page to page.
890
  // So we hope to save by sending variable number of index/ybctid pairs vs exactly targrows of
891
  // nullable ybctids. It also helps in case of extremely small table or partition.
892
406
  int fetched_rows = 0;
893
295k
  for (int i = 0; i < numrows; i++) {
894
294k
    QLValuePB index;
895
294k
    if (reservoir[i].has_binary_value()) {
896
161k
      index.set_int32_value(i);
897
161k
      RETURN_NOT_OK(pggate::WriteColumn(index, result_buffer));
898
161k
      RETURN_NOT_OK(pggate::WriteColumn(reservoir[i], result_buffer));
899
161k
      fetched_rows++;
900
161k
    }
901
294k
  }
902
903
  // Return sampling state to continue with next page
904
406
  PgsqlSamplingStatePB *new_sampling_state = response_.mutable_sampling_state();
905
406
  new_sampling_state->set_numrows(numrows);
906
406
  new_sampling_state->set_targrows(targrows);
907
406
  new_sampling_state->set_samplerows(samplerows);
908
406
  new_sampling_state->set_rowstoskip(rowstoskip);
909
406
  uint64_t randstate = 0;
910
406
  double rstate_w = 0;
911
406
  YbgSamplerGetState(rstate, &rstate_w, &randstate);
912
406
  new_sampling_state->set_rstate_w(rstate_w);
913
406
  new_sampling_state->set_rand_state(randstate);
914
406
  YbgDeleteMemoryContext();
915
916
  // Return paging state if scan has not been completed
917
406
  RETURN_NOT_OK(SetPagingStateIfNecessary(table_iter_.get(), scanned_rows, row_count_limit,
918
406
                                          scan_time_exceeded, &schema, read_time,
919
406
                                          has_paging_state));
920
406
  return fetched_rows;
921
406
}
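// A standalone sketch of the reservoir sampling ExecuteSample performs: keep the first
// targrows row ids, then replace a random reservoir slot with decreasing probability. The
// skip-ahead optimization (rowstoskip, supplied above by the Ybg* helpers) is omitted
// here for clarity; only the standard library is assumed.
#include <random>
#include <string>
#include <vector>

std::vector<std::string> SampleRows(const std::vector<std::string>& rows,
                                    size_t targrows, std::mt19937_64* rng) {
  std::vector<std::string> reservoir;
  reservoir.reserve(targrows);
  for (size_t i = 0; i < rows.size(); ++i) {
    if (reservoir.size() < targrows) {
      reservoir.push_back(rows[i]);  // fill the reservoir first
    } else {
      // Row i survives with probability targrows / (i + 1).
      std::uniform_int_distribution<size_t> dist(0, i);
      size_t j = dist(*rng);
      if (j < targrows) {
        reservoir[j] = rows[i];      // overwrite a random slot
      }
    }
  }
  return reservoir;
}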
922
923
Result<size_t> PgsqlReadOperation::ExecuteScalar(const YQLStorageIf& ql_storage,
924
                                                 CoarseTimePoint deadline,
925
                                                 const ReadHybridTime& read_time,
926
                                                 bool is_explicit_request_read_time,
927
                                                 const Schema& schema,
928
                                                 const Schema *index_schema,
929
                                                 faststring *result_buffer,
930
                                                 HybridTime *restart_read_ht,
931
3.46M
                                                 bool *has_paging_state) {
932
3.46M
  *has_paging_state = false;
933
934
3.46M
  size_t fetched_rows = 0;
935
3.46M
  size_t row_count_limit = std::numeric_limits<std::size_t>::max();
936
3.46M
  if (request_.has_limit()) {
937
3.46M
    if (request_.limit() == 0) {
938
0
      return fetched_rows;
939
0
    }
940
3.46M
    row_count_limit = request_.limit();
941
3.46M
  }
942
943
  // Create the projection of regular columns selected by the row block plus any referenced in
944
  // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this
945
  // projection only to scan sub-documents. The query schema is used to select only referenced
946
  // columns and key columns.
947
3.46M
  Schema projection;
948
3.46M
  Schema index_projection;
949
3.46M
  YQLRowwiseIteratorIf *iter;
950
3.46M
  const Schema* scan_schema;
951
3.46M
  DocPgExprExecutor expr_exec(&schema);
952
953
3.46M
  if (!request_.col_refs().empty()) {
954
3.44M
    RETURN_NOT_OK(CreateProjection(schema, request_.col_refs(), &projection));
955
3.44M
  } else {
956
    // Compatibility: Either request indeed has no column refs, or it comes from a legacy node.
957
22.4k
    RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
958
22.4k
  }
959
3.46M
  table_iter_ = VERIFY_RESULT(CreateIterator(
960
0
      ql_storage, request_, projection, schema, txn_op_context_,
961
0
      deadline, read_time, is_explicit_request_read_time));
962
963
0
  ColumnId ybbasectid_id;
964
3.46M
  if (request_.has_index_request()) {
965
458k
    const PgsqlReadRequestPB& index_request = request_.index_request();
966
458k
    RETURN_NOT_OK(CreateProjection(*index_schema, index_request.column_refs(), &index_projection));
967
458k
    index_iter_ = VERIFY_RESULT(CreateIterator(
968
0
        ql_storage, index_request, index_projection, *index_schema, txn_op_context_,
969
0
        deadline, read_time, is_explicit_request_read_time));
970
0
    iter = index_iter_.get();
971
458k
    const auto idx = index_schema->find_column("ybidxbasectid");
972
458k
    SCHECK_NE(idx, Schema::kColumnNotFound, Corruption, "ybidxbasectid not found in index schema");
973
458k
    ybbasectid_id = index_schema->column_id(idx);
974
458k
    scan_schema = index_schema;
975
3.00M
  } else {
976
3.00M
    iter = table_iter_.get();
977
3.00M
    scan_schema = &schema;
978
20.7M
    for (const PgsqlColRefPB& column_ref : request_.col_refs()) {
979
20.7M
      RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref));
980
20.7M
      VLOG(1) << "Added column reference to the executor";
981
20.7M
    }
982
3.00M
    for (const PgsqlExpressionPB& expr : request_.where_clauses()) {
983
372
      RETURN_NOT_OK(expr_exec.AddWhereExpression(expr));
984
372
      VLOG(1) << "Added where expression to the executor";
985
372
    }
986
3.00M
  }
987
988
3.46M
  VLOG(1) << "Started iterator";
989
990
  // Set scan start time.
991
3.46M
  bool scan_time_exceeded = false;
992
3.46M
  CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms;
993
994
  // Fetching data.
995
3.46M
  int match_count = 0;
996
3.46M
  QLTableRow row;
997
60.2M
  while (fetched_rows < row_count_limit && VERIFY_RESULT(iter->HasNext()) &&
998
60.2M
         
         !scan_time_exceeded) {
999
56.8M
    row.Clear();
1000
1001
    // If there is an index request, fetch ybbasectid from the index and use it as ybctid
1002
    // to fetch from the base table. Otherwise, fetch from the base table directly.
1003
56.8M
    if (request_.has_index_request()) {
1004
1.13M
      RETURN_NOT_OK(iter->NextRow(&row));
1005
1.13M
      const auto& tuple_id = row.GetValue(ybbasectid_id);
1006
1.13M
      SCHECK_NE(tuple_id, boost::none, Corruption, "ybbasectid not found in index row");
1007
1.13M
      if (!VERIFY_RESULT(table_iter_->SeekTuple(tuple_id->binary_value()))) {
1008
0
        DocKey doc_key;
1009
0
        RETURN_NOT_OK(doc_key.DecodeFrom(tuple_id->binary_value()));
1010
0
        return STATUS_FORMAT(Corruption, "ybctid $0 not found in indexed table", doc_key);
1011
0
      }
1012
1.13M
      row.Clear();
1013
1.13M
      RETURN_NOT_OK(table_iter_->NextRow(projection, &row));
1014
55.6M
    } else {
1015
55.6M
      RETURN_NOT_OK(iter->NextRow(projection, &row));
1016
55.6M
    }
1017
1018
    // Match the row with the where condition before adding to the row block.
1019
56.8M
    bool is_match = true;
1020
56.8M
    RETURN_NOT_OK(expr_exec.Exec(row, nullptr, &is_match));
1021
56.8M
    if (is_match) {
1022
56.7M
      match_count++;
1023
56.7M
      if (request_.is_aggregate()) {
1024
3.14M
        RETURN_NOT_OK(EvalAggregate(row));
1025
53.6M
      } else {
1026
53.6M
        RETURN_NOT_OK(PopulateResultSet(row, result_buffer));
1027
53.6M
        ++fetched_rows;
1028
53.6M
      }
1029
56.7M
    }
1030
1031
    // Check if we are running out of time
1032
56.8M
    scan_time_exceeded = CoarseMonoClock::now() >= stop_scan;
1033
56.8M
  }
1034
1035
3.46M
  VLOG(1) << "Stopped iterator after " << match_count << " matches, "
1036
4.23k
          << fetched_rows << " rows fetched";
1037
3.46M
  VLOG(1) << "Deadline is " << (scan_time_exceeded ? "" : "not ") << "exceeded";
1038
1039
3.46M
  if (request_.is_aggregate() && match_count > 0) {
1040
16.2k
    RETURN_NOT_OK(PopulateAggregate(row, result_buffer));
1041
16.2k
    ++fetched_rows;
1042
16.2k
  }
1043
1044
3.46M
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms > 0) && request_.is_aggregate()) {
1045
425
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_pgsql_aggregate_read_ms);
1046
425
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms));
1047
425
  }
1048
1049
3.46M
  RETURN_NOT_OK(SetPagingStateIfNecessary(
1050
3.46M
      iter, fetched_rows, row_count_limit, scan_time_exceeded, scan_schema,
1051
3.46M
      read_time, has_paging_state));
1052
3.46M
  return fetched_rows;
1053
3.46M
}
1054
1055
Result<size_t> PgsqlReadOperation::ExecuteBatchYbctid(const YQLStorageIf& ql_storage,
1056
                                                      CoarseTimePoint deadline,
1057
                                                      const ReadHybridTime& read_time,
1058
                                                      const Schema& schema,
1059
                                                      faststring *result_buffer,
1060
6.36k
                                                      HybridTime *restart_read_ht) {
1061
6.36k
  Schema projection;
1062
6.36k
  RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection));
1063
1064
6.36k
  QLTableRow row;
1065
6.36k
  size_t row_count = 0;
1066
1.54M
  for (const PgsqlBatchArgumentPB& batch_argument : request_.batch_arguments()) {
1067
    // Get the row.
1068
1.54M
    RETURN_NOT_OK(ql_storage.GetIterator(request_.stmt_id(), projection, schema, txn_op_context_,
1069
1.54M
                                         deadline, read_time, batch_argument.ybctid().value(),
1070
1.54M
                                         &table_iter_));
1071
1072
1.54M
    if (VERIFY_RESULT(table_iter_->HasNext())) {
1073
1.54M
      row.Clear();
1074
1.54M
      RETURN_NOT_OK(table_iter_->NextRow(projection, &row));
1075
1076
      // Populate result set.
1077
1.54M
      RETURN_NOT_OK(PopulateResultSet(row, result_buffer));
1078
1.54M
      response_.add_batch_orders(batch_argument.order());
1079
1.54M
      row_count++;
1080
1.54M
    }
1081
1.54M
  }
1082
1083
  // Set status for this batch.
1084
  // Mark all rows as processed, even if some of the ybctids were not found.
1085
6.36k
  response_.set_batch_arg_count(request_.batch_arguments_size());
1086
1087
6.36k
  return row_count;
1088
6.36k
}
1089
1090
Status PgsqlReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter,
1091
                                                     size_t fetched_rows,
1092
                                                     const size_t row_count_limit,
1093
                                                     const bool scan_time_exceeded,
1094
                                                     const Schema* schema,
1095
                                                     const ReadHybridTime& read_time,
1096
3.46M
                                                     bool *has_paging_state) {
1097
3.46M
  *has_paging_state = false;
1098
3.46M
  if (!request_.return_paging_state()) {
1099
3.23k
    return Status::OK();
1100
3.23k
  }
1101
1102
  // Set the paging state for next row.
1103
3.46M
  if (fetched_rows >= row_count_limit || scan_time_exceeded) {
1104
65.3k
    SubDocKey next_row_key;
1105
65.3k
    RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key));
1106
    // When the "limit" number of rows are returned and we are asked to return the paging state,
1107
    // return the partition key and row key of the next row to read in the paging state if there are
1108
    // still more rows to read. Otherwise, leave the paging state empty which means we are done
1109
    // reading from this tablet.
1110
65.3k
    if (!next_row_key.doc_key().empty()) {
1111
62.8k
      const auto& keybytes = next_row_key.Encode();
1112
62.8k
      PgsqlPagingStatePB* paging_state = response_.mutable_paging_state();
1113
62.8k
      RSTATUS_DCHECK(schema != nullptr, IllegalState, "Missing schema");
1114
62.8k
      if (schema->num_hash_key_columns() > 0) {
1115
28.7k
        paging_state->set_next_partition_key(
1116
28.7k
           PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash()));
1117
34.1k
      } else {
1118
34.1k
        paging_state->set_next_partition_key(keybytes.ToStringBuffer());
1119
34.1k
      }
1120
62.8k
      paging_state->set_next_row_key(keybytes.ToStringBuffer());
1121
62.8k
      *has_paging_state = true;
1122
62.8k
    }
1123
65.3k
  }
1124
3.46M
  if (*has_paging_state) {
1125
62.8k
    if (FLAGS_pgsql_consistent_transactional_paging) {
1126
62.8k
      read_time.AddToPB(response_.mutable_paging_state());
1127
62.8k
    } else {
1128
      // Using SingleTime will help avoid read restarts on second page and later but will
1129
      // potentially produce stale results on those pages.
1130
17
      auto per_row_consistent_read_time = ReadHybridTime::SingleTime(read_time.read);
1131
17
      per_row_consistent_read_time.AddToPB(response_.mutable_paging_state());
1132
17
    }
1133
62.8k
  }
1134
1135
3.46M
  return Status::OK();
1136
3.46M
}
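// A standalone sketch of the paging contract SetPagingStateIfNecessary implements: when a
// scan stops early (row limit or deadline), hand back the key to resume from; an absent
// next key means the tablet is fully read. A sorted map stands in for the tablet's key space.
#include <map>
#include <optional>
#include <string>
#include <vector>

std::optional<std::string> ScanPage(const std::map<std::string, int>& table,
                                    const std::string& start_key, size_t limit,
                                    std::vector<int>* out) {
  auto it = table.lower_bound(start_key);
  for (size_t n = 0; it != table.end() && n < limit; ++it, ++n) {
    out->push_back(it->second);  // emit one page of rows
  }
  if (it == table.end()) {
    return std::nullopt;         // empty paging state: scan complete
  }
  return it->first;              // resume from this key next time
}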
1137
1138
Status PgsqlReadOperation::PopulateResultSet(const QLTableRow& table_row,
1139
55.1M
                                             faststring *result_buffer) {
1140
55.1M
  QLExprResult result;
1141
656M
  for (const PgsqlExpressionPB& expr : request_.targets()) {
1142
656M
    RETURN_NOT_OK(EvalExpr(expr, table_row, result.Writer()));
1143
656M
    RETURN_NOT_OK(pggate::WriteColumn(result.Value(), result_buffer));
1144
656M
  }
1145
55.1M
  return Status::OK();
1146
55.1M
}
1147
1148
41.2M
Status PgsqlReadOperation::GetTupleId(QLValue *result) const {
1149
  // Get row key and save to QLValue.
1150
  // TODO(neil) Check if we need to append a table_id and other info to TupleID. For example, we
1151
  // might need info to make sure the TupleId by itself is a valid reference to a specific row of
1152
  // a valid table.
1153
41.2M
  const Slice tuple_id = VERIFY_RESULT(table_iter_->GetTupleId());
1154
0
  result->set_binary_value(tuple_id.data(), tuple_id.size());
1155
41.2M
  return Status::OK();
1156
41.2M
}
1157
1158
3.14M
Status PgsqlReadOperation::EvalAggregate(const QLTableRow& table_row) {
1159
3.14M
  if (aggr_result_.empty()) {
1160
16.1k
    int column_count = request_.targets().size();
1161
16.1k
    aggr_result_.resize(column_count);
1162
16.1k
  }
1163
1164
3.14M
  int aggr_index = 0;
1165
3.14M
  for (const PgsqlExpressionPB& expr : request_.targets()) {
1166
3.14M
    RETURN_NOT_OK(EvalExpr(expr, table_row, aggr_result_[aggr_index++].Writer()));
1167
3.14M
  }
1168
3.14M
  return Status::OK();
1169
3.14M
}
1170
1171
Status PgsqlReadOperation::PopulateAggregate(const QLTableRow& table_row,
1172
16.3k
                                             faststring *result_buffer) {
1173
16.3k
  int column_count = request_.targets().size();
1174
32.6k
  for (int rscol_index = 0; rscol_index < column_count; rscol_index++) {
1175
16.3k
    RETURN_NOT_OK(pggate::WriteColumn(aggr_result_[rscol_index].Value(), result_buffer));
1176
16.3k
  }
1177
16.3k
  return Status::OK();
1178
16.3k
}
1179
1180
2.76M
Status PgsqlReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) {
1181
2.76M
  if (request_.batch_arguments_size() > 0 && request_.has_ybctid_column_value()) {
1182
101k
    for (const auto& batch_argument : request_.batch_arguments()) {
1183
101k
      SCHECK(batch_argument.has_ybctid(), InternalError, "ybctid batch argument is expected");
1184
101k
      RETURN_NOT_OK(AddIntent(batch_argument.ybctid(), request_.wait_policy(), out));
1185
101k
    }
1186
2.76M
  } else {
1187
2.76M
    AddIntent(VERIFY_RESULT(FetchEncodedDocKey(schema, request_)), request_.wait_policy(), out);
1188
2.76M
  }
1189
2.76M
  return Status::OK();
1190
2.76M
}
1191
1192
}  // namespace docdb
1193
}  // namespace yb