/Users/deen/code/yugabyte-db/src/yb/docdb/pgsql_operation.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/pgsql_operation.h" |
15 | | |
16 | | #include <limits> |
17 | | #include <string> |
18 | | #include <unordered_set> |
19 | | #include <vector> |
20 | | |
21 | | #include <boost/optional/optional_io.hpp> |
22 | | |
23 | | #include "yb/common/partition.h" |
24 | | #include "yb/common/pg_system_attr.h" |
25 | | #include "yb/common/ql_value.h" |
26 | | |
27 | | #include "yb/docdb/doc_path.h" |
28 | | #include "yb/docdb/doc_pg_expr.h" |
29 | | #include "yb/docdb/doc_pgsql_scanspec.h" |
30 | | #include "yb/docdb/doc_rowwise_iterator.h" |
31 | | #include "yb/docdb/doc_write_batch.h" |
32 | | #include "yb/docdb/docdb.pb.h" |
33 | | #include "yb/docdb/docdb_debug.h" |
34 | | #include "yb/docdb/docdb_pgapi.h" |
35 | | #include "yb/docdb/docdb_rocksdb_util.h" |
36 | | #include "yb/docdb/intent_aware_iterator.h" |
37 | | #include "yb/docdb/primitive_value_util.h" |
38 | | #include "yb/docdb/ql_storage_interface.h" |
39 | | |
40 | | #include "yb/util/flag_tags.h" |
41 | | #include "yb/util/result.h" |
42 | | #include "yb/util/scope_exit.h" |
43 | | #include "yb/util/status_format.h" |
44 | | #include "yb/util/trace.h" |
45 | | |
46 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
47 | | |
48 | | using namespace std::literals; |
49 | | |
50 | | DECLARE_bool(ysql_disable_index_backfill); |
51 | | |
52 | | DEFINE_double(ysql_scan_timeout_multiplier, 0.5, |
53 | | "DEPRECATED. Has no affect, use ysql_scan_deadline_margin_ms to control the client " |
54 | | "timeout"); |
55 | | |
56 | | DEFINE_uint64(ysql_scan_deadline_margin_ms, 1000, |
57 | | "Scan deadline is calculated by adding client timeout to the time when the request " |
58 | | "was received. It defines the moment in time when client has definitely timed out " |
59 | | "and if the request is yet in processing after the deadline, it can be canceled. " |
60 | | "Therefore to prevent client timeout, the request handler should return partial " |
61 | | "result and paging information some time before the deadline. That's what the " |
62 | | "ysql_scan_deadline_margin_ms is for. It should account for network and processing " |
63 | | "delays."); |
64 | | |
65 | | DEFINE_bool(pgsql_consistent_transactional_paging, true, |
66 | | "Whether to enforce consistency of data returned for second page and beyond for YSQL " |
67 | | "queries on transactional tables. If true, read restart errors could be returned to " |
68 | | "prevent inconsistency. If false, no read restart errors are returned but the data may " |
69 | | "be stale. The latter is preferable for long scans. The data returned for the first " |
70 | | "page of results is never stale regardless of this flag."); |
71 | | |
72 | | DEFINE_test_flag(int32, slowdown_pgsql_aggregate_read_ms, 0, |
73 | | "If set > 0, slows down the response to pgsql aggregate read by this amount."); |
74 | | |
75 | | namespace yb { |
76 | | namespace docdb { |
77 | | |
78 | | namespace { |
79 | | |
80 | | // Compatibility: accept column references from legacy nodes as a list of column ids only
81 | | CHECKED_STATUS CreateProjection(const Schema& schema, |
82 | | const PgsqlColumnRefsPB& column_refs, |
83 | 6.70M | Schema* projection) { |
84 | | // Create projection of non-primary key columns. Primary key columns are implicitly read by DocDB. |
85 | | // It will also sort the columns before scanning. |
86 | 6.70M | vector<ColumnId> column_ids; |
87 | 6.70M | column_ids.reserve(column_refs.ids_size()); |
88 | 6.70M | for (int32_t id : column_refs.ids()) { |
89 | 1.45M | const ColumnId column_id(id); |
90 | 1.45M | if (!schema.is_key_column(column_id)) { |
91 | 1.30M | column_ids.emplace_back(column_id); |
92 | 1.30M | } |
93 | 1.45M | } |
94 | 6.70M | return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection); |
95 | 6.70M | } |
96 | | |
97 | | CHECKED_STATUS CreateProjection( |
98 | | const Schema& schema, |
99 | | const google::protobuf::RepeatedPtrField<PgsqlColRefPB> &column_refs, |
100 | 3.44M | Schema* projection) { |
101 | 3.44M | vector<ColumnId> column_ids; |
102 | 3.44M | column_ids.reserve(column_refs.size()); |
103 | 26.4M | for (const PgsqlColRefPB& column_ref : column_refs) { |
104 | 26.4M | const ColumnId column_id(column_ref.column_id()); |
105 | 26.4M | if (!schema.is_key_column(column_id)) { |
106 | 20.9M | column_ids.emplace_back(column_id); |
107 | 20.9M | } |
108 | 26.4M | } |
109 | 3.44M | return schema.CreateProjectionByIdsIgnoreMissing(column_ids, projection); |
110 | 3.44M | } |
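A usage sketch (mirroring ReadColumns() and ExecuteScalar() later in this file): callers build a projection containing only the referenced non-key columns and hand it to the scan spec and row iterator, so DocDB scans just the sub-documents it actually needs.

    Schema projection;
    RETURN_NOT_OK(CreateProjection(schema_, request_.column_refs(), &projection));
    DocPgsqlScanSpec spec(projection, request_.stmt_id(), *doc_key_);
    DocRowwiseIterator iterator(projection, schema_, txn_op_context_,
                                data.doc_write_batch->doc_db(), data.deadline, data.read_time);
    RETURN_NOT_OK(iterator.Init(spec));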
111 | | |
112 | 2.86M | void AddIntent(const std::string& encoded_key, WaitPolicy wait_policy, KeyValueWriteBatchPB *out) { |
113 | 2.86M | auto pair = out->mutable_read_pairs()->Add(); |
114 | 2.86M | pair->set_key(encoded_key); |
115 | 2.86M | pair->set_value(std::string(1, ValueTypeAsChar::kNullLow)); |
116 | | // Since we don't batch read RPCs that lock rows, we can get away with using a singular |
117 | | // wait_policy field. Once we start batching read requests (issue #2495), we will need a repeated |
118 | | // wait policies field. |
119 | 2.86M | out->set_wait_policy(wait_policy); |
120 | 2.86M | } |
121 | | |
122 | | CHECKED_STATUS AddIntent(const PgsqlExpressionPB& ybctid, WaitPolicy wait_policy, |
123 | 101k | KeyValueWriteBatchPB* out) { |
124 | 101k | const auto &val = ybctid.value().binary_value(); |
125 | 101k | SCHECK(!val.empty(), InternalError, "empty ybctid"); |
126 | 101k | AddIntent(val, wait_policy, out); |
127 | 101k | return Status::OK(); |
128 | 101k | } |
129 | | |
130 | | template<class R, class Request, class DocKeyProcessor, class EncodedDocKeyProcessor> |
131 | | Result<R> FetchDocKeyImpl(const Schema& schema, |
132 | | const Request& req, |
133 | | const DocKeyProcessor& dk_processor, |
134 | 15.6M | const EncodedDocKeyProcessor& edk_processor) { |
135 | | // Init DocDB key using either ybctid or partition and range values. |
136 | 15.6M | if (req.has_ybctid_column_value()) { |
137 | 5.86M | const auto& ybctid = req.ybctid_column_value().value().binary_value(); |
138 | 5.86M | SCHECK(!ybctid.empty(), InternalError, "empty ybctid"); |
139 | 5.86M | return edk_processor(ybctid); |
140 | 9.76M | } else { |
141 | 9.76M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues( |
142 | 9.76M | req.partition_column_values(), schema, 0 /* start_idx */)); |
143 | 9.76M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues( |
144 | 9.76M | req.range_column_values(), schema, schema.num_hash_key_columns())); |
145 | 9.76M | return dk_processor(hashed_components.empty() |
146 | 9.76M | ? DocKey(schema, std::move(range_components))6.51M |
147 | 9.76M | : DocKey( |
148 | 3.25M | schema, req.hash_code(), std::move(hashed_components), std::move(range_components))); |
149 | 9.76M | } |
150 | 15.6M | }

pgsql_operation.cc:yb::Result<yb::docdb::DocKey> yb::docdb::(anonymous namespace)::FetchDocKeyImpl<yb::docdb::DocKey, yb::PgsqlWriteRequestPB, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_4, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_5>(yb::Schema const&, yb::PgsqlWriteRequestPB const&, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_4 const&, yb::docdb::(anonymous namespace)::FetchDocKey(yb::Schema const&, yb::PgsqlWriteRequestPB const&)::$_5 const&)
Line | Count | Source
134 | 12.8M | const EncodedDocKeyProcessor& edk_processor) {
135 | | // Init DocDB key using either ybctid or partition and range values.
136 | 12.8M | if (req.has_ybctid_column_value()) {
137 | 5.86M | const auto& ybctid = req.ybctid_column_value().value().binary_value();
138 | 5.86M | SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139 | 5.86M | return edk_processor(ybctid);
140 | 7.01M | } else {
141 | 7.01M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142 | 7.01M | req.partition_column_values(), schema, 0 /* start_idx */));
143 | 7.01M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144 | 7.01M | req.range_column_values(), schema, schema.num_hash_key_columns()));
145 | 7.01M | return dk_processor(hashed_components.empty()
146 | 7.01M | ? DocKey(schema, std::move(range_components))6.49M
147 | 7.01M | : DocKey(
148 | 514k | schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149 | 7.01M | }
150 | 12.8M | }

pgsql_operation.cc:yb::Result<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > yb::docdb::(anonymous namespace)::FetchDocKeyImpl<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::PgsqlReadRequestPB, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_3, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_2>(yb::Schema const&, yb::PgsqlReadRequestPB const&, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_3 const&, yb::docdb::(anonymous namespace)::FetchEncodedDocKey(yb::Schema const&, yb::PgsqlReadRequestPB const&)::$_2 const&)
Line | Count | Source
134 | 2.76M | const EncodedDocKeyProcessor& edk_processor) {
135 | | // Init DocDB key using either ybctid or partition and range values.
136 | 2.76M | if (req.has_ybctid_column_value()) {
137 | 5.33k | const auto& ybctid = req.ybctid_column_value().value().binary_value();
138 | 5.33k | SCHECK(!ybctid.empty(), InternalError, "empty ybctid");
139 | 5.33k | return edk_processor(ybctid);
140 | 2.75M | } else {
141 | 2.75M | auto hashed_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
142 | 2.75M | req.partition_column_values(), schema, 0 /* start_idx */));
143 | 2.75M | auto range_components = VERIFY_RESULT(InitKeyColumnPrimitiveValues(
144 | 2.75M | req.range_column_values(), schema, schema.num_hash_key_columns()));
145 | 2.75M | return dk_processor(hashed_components.empty()
146 | 2.75M | ? DocKey(schema, std::move(range_components))19.4k
147 | 2.75M | : DocKey(
148 | 2.73M | schema, req.hash_code(), std::move(hashed_components), std::move(range_components)));
149 | 2.75M | }
150 | 2.76M | }
151 | | |
152 | 2.76M | Result<string> FetchEncodedDocKey(const Schema& schema, const PgsqlReadRequestPB& request) { |
153 | 2.76M | return FetchDocKeyImpl<string>( |
154 | 2.76M | schema, request, |
155 | 2.76M | [](const auto& doc_key) { return doc_key.Encode().ToStringBuffer(); }2.75M , |
156 | 2.76M | [](const auto& encoded_doc_key) { return encoded_doc_key; }5.33k ); |
157 | 2.76M | } |
158 | | |
159 | 12.8M | Result<DocKey> FetchDocKey(const Schema& schema, const PgsqlWriteRequestPB& request) { |
160 | 12.8M | return FetchDocKeyImpl<DocKey>( |
161 | 12.8M | schema, request, |
162 | 12.8M | [](const auto& doc_key) { return doc_key; }7.01M , |
163 | 12.8M | [&schema](const auto& encoded_doc_key) -> Result<DocKey> { |
164 | 5.86M | DocKey key(schema); |
165 | 5.86M | RETURN_NOT_OK(key.DecodeFrom(encoded_doc_key)); |
166 | 5.86M | return key; |
167 | 5.86M | }); |
168 | 12.8M | } |
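For context, a condensed view of how these helpers are consumed elsewhere in this file (PgsqlWriteOperation::Init() and PgsqlReadOperation::GetIntents()): the write path materializes a DocKey and caches its encoded form, while the read-locking path only needs the encoded key to register an intent.

    // Write path:
    doc_key_ = VERIFY_RESULT(FetchDocKey(schema_, request_));
    encoded_doc_key_ = doc_key_->EncodeAsRefCntPrefix();

    // Read path (row locking):
    AddIntent(VERIFY_RESULT(FetchEncodedDocKey(schema, request_)), request_.wait_policy(), out);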
169 | | |
170 | | Result<YQLRowwiseIteratorIf::UniPtr> CreateIterator( |
171 | | const YQLStorageIf& ql_storage, |
172 | | const PgsqlReadRequestPB& request, |
173 | | const Schema& projection, |
174 | | const Schema& schema, |
175 | | const TransactionOperationContext& txn_op_context, |
176 | | CoarseTimePoint deadline, |
177 | | const ReadHybridTime& read_time, |
178 | 3.91M | bool is_explicit_request_read_time) { |
179 | 3.91M | VLOG_IF(2, request.is_for_backfill()) << "Creating iterator for " << yb::ToString(request)1.30k ; |
180 | | |
181 | 3.91M | YQLRowwiseIteratorIf::UniPtr result; |
182 | | // TODO(neil) Remove the following IF block when it is completely obsolete. |
183 | | // The following IF block has not been used since the 2.1 release.
184 | | // We keep it here only for rolling upgrade purposes.
185 | 3.91M | if (request.has_ybctid_column_value()) { |
186 | 9.94k | SCHECK(!request.has_paging_state(), |
187 | 9.94k | InternalError, |
188 | 9.94k | "Each ybctid value identifies one row in the table while paging state " |
189 | 9.94k | "is only used for multi-row queries."); |
190 | 9.94k | RETURN_NOT_OK(ql_storage.GetIterator( |
191 | 9.94k | request.stmt_id(), projection, schema, txn_op_context, |
192 | 9.94k | deadline, read_time, request.ybctid_column_value().value(), &result)); |
193 | 3.90M | } else { |
194 | 3.90M | SubDocKey start_sub_doc_key; |
195 | 3.90M | auto actual_read_time = read_time; |
196 | | // Decode the start SubDocKey from the paging state and set scan start key. |
197 | 3.90M | if (request.has_paging_state() && |
198 | 3.90M | request.paging_state().has_next_row_key()121k && |
199 | 3.90M | !request.paging_state().next_row_key().empty()44.6k ) { |
200 | 44.6k | KeyBytes start_key_bytes(request.paging_state().next_row_key()); |
201 | 44.6k | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
202 | | // TODO(dmitry) Remove backward compatibility block when obsolete. |
203 | 44.6k | if (!is_explicit_request_read_time) { |
204 | 0 | if (request.paging_state().has_read_time()) { |
205 | 0 | actual_read_time = ReadHybridTime::FromPB(request.paging_state().read_time()); |
206 | 0 | } else { |
207 | 0 | actual_read_time.read = start_sub_doc_key.hybrid_time(); |
208 | 0 | } |
209 | 0 | } |
210 | 3.86M | } else if (request.is_for_backfill()) { |
211 | 2.25k | RSTATUS_DCHECK(is_explicit_request_read_time, InvalidArgument, |
212 | 2.25k | "Backfill request should already be using explicit read times."); |
213 | 2.25k | PgsqlBackfillSpecPB spec; |
214 | 2.25k | spec.ParseFromString(a2b_hex(request.backfill_spec())); |
215 | 2.25k | if (!spec.next_row_key().empty()) { |
216 | 390 | KeyBytes start_key_bytes(spec.next_row_key()); |
217 | 390 | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
218 | 390 | } |
219 | 2.25k | } |
220 | 3.90M | RETURN_NOT_OK(ql_storage.GetIterator( |
221 | 3.90M | request, projection, schema, txn_op_context, |
222 | 3.90M | deadline, read_time, start_sub_doc_key.doc_key(), &result)); |
223 | 3.90M | } |
224 | 3.91M | return std::move(result); |
225 | 3.91M | } |
226 | | |
227 | | class DocKeyColumnPathBuilder { |
228 | | public: |
229 | | explicit DocKeyColumnPathBuilder(const RefCntPrefix& doc_key) |
230 | 18.1M | : doc_key_(doc_key.as_slice()) { |
231 | 18.1M | } |
232 | | |
233 | 28.0M | RefCntPrefix Build(ColumnIdRep column_id) { |
234 | 28.0M | buffer_.Clear(); |
235 | 28.0M | buffer_.AppendValueType(ValueType::kColumnId); |
236 | 28.0M | buffer_.AppendColumnId(ColumnId(column_id)); |
237 | 28.0M | RefCntBuffer path(doc_key_.size() + buffer_.size()); |
238 | 28.0M | doc_key_.CopyTo(path.data()); |
239 | 28.0M | buffer_.AsSlice().CopyTo(path.data() + doc_key_.size()); |
240 | 28.0M | return path; |
241 | 28.0M | } |
242 | | |
243 | | private: |
244 | | Slice doc_key_; |
245 | | KeyBytes buffer_; |
246 | | }; |
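A usage sketch (mirroring GetDocPaths() below): one builder is created per row doc key, and each Build() call appends an encoded column id to that shared prefix, yielding the per-column paths used for intents.

    DocKeyColumnPathBuilder builder(encoded_doc_key_);
    paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn)));
    for (const auto& column_value : *column_values) {
      paths->push_back(builder.Build(column_value.column_id()));
    }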
247 | | |
248 | | } // namespace |
249 | | |
250 | | //-------------------------------------------------------------------------------------------------- |
251 | | |
252 | 12.8M | Status PgsqlWriteOperation::Init(PgsqlResponsePB* response) { |
253 | | // Initialize operation inputs. |
254 | 12.8M | response_ = response; |
255 | | |
256 | 12.8M | doc_key_ = VERIFY_RESULT(FetchDocKey(schema_, request_)); |
257 | 0 | encoded_doc_key_ = doc_key_->EncodeAsRefCntPrefix(); |
258 | | |
259 | 12.8M | return Status::OK(); |
260 | 12.8M | } |
261 | | |
262 | | // Check if a duplicate value is inserted into a unique index. |
263 | 602 | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue(const DocOperationApplyData& data) { |
264 | 602 | VLOG(3) << "Looking for collisions in\n" |
265 | 0 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
266 | | // We need to check backwards only for backfilled entries. |
267 | 602 | bool ret = |
268 | 602 | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kForward)) || |
269 | 602 | (595 request_.is_backfill()595 && |
270 | 595 | VERIFY_RESULT(HasDuplicateUniqueIndexValue(data, Direction::kBackward))); |
271 | 602 | if (!ret) { |
272 | 594 | VLOG(3) << "No collisions found"0 ; |
273 | 594 | } |
274 | 602 | return ret; |
275 | 602 | } |
276 | | |
277 | | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue( |
278 | 1.19k | const DocOperationApplyData& data, Direction direction) { |
279 | 1.19k | VLOG(2) << "Looking for collision while going " << yb::ToString(direction) |
280 | 0 | << ". Trying to insert " << *doc_key_; |
281 | 1.19k | auto requested_read_time = data.read_time; |
282 | 1.19k | if (direction == Direction::kForward) { |
283 | 602 | return HasDuplicateUniqueIndexValue(data, requested_read_time); |
284 | 602 | } |
285 | | |
286 | 595 | auto iter = CreateIntentAwareIterator( |
287 | 595 | data.doc_write_batch->doc_db(), |
288 | 595 | BloomFilterMode::USE_BLOOM_FILTER, |
289 | 595 | doc_key_->Encode().AsSlice(), |
290 | 595 | rocksdb::kDefaultQueryId, |
291 | 595 | txn_op_context_, |
292 | 595 | data.deadline, |
293 | 595 | ReadHybridTime::Max()); |
294 | | |
295 | 595 | HybridTime oldest_past_min_ht = VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
296 | 595 | iter.get(), SubDocKey(*doc_key_), requested_read_time.read)); |
297 | 0 | const HybridTime oldest_past_min_ht_liveness = |
298 | 595 | VERIFY_RESULT(FindOldestOverwrittenTimestamp( |
299 | 595 | iter.get(), |
300 | 595 | SubDocKey(*doc_key_, PrimitiveValue::kLivenessColumn), |
301 | 595 | requested_read_time.read)); |
302 | 0 | oldest_past_min_ht.MakeAtMost(oldest_past_min_ht_liveness); |
303 | 595 | if (!oldest_past_min_ht.is_valid()) { |
304 | 593 | return false; |
305 | 593 | } |
306 | 2 | return HasDuplicateUniqueIndexValue( |
307 | 2 | data, ReadHybridTime::SingleTime(oldest_past_min_ht)); |
308 | 595 | } |
309 | | |
310 | | Result<bool> PgsqlWriteOperation::HasDuplicateUniqueIndexValue( |
311 | 604 | const DocOperationApplyData& data, ReadHybridTime read_time) { |
312 | | // Set up the iterator to read the current primary key associated with the index key. |
313 | 604 | DocPgsqlScanSpec spec(schema_, request_.stmt_id(), *doc_key_); |
314 | 604 | DocRowwiseIterator iterator(schema_, |
315 | 604 | schema_, |
316 | 604 | txn_op_context_, |
317 | 604 | data.doc_write_batch->doc_db(), |
318 | 604 | data.deadline, |
319 | 604 | read_time); |
320 | 604 | RETURN_NOT_OK(iterator.Init(spec)); |
321 | | |
322 | | // It is a duplicate value if the index key exists already and the index value (corresponding to |
323 | | // the indexed table's primary key) is not the same. |
324 | 604 | if (!VERIFY_RESULT(iterator.HasNext())) { |
325 | 505 | VLOG(2) << "No collision found while checking at " << yb::ToString(read_time)0 ; |
326 | 505 | return false; |
327 | 505 | } |
328 | | |
329 | 99 | QLTableRow table_row; |
330 | 99 | RETURN_NOT_OK(iterator.NextRow(&table_row)); |
331 | 99 | for (const auto& column_value : request_.column_values()) { |
332 | | // Get the column. |
333 | 99 | if (!column_value.has_column_id()) { |
334 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
335 | 0 | } |
336 | 99 | const ColumnId column_id(column_value.column_id()); |
337 | | |
338 | | // Check column-write operator. |
339 | 99 | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
340 | 0 | << "Illegal write instruction"; |
341 | | |
342 | | // Evaluate column value. |
343 | 99 | QLExprResult expr_result; |
344 | 99 | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
345 | | |
346 | 99 | boost::optional<const QLValuePB&> existing_value = table_row.GetValue(column_id); |
347 | 99 | const QLValuePB& new_value = expr_result.Value(); |
348 | 99 | if (existing_value && *existing_value != new_value) { |
349 | 8 | VLOG(2) << "Found collision while checking at " << yb::ToString(read_time) |
350 | 0 | << "\nExisting: " << yb::ToString(*existing_value) |
351 | 0 | << " vs New: " << yb::ToString(new_value) |
352 | 0 | << "\nUsed read time as " << yb::ToString(data.read_time); |
353 | 8 | DVLOG(3) << "DocDB is now:\n" |
354 | 0 | << docdb::DocDBDebugDumpToStr(data.doc_write_batch->doc_db()); |
355 | 8 | return true; |
356 | 8 | } |
357 | 99 | } |
358 | | |
359 | 91 | VLOG(2) << "No collision while checking at " << yb::ToString(read_time)0 ; |
360 | 91 | return false; |
361 | 99 | } |
362 | | |
363 | | Result<HybridTime> PgsqlWriteOperation::FindOldestOverwrittenTimestamp( |
364 | | IntentAwareIterator* iter, |
365 | | const SubDocKey& sub_doc_key, |
366 | 1.19k | HybridTime min_read_time) { |
367 | 1.19k | HybridTime result; |
368 | 1.19k | VLOG(3) << "Doing iter->Seek " << *doc_key_0 ; |
369 | 1.19k | iter->Seek(*doc_key_); |
370 | 1.19k | if (iter->valid()) { |
371 | 994 | const KeyBytes bytes = sub_doc_key.EncodeWithoutHt(); |
372 | 994 | const Slice& sub_key_slice = bytes.AsSlice(); |
373 | 994 | result = VERIFY_RESULT( |
374 | 0 | iter->FindOldestRecord(sub_key_slice, min_read_time)); |
375 | 0 | VLOG(2) << "iter->FindOldestRecord returned " << result << " for " |
376 | 0 | << SubDocKey::DebugSliceToString(sub_key_slice); |
377 | 994 | } else { |
378 | 196 | VLOG(3) << "iter->Seek " << *doc_key_ << " turned out to be invalid"0 ; |
379 | 196 | } |
380 | 1.19k | return result; |
381 | 1.19k | } |
382 | | |
383 | 12.8M | Status PgsqlWriteOperation::Apply(const DocOperationApplyData& data) { |
384 | 12.8M | VLOG(4) << "Write, read time: " << data.read_time << ", txn: " << txn_op_context_186 ; |
385 | | |
386 | 12.8M | auto scope_exit = ScopeExit([this] { |
387 | 12.8M | if (!result_buffer_.empty()) { |
388 | 12.8M | NetworkByteOrder::Store64(result_buffer_.data(), result_rows_); |
389 | 12.8M | } |
390 | 12.8M | }); |
391 | | |
392 | 12.8M | switch (request_.stmt_type()) { |
393 | 4.61M | case PgsqlWriteRequestPB::PGSQL_INSERT: |
394 | 4.61M | return ApplyInsert(data, IsUpsert::kFalse); |
395 | | |
396 | 695k | case PgsqlWriteRequestPB::PGSQL_UPDATE: |
397 | 695k | return ApplyUpdate(data); |
398 | | |
399 | 907k | case PgsqlWriteRequestPB::PGSQL_DELETE: |
400 | 907k | return ApplyDelete(data, request_.is_delete_persist_needed()); |
401 | | |
402 | 6.61M | case PgsqlWriteRequestPB::PGSQL_UPSERT: { |
403 | | // Upserts should not have column refs (i.e. require read). |
404 | 6.61M | RSTATUS_DCHECK(request_.col_refs().empty(), |
405 | 6.61M | IllegalState, |
406 | 6.61M | "Upsert operation should not have column references"); |
407 | 6.61M | return ApplyInsert(data, IsUpsert::kTrue); |
408 | 0 | } |
409 | | |
410 | 90 | case PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED: |
411 | 90 | return ApplyTruncateColocated(data); |
412 | 12.8M | } |
413 | 0 | return Status::OK(); |
414 | 12.8M | } |
415 | | |
416 | 11.2M | Status PgsqlWriteOperation::ApplyInsert(const DocOperationApplyData& data, IsUpsert is_upsert) { |
417 | 11.2M | QLTableRow table_row; |
418 | 11.2M | if (!is_upsert) { |
419 | 4.61M | if (request_.is_backfill()) { |
420 | 602 | if (VERIFY_RESULT(HasDuplicateUniqueIndexValue(data))) { |
421 | | // Unique index value conflict found. |
422 | 8 | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR); |
423 | 8 | response_->set_error_message("Duplicate key found in unique index"); |
424 | 8 | return Status::OK(); |
425 | 8 | } |
426 | 4.61M | } else { |
427 | | // Non-backfill requests shouldn't use HasDuplicateUniqueIndexValue because |
428 | | // - they should error even if the conflicting row matches |
429 | | // - retrieving and calculating whether the conflicting row matches is a waste |
430 | 4.61M | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
431 | 4.61M | if (!table_row.IsEmpty()) { |
432 | 11.1k | VLOG(4) << "Duplicate row: " << table_row.ToString()0 ; |
433 | | // Primary key or unique index value found. |
434 | 11.1k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_DUPLICATE_KEY_ERROR); |
435 | 11.1k | response_->set_error_message("Duplicate key found in primary key or unique index"); |
436 | 11.1k | return Status::OK(); |
437 | 11.1k | } |
438 | 4.61M | } |
439 | 4.61M | } |
440 | | |
441 | 11.2M | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive( |
442 | 11.2M | DocPath(encoded_doc_key_.as_slice(), PrimitiveValue::kLivenessColumn), |
443 | 11.2M | ValueControlFields(), ValueRef(ValueType::kNullLow), data.read_time, data.deadline, |
444 | 11.2M | request_.stmt_id())); |
445 | | |
446 | 46.8M | for (const auto& column_value : request_.column_values())11.2M { |
447 | | // Get the column. |
448 | 46.8M | if (!column_value.has_column_id()) { |
449 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
450 | 0 | } |
451 | 46.8M | const ColumnId column_id(column_value.column_id()); |
452 | 46.8M | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
453 | | |
454 | | // Check column-write operator. |
455 | 1.35k | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
456 | 1.35k | << "Illegal write instruction"; |
457 | | |
458 | | // Evaluate column value. |
459 | 46.8M | QLExprResult expr_result; |
460 | 46.8M | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
461 | | |
462 | | // Inserting into specified column. |
463 | 46.8M | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
464 | 46.8M | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
465 | 46.8M | sub_path, ValueRef(expr_result.Value(), column.sorting_type()), |
466 | 46.8M | data.read_time, data.deadline, request_.stmt_id())); |
467 | 46.8M | } |
468 | | |
469 | 11.2M | RETURN_NOT_OK(PopulateResultSet(table_row)); |
470 | | |
471 | 11.2M | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
472 | 11.2M | return Status::OK(); |
473 | 11.2M | } |
474 | | |
475 | 695k | Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) { |
476 | 695k | QLTableRow table_row; |
477 | 695k | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
478 | 695k | if (table_row.IsEmpty()) { |
479 | | // Row not found. |
480 | 12 | response_->set_skipped(true); |
481 | 12 | return Status::OK(); |
482 | 12 | } |
483 | 695k | QLTableRow returning_table_row; |
484 | 695k | if (request_.targets_size()) { |
485 | 6 | returning_table_row = table_row; |
486 | 6 | } |
487 | | |
488 | | // skipped is set to false if this operation produces some data to write. |
489 | 695k | bool skipped = true; |
490 | | |
491 | 695k | if (request_.has_ybctid_column_value()) { |
492 | 692k | DocPgExprExecutor expr_exec(&schema_); |
493 | 692k | std::vector<QLExprResult> results; |
494 | 692k | int num_exprs = 0; |
495 | 692k | int cur_expr = 0; |
496 | 971k | for (const auto& column_value : request_.column_new_values()) { |
497 | 971k | if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) { |
498 | 16.7k | RETURN_NOT_OK(expr_exec.AddTargetExpression(column_value.expr())); |
499 | 16.7k | VLOG(1) << "Added target expression to the executor"0 ; |
500 | 16.7k | num_exprs++; |
501 | 16.7k | } |
502 | 971k | } |
503 | 692k | if (num_exprs > 0) { |
504 | 11.8k | bool match; |
505 | 16.7k | for (const PgsqlColRefPB& column_ref : request_.col_refs()) { |
506 | 16.7k | RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref)); |
507 | 16.7k | VLOG(1) << "Added column reference to the executor"0 ; |
508 | 16.7k | } |
509 | 11.8k | results.resize(num_exprs); |
510 | 11.8k | RETURN_NOT_OK(expr_exec.Exec(table_row, &results, &match)); |
511 | 11.8k | } |
512 | 971k | for (const auto& column_value : request_.column_new_values())692k { |
513 | | // Get the column. |
514 | 971k | if (!column_value.has_column_id()) { |
515 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
516 | 0 | } |
517 | 971k | const ColumnId column_id(column_value.column_id()); |
518 | 971k | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
519 | | // Evaluate column value. |
520 | 0 | QLExprResult expr_result; |
521 | | |
522 | 971k | if (GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kPgEvalExprCall) { |
523 | 16.7k | expr_result = std::move(results[cur_expr++]); |
524 | 954k | } else { |
525 | | // Check column-write operator. |
526 | 954k | SCHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert, |
527 | 954k | InternalError, |
528 | 954k | "Unsupported DocDB Expression"); |
529 | | |
530 | 954k | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer(), &schema_)); |
531 | 954k | } |
532 | | |
533 | | // Update RETURNING values |
534 | 971k | if (request_.targets_size()) { |
535 | 6 | returning_table_row.AllocColumn(column_id, expr_result.Value()); |
536 | 6 | } |
537 | | |
538 | | // Inserting into specified column. |
539 | 971k | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
540 | 971k | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
541 | 971k | sub_path, ValueRef(expr_result.Value(), column.sorting_type()), data.read_time, |
542 | 971k | data.deadline, request_.stmt_id())); |
543 | 971k | skipped = false; |
544 | 971k | } |
545 | 692k | } else { |
546 | | // This UPDATE is calling PGGATE directly without going thru PostgreSQL layer. |
547 | | // Keep it here as we might need it. |
548 | | |
549 | | // Very limited support for where expressions. Only used for updates to the sequences data |
550 | | // table. |
551 | 3.02k | bool is_match = true; |
552 | 3.02k | if (request_.has_where_expr()) { |
553 | 2.97k | QLExprResult match; |
554 | 2.97k | RETURN_NOT_OK(EvalExpr(request_.where_expr(), table_row, match.Writer())); |
555 | 2.97k | is_match = match.Value().bool_value(); |
556 | 2.97k | } |
557 | | |
558 | 3.02k | if (is_match) { |
559 | 5.99k | for (const auto &column_value : request_.column_new_values()) { |
560 | | // Get the column. |
561 | 5.99k | if (!column_value.has_column_id()) { |
562 | 0 | return STATUS(InternalError, "column id missing", column_value.DebugString()); |
563 | 0 | } |
564 | 5.99k | const ColumnId column_id(column_value.column_id()); |
565 | 5.99k | const ColumnSchema& column = VERIFY_RESULT(schema_.column_by_id(column_id)); |
566 | | |
567 | | // Check column-write operator. |
568 | 0 | CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) |
569 | 0 | << "Illegal write instruction"; |
570 | | |
571 | | // Evaluate column value. |
572 | 5.99k | QLExprResult expr_result; |
573 | 5.99k | RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, expr_result.Writer())); |
574 | | |
575 | | // Inserting into specified column. |
576 | 5.99k | DocPath sub_path(encoded_doc_key_.as_slice(), PrimitiveValue(column_id)); |
577 | 5.99k | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
578 | 5.99k | sub_path, ValueRef(expr_result.Value(), column.sorting_type()), data.read_time, |
579 | 5.99k | data.deadline, request_.stmt_id())); |
580 | 5.99k | skipped = false; |
581 | 5.99k | } |
582 | 2.98k | } |
583 | 3.02k | } |
584 | | |
585 | 695k | if (request_.targets_size()) { |
586 | | // Returning the values after the update. |
587 | 6 | RETURN_NOT_OK(PopulateResultSet(returning_table_row)); |
588 | 695k | } else { |
589 | | // Returning the values before the update. |
590 | 695k | RETURN_NOT_OK(PopulateResultSet(table_row)); |
591 | 695k | } |
592 | | |
593 | 695k | if (skipped) { |
594 | 0 | response_->set_skipped(true); |
595 | 0 | } |
596 | 695k | response_->set_rows_affected_count(1); |
597 | 695k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
598 | 695k | return Status::OK(); |
599 | 695k | } |
600 | | |
601 | | Status PgsqlWriteOperation::ApplyDelete( |
602 | | const DocOperationApplyData& data, |
603 | 907k | const bool is_persist_needed) { |
604 | 907k | int num_deleted = 1; |
605 | 907k | QLTableRow table_row; |
606 | 907k | RETURN_NOT_OK(ReadColumns(data, &table_row)); |
607 | 907k | if (table_row.IsEmpty()) { |
608 | | // Row not found. |
609 | | // Return early unless we still want to apply the delete for backfill purposes. Deletes to |
610 | | // nonexistent rows are expected to get written to the index when the index has the delete |
611 | | // permission during an online schema migration. num_deleted should be 0 because we don't want |
613 | | // to report back to the user that we deleted 1 row; we should not set skipped on response_
614 | | // because that would prevent tombstone intents from getting applied.
614 | 42 | if (!is_persist_needed) { |
615 | 37 | response_->set_skipped(true); |
616 | 37 | return Status::OK(); |
617 | 37 | } |
618 | 5 | num_deleted = 0; |
619 | 5 | } |
620 | | |
621 | | // TODO(neil) Add support for WHERE clause. |
622 | 907k | CHECK(request_.column_values_size() == 0) << "WHERE clause condition is not yet fully supported"11 ; |
623 | | |
624 | | // Otherwise, delete the referenced row (all columns). |
625 | 907k | RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath( |
626 | 907k | encoded_doc_key_.as_slice()), data.read_time, data.deadline)); |
627 | | |
628 | 907k | RETURN_NOT_OK(PopulateResultSet(table_row)); |
629 | | |
630 | 907k | response_->set_rows_affected_count(num_deleted); |
631 | 907k | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
632 | 907k | return Status::OK(); |
633 | 907k | } |
634 | | |
635 | 90 | Status PgsqlWriteOperation::ApplyTruncateColocated(const DocOperationApplyData& data) { |
636 | 90 | RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath( |
637 | 90 | encoded_doc_key_.as_slice()), data.read_time, data.deadline)); |
638 | 90 | response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); |
639 | 90 | return Status::OK(); |
640 | 90 | } |
641 | | |
642 | | Status PgsqlWriteOperation::ReadColumns(const DocOperationApplyData& data, |
643 | 6.21M | QLTableRow* table_row) { |
644 | | // Filter the columns using primary key. |
645 | 6.21M | if (doc_key_) { |
646 | 6.21M | Schema projection; |
647 | 6.21M | RETURN_NOT_OK(CreateProjection(schema_, request_.column_refs(), &projection)); |
648 | 6.21M | DocPgsqlScanSpec spec(projection, request_.stmt_id(), *doc_key_); |
649 | 6.21M | DocRowwiseIterator iterator(projection, |
650 | 6.21M | schema_, |
651 | 6.21M | txn_op_context_, |
652 | 6.21M | data.doc_write_batch->doc_db(), |
653 | 6.21M | data.deadline, |
654 | 6.21M | data.read_time); |
655 | 6.21M | RETURN_NOT_OK(iterator.Init(spec)); |
656 | 6.21M | if (VERIFY_RESULT(iterator.HasNext())) { |
657 | 1.61M | RETURN_NOT_OK(iterator.NextRow(table_row)); |
658 | 4.60M | } else { |
659 | 4.60M | table_row->Clear(); |
660 | 4.60M | } |
661 | 6.21M | data.restart_read_ht->MakeAtLeast(iterator.RestartReadHt()); |
662 | 6.21M | } |
663 | | |
664 | 6.21M | return Status::OK(); |
665 | 6.21M | } |
666 | | |
667 | 12.8M | Status PgsqlWriteOperation::PopulateResultSet(const QLTableRow& table_row) { |
668 | 12.8M | if (result_buffer_.empty()) { |
669 | | // Reserve space for num rows. |
670 | 12.8M | pggate::PgWire::WriteInt64(0, &result_buffer_); |
671 | 12.8M | } |
672 | 12.8M | ++result_rows_; |
673 | 12.8M | int rscol_index = 0; |
674 | 12.8M | for (const PgsqlExpressionPB& expr : request_.targets()) { |
675 | 23 | if (expr.has_column_id()) { |
676 | 23 | QLExprResult value; |
677 | 23 | if (expr.column_id() == static_cast<int>(PgSystemAttrNum::kYBTupleId)) { |
678 | | // Strip cotable ID / colocation ID from the serialized DocKey before returning it |
679 | | // as ybctid. |
680 | 0 | Slice tuple_id = encoded_doc_key_.as_slice(); |
681 | 0 | if (tuple_id.starts_with(ValueTypeAsChar::kTableId)) { |
682 | 0 | tuple_id.remove_prefix(1 + kUuidSize); |
683 | 0 | } else if (tuple_id.starts_with(ValueTypeAsChar::kColocationId)) { |
684 | 0 | tuple_id.remove_prefix(1 + sizeof(ColocationId)); |
685 | 0 | } |
686 | 0 | value.Writer().NewValue().set_binary_value(tuple_id.data(), tuple_id.size()); |
687 | 23 | } else { |
688 | 23 | RETURN_NOT_OK(EvalExpr(expr, table_row, value.Writer())); |
689 | 23 | } |
690 | 23 | RETURN_NOT_OK(pggate::WriteColumn(value.Value(), &result_buffer_)); |
691 | 23 | } |
692 | 23 | rscol_index++; |
693 | 23 | } |
694 | 12.8M | return Status::OK(); |
695 | 12.8M | } |
696 | | |
697 | | Status PgsqlWriteOperation::GetDocPaths(GetDocPathsMode mode, |
698 | | DocPathsToLock *paths, |
699 | 20.0M | IsolationLevel *level) const { |
700 | | // When this write operation requires a read, it requires a read snapshot so paths will be locked |
701 | | // in snapshot isolation for consistency. Otherwise, pure writes will happen in serializable |
702 | | // isolation so that they serialize but do not conflict with one another.
703 | | // |
704 | | // Currently, only keys that are being written are locked, no lock is taken on read at the |
705 | | // snapshot isolation level. |
706 | 20.0M | *level = RequireReadSnapshot() ? IsolationLevel::SNAPSHOT_ISOLATION12.5M |
707 | 20.0M | : IsolationLevel::SERIALIZABLE_ISOLATION7.54M ; |
708 | | |
709 | 20.0M | switch (mode) { |
710 | 12.9M | case GetDocPathsMode::kLock: { |
711 | | // A weak intent is required to lock the row and prevent it from being removed.
712 | | // For this purpose, the path for the row's SystemColumnIds::kLivenessColumn column is returned.
713 | | // The caller code will create a strong intent for the returned path (the row's column doc key)
714 | | // and weak intents for all its prefixes (including the row's doc key).
715 | 12.9M | if (!encoded_doc_key_) { |
716 | 0 | return Status::OK(); |
717 | 0 | } |
718 | 12.9M | if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { |
719 | | // In the case of UPDATE, some columns may have expressions instead of exact values.
720 | | // These expressions may read column values.
721 | | // Potentially, the expression for updating column v1 may read the value of column v2.
722 | | // |
723 | | // UPDATE t SET v = v + 10 WHERE k = 1 |
724 | | // UPDATE t SET v1 = v2 + 10 WHERE k = 1 |
725 | | // |
726 | | // A strong intent for the whole row is required in this case, as it may be too expensive to
727 | | // determine exactly which columns are read by the expression.
728 | | |
729 | 995k | for (const auto& column_value : request_.column_new_values()) { |
730 | 995k | if (!column_value.expr().has_value()) { |
731 | 11.9k | paths->push_back(encoded_doc_key_); |
732 | 11.9k | return Status::OK(); |
733 | 11.9k | } |
734 | 995k | } |
735 | 718k | } |
736 | 12.9M | DocKeyColumnPathBuilder builder(encoded_doc_key_); |
737 | 12.9M | paths->push_back(builder.Build(to_underlying(SystemColumnIds::kLivenessColumn))); |
738 | 12.9M | break; |
739 | 12.9M | } |
740 | 7.10M | case GetDocPathsMode::kIntents: { |
741 | 7.10M | const google::protobuf::RepeatedPtrField<PgsqlColumnValuePB>* column_values = nullptr; |
742 | 7.10M | if (request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT || |
743 | 7.10M | request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT2.52M ) { |
744 | 5.49M | column_values = &request_.column_values(); |
745 | 5.49M | } else if (1.60M request_.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE1.60M ) { |
746 | 702k | column_values = &request_.column_new_values(); |
747 | 702k | } |
748 | | |
749 | 7.10M | if (column_values != nullptr && !column_values->empty()6.20M ) { |
750 | 5.17M | DocKeyColumnPathBuilder builder(encoded_doc_key_); |
751 | 15.1M | for (const auto& column_value : *column_values) { |
752 | 15.1M | paths->push_back(builder.Build(column_value.column_id())); |
753 | 15.1M | } |
754 | 5.17M | } else if (1.92M encoded_doc_key_1.92M ) { |
755 | 1.92M | paths->push_back(encoded_doc_key_); |
756 | 1.92M | } |
757 | 7.10M | break; |
758 | 12.9M | } |
759 | 20.0M | } |
760 | 20.0M | return Status::OK(); |
761 | 20.0M | } |
762 | | |
763 | | //-------------------------------------------------------------------------------------------------- |
764 | | |
765 | | Result<size_t> PgsqlReadOperation::Execute(const YQLStorageIf& ql_storage, |
766 | | CoarseTimePoint deadline, |
767 | | const ReadHybridTime& read_time, |
768 | | bool is_explicit_request_read_time, |
769 | | const Schema& schema, |
770 | | const Schema *index_schema, |
771 | | faststring *result_buffer, |
772 | 3.47M | HybridTime *restart_read_ht) { |
773 | 3.47M | size_t fetched_rows = 0; |
774 | | // Reserve space for fetched rows count. |
775 | 3.47M | pggate::PgWire::WriteInt64(0, result_buffer); |
776 | 3.47M | auto se = ScopeExit([&fetched_rows, result_buffer] { |
777 | 3.47M | NetworkByteOrder::Store64(result_buffer->data(), fetched_rows); |
778 | 3.47M | }); |
779 | 3.47M | VLOG(4) << "Read, read time: " << read_time << ", txn: " << txn_op_context_2.24k ; |
780 | | |
781 | | // Fetching data. |
782 | 3.47M | bool has_paging_state = false; |
783 | 3.47M | if (request_.batch_arguments_size() > 0) { |
784 | 6.35k | SCHECK(request_.has_ybctid_column_value(), |
785 | 6.35k | InternalError, |
786 | 6.35k | "ybctid arguments can be batched only"); |
787 | 6.35k | fetched_rows = VERIFY_RESULT(ExecuteBatchYbctid( |
788 | 6.35k | ql_storage, deadline, read_time, schema, |
789 | 6.35k | result_buffer, restart_read_ht)); |
790 | 3.47M | } else if (request_.has_sampling_state()) { |
791 | 406 | fetched_rows = VERIFY_RESULT(ExecuteSample( |
792 | 406 | ql_storage, deadline, read_time, is_explicit_request_read_time, schema, |
793 | 406 | result_buffer, restart_read_ht, &has_paging_state)); |
794 | 3.46M | } else { |
795 | 3.46M | fetched_rows = VERIFY_RESULT(ExecuteScalar( |
796 | 3.46M | ql_storage, deadline, read_time, is_explicit_request_read_time, schema, index_schema, |
797 | 3.46M | result_buffer, restart_read_ht, &has_paging_state)); |
798 | 3.46M | } |
799 | | |
800 | 3.47M | VTRACE(1, "Fetched $0 rows. $1 paging state", fetched_rows, (has_paging_state ? "Has" : "No"));
801 | 3.47M | *restart_read_ht = table_iter_->RestartReadHt(); |
802 | 3.47M | return fetched_rows; |
803 | 3.47M | } |
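The row-count handling above follows a reserve-then-patch pattern (also used by PgsqlWriteOperation::Apply()): a 64-bit placeholder is written first, and the ScopeExit handler overwrites it with the final count once all rows have been appended, so rows can be streamed into the buffer without knowing the count up front.

    pggate::PgWire::WriteInt64(0, result_buffer);  // reserve space for the row count
    auto se = ScopeExit([&fetched_rows, result_buffer] {
      NetworkByteOrder::Store64(result_buffer->data(), fetched_rows);  // patch in the real count
    });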
804 | | |
805 | | Result<size_t> PgsqlReadOperation::ExecuteSample(const YQLStorageIf& ql_storage, |
806 | | CoarseTimePoint deadline, |
807 | | const ReadHybridTime& read_time, |
808 | | bool is_explicit_request_read_time, |
809 | | const Schema& schema, |
810 | | faststring *result_buffer, |
811 | | HybridTime *restart_read_ht, |
812 | 406 | bool *has_paging_state) { |
813 | 406 | *has_paging_state = false; |
814 | 406 | size_t scanned_rows = 0; |
815 | 406 | PgsqlSamplingStatePB sampling_state = request_.sampling_state(); |
816 | | // Requested total number of rows to collect |
817 | 406 | int targrows = sampling_state.targrows(); |
818 | | // Number of rows collected so far |
819 | 406 | int numrows = sampling_state.numrows(); |
820 | | // Total number of rows scanned |
821 | 406 | double samplerows = sampling_state.samplerows(); |
822 | | // Current number of rows to skip before collecting next one for sample |
823 | 406 | double rowstoskip = sampling_state.rowstoskip(); |
824 | | // Variables for the random numbers generator |
825 | 406 | YbgPrepareMemoryContext(); |
826 | 406 | YbgReservoirState rstate = NULL; |
827 | 406 | YbgSamplerCreate(sampling_state.rstate_w(), sampling_state.rand_state(), &rstate); |
828 | | // Buffer to hold selected row ids from the current page |
829 | 406 | std::unique_ptr<QLValuePB[]> reservoir = std::make_unique<QLValuePB[]>(targrows); |
830 | | // Number of rows to scan for the current page. |
831 | | // Too low a row count limit is inefficient since we have to allocate and initialize a reservoir
832 | | // capable of holding a potentially large (targrows) number of tuples. The row count has to be at
833 | | // least targrows to have a chance of filling up the reservoir. Actually, the algorithm selects
834 | | // targrows only for the very first page of the table; then it starts to skip tuples, and the
835 | | // further it goes, the more it skips. For a large enough table it eventually starts to select
836 | | // fewer than targrows per page, regardless of the row_count_limit.
837 | | // Anyway, double targrows seems like a reasonable minimum for the row_count_limit.
838 | 406 | size_t row_count_limit = 2 * targrows; |
839 | 406 | if (request_.has_limit() && request_.limit() > row_count_limit) { |
840 | 0 | row_count_limit = request_.limit(); |
841 | 0 | } |
842 | | // Request is not supposed to contain any column refs, we just need the liveness column. |
843 | 406 | Schema projection; |
844 | 406 | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
845 | | // Request may carry paging state, CreateIterator takes care of positioning |
846 | 406 | table_iter_ = VERIFY_RESULT(CreateIterator( |
847 | 0 | ql_storage, request_, projection, schema, txn_op_context_, |
848 | 0 | deadline, read_time, is_explicit_request_read_time)); |
849 | 0 | bool scan_time_exceeded = false; |
850 | 406 | CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms; |
851 | 162k | while (scanned_rows++ < row_count_limit && |
852 | 162k | VERIFY_RESULT(table_iter_->HasNext()) && |
853 | 162k | !scan_time_exceeded161k ) { |
854 | 161k | if (numrows < targrows) { |
855 | | // Select the first targrows of the table. If the first partition(s) have fewer than that, the
856 | | // next partition continues populating the reservoir starting from the numrows position:
857 | | // numrows, as well as the other sampling state variables, is returned and copied over to the
858 | | // next sampling request
859 | 161k | Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId()); |
860 | 0 | reservoir[numrows++].set_binary_value(ybctid.data(), ybctid.size()); |
861 | 161k | } else { |
862 | | // At least targrows tuples have already been collected; now the algorithm skips an increasing
863 | | // number of rows before taking the next one into the reservoir
864 | 0 | if (rowstoskip <= 0) { |
865 | | // Take ybctid of the current row |
866 | 0 | Slice ybctid = VERIFY_RESULT(table_iter_->GetTupleId()); |
867 | | // Pick random tuple in the reservoir to replace |
868 | 0 | double rvalue; |
869 | 0 | int k; |
870 | 0 | YbgSamplerRandomFract(rstate, &rvalue); |
871 | 0 | k = static_cast<int>(targrows * rvalue); |
872 | | // Overwrite previous value with new one |
873 | 0 | reservoir[k].set_binary_value(ybctid.data(), ybctid.size()); |
874 | | // Choose next number of rows to skip |
875 | 0 | YbgReservoirGetNextS(rstate, samplerows, targrows, &rowstoskip); |
876 | 0 | } else { |
877 | 0 | rowstoskip -= 1; |
878 | 0 | } |
879 | 0 | } |
880 | | // Taking tuple ID does not advance the table iterator. Move it now. |
881 | 161k | table_iter_->SkipRow(); |
882 | | // Check if we are running out of time |
883 | 161k | scan_time_exceeded = CoarseMonoClock::now() >= stop_scan; |
884 | 161k | } |
885 | | // Count live rows we have scanned. TODO: how to count dead rows?
886 | 406 | samplerows += (scanned_rows - 1); |
887 | | // Return collected tuples from the reservoir. |
888 | | // Tuples are returned as (index, ybctid) pairs, where index is in [0..targrows-1] range. |
889 | | // As mentioned above, for large tables reservoirs become increasingly sparse from page to page. |
890 | | // So we hope to save by sending a variable number of index/ybctid pairs vs. exactly targrows of
891 | | // nullable ybctids. It also helps in the case of an extremely small table or partition.
892 | 406 | int fetched_rows = 0; |
893 | 295k | for (int i = 0; i < numrows; i++294k ) { |
894 | 294k | QLValuePB index; |
895 | 294k | if (reservoir[i].has_binary_value()) { |
896 | 161k | index.set_int32_value(i); |
897 | 161k | RETURN_NOT_OK(pggate::WriteColumn(index, result_buffer)); |
898 | 161k | RETURN_NOT_OK(pggate::WriteColumn(reservoir[i], result_buffer)); |
899 | 161k | fetched_rows++; |
900 | 161k | } |
901 | 294k | } |
902 | | |
903 | | // Return sampling state to continue with next page |
904 | 406 | PgsqlSamplingStatePB *new_sampling_state = response_.mutable_sampling_state(); |
905 | 406 | new_sampling_state->set_numrows(numrows); |
906 | 406 | new_sampling_state->set_targrows(targrows); |
907 | 406 | new_sampling_state->set_samplerows(samplerows); |
908 | 406 | new_sampling_state->set_rowstoskip(rowstoskip); |
909 | 406 | uint64_t randstate = 0; |
910 | 406 | double rstate_w = 0; |
911 | 406 | YbgSamplerGetState(rstate, &rstate_w, &randstate); |
912 | 406 | new_sampling_state->set_rstate_w(rstate_w); |
913 | 406 | new_sampling_state->set_rand_state(randstate); |
914 | 406 | YbgDeleteMemoryContext(); |
915 | | |
916 | | // Return paging state if scan has not been completed |
917 | 406 | RETURN_NOT_OK(SetPagingStateIfNecessary(table_iter_.get(), scanned_rows, row_count_limit, |
918 | 406 | scan_time_exceeded, &schema, read_time, |
919 | 406 | has_paging_state)); |
920 | 406 | return fetched_rows; |
921 | 406 | } |
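The loop above uses a skip count (rowstoskip, maintained via YbgReservoirGetNextS) so that, once the reservoir is full, most rows are passed over without a per-row random draw. Below is a simplified, self-contained sketch of the plain reservoir-sampling idea it optimizes; the names are illustrative only (plain strings stand in for ybctids, std::mt19937_64 stands in for the YbgReservoirState helpers).

    #include <random>
    #include <string>
    #include <vector>

    // Keep a uniform random sample of up to targrows items from a stream of rows.
    std::vector<std::string> SampleRows(const std::vector<std::string>& rows, int targrows) {
      std::vector<std::string> reservoir;
      reservoir.reserve(targrows);
      std::mt19937_64 rng{std::random_device{}()};
      for (size_t seen = 0; seen < rows.size(); ++seen) {
        if (static_cast<int>(reservoir.size()) < targrows) {
          reservoir.push_back(rows[seen]);  // fill phase: the first targrows rows go straight in
        } else {
          std::uniform_int_distribution<size_t> pick(0, seen);
          const size_t k = pick(rng);
          if (k < static_cast<size_t>(targrows)) {
            reservoir[k] = rows[seen];  // replace a random slot with probability targrows / (seen + 1)
          }
        }
      }
      return reservoir;
    }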
922 | | |
923 | | Result<size_t> PgsqlReadOperation::ExecuteScalar(const YQLStorageIf& ql_storage, |
924 | | CoarseTimePoint deadline, |
925 | | const ReadHybridTime& read_time, |
926 | | bool is_explicit_request_read_time, |
927 | | const Schema& schema, |
928 | | const Schema *index_schema, |
929 | | faststring *result_buffer, |
930 | | HybridTime *restart_read_ht, |
931 | 3.46M | bool *has_paging_state) { |
932 | 3.46M | *has_paging_state = false; |
933 | | |
934 | 3.46M | size_t fetched_rows = 0; |
935 | 3.46M | size_t row_count_limit = std::numeric_limits<std::size_t>::max(); |
936 | 3.46M | if (request_.has_limit()) { |
937 | 3.46M | if (request_.limit() == 0) { |
938 | 0 | return fetched_rows; |
939 | 0 | } |
940 | 3.46M | row_count_limit = request_.limit(); |
941 | 3.46M | } |
942 | | |
943 | | // Create the projection of regular columns selected by the row block plus any referenced in |
944 | | // the WHERE condition. When DocRowwiseIterator::NextRow() populates the value map, it uses this |
945 | | // projection only to scan sub-documents. The query schema is used to select only referenced |
946 | | // columns and key columns. |
947 | 3.46M | Schema projection; |
948 | 3.46M | Schema index_projection; |
949 | 3.46M | YQLRowwiseIteratorIf *iter; |
950 | 3.46M | const Schema* scan_schema; |
951 | 3.46M | DocPgExprExecutor expr_exec(&schema); |
952 | | |
953 | 3.46M | if (!request_.col_refs().empty()) { |
954 | 3.44M | RETURN_NOT_OK(CreateProjection(schema, request_.col_refs(), &projection)); |
955 | 3.44M | } else { |
956 | | // Compatibility: Either request indeed has no column refs, or it comes from a legacy node. |
957 | 22.4k | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
958 | 22.4k | } |
959 | 3.46M | table_iter_ = VERIFY_RESULT(CreateIterator( |
960 | 0 | ql_storage, request_, projection, schema, txn_op_context_, |
961 | 0 | deadline, read_time, is_explicit_request_read_time)); |
962 | | |
963 | 0 | ColumnId ybbasectid_id; |
964 | 3.46M | if (request_.has_index_request()) { |
965 | 458k | const PgsqlReadRequestPB& index_request = request_.index_request(); |
966 | 458k | RETURN_NOT_OK(CreateProjection(*index_schema, index_request.column_refs(), &index_projection)); |
967 | 458k | index_iter_ = VERIFY_RESULT(CreateIterator( |
968 | 0 | ql_storage, index_request, index_projection, *index_schema, txn_op_context_, |
969 | 0 | deadline, read_time, is_explicit_request_read_time)); |
970 | 0 | iter = index_iter_.get(); |
971 | 458k | const auto idx = index_schema->find_column("ybidxbasectid"); |
972 | 458k | SCHECK_NE(idx, Schema::kColumnNotFound, Corruption, "ybidxbasectid not found in index schema"); |
973 | 458k | ybbasectid_id = index_schema->column_id(idx); |
974 | 458k | scan_schema = index_schema; |
975 | 3.00M | } else { |
976 | 3.00M | iter = table_iter_.get(); |
977 | 3.00M | scan_schema = &schema; |
978 | 20.7M | for (const PgsqlColRefPB& column_ref : request_.col_refs()) { |
979 | 20.7M | RETURN_NOT_OK(expr_exec.AddColumnRef(column_ref)); |
980 | 20.7M | VLOG(1) << "Added column reference to the executor"4.12k ; |
981 | 20.7M | } |
982 | 3.00M | for (const PgsqlExpressionPB& expr : request_.where_clauses()) { |
983 | 372 | RETURN_NOT_OK(expr_exec.AddWhereExpression(expr)); |
984 | 372 | VLOG(1) << "Added where expression to the executor"1 ; |
985 | 372 | } |
986 | 3.00M | } |
987 | | |
988 | 3.46M | VLOG(1) << "Started iterator"2.22k ; |
989 | | |
990 | | // Set scan start time. |
991 | 3.46M | bool scan_time_exceeded = false; |
992 | 3.46M | CoarseTimePoint stop_scan = deadline - FLAGS_ysql_scan_deadline_margin_ms * 1ms; |
993 | | |
994 | | // Fetching data. |
995 | 3.46M | int match_count = 0; |
996 | 3.46M | QLTableRow row; |
997 | 60.2M | while (fetched_rows < row_count_limit && VERIFY_RESULT(iter->HasNext()) && |
998 | 60.2M | !scan_time_exceeded56.8M ) { |
999 | 56.8M | row.Clear(); |
1000 | | |
1001 | | // If there is an index request, fetch ybbasectid from the index and use it as ybctid |
1002 | | // to fetch from the base table. Otherwise, fetch from the base table directly. |
1003 | 56.8M | if (request_.has_index_request()) { |
1004 | 1.13M | RETURN_NOT_OK(iter->NextRow(&row)); |
1005 | 1.13M | const auto& tuple_id = row.GetValue(ybbasectid_id); |
1006 | 1.13M | SCHECK_NE(tuple_id, boost::none, Corruption, "ybbasectid not found in index row"); |
1007 | 1.13M | if (!VERIFY_RESULT(table_iter_->SeekTuple(tuple_id->binary_value()))) { |
1008 | 0 | DocKey doc_key; |
1009 | 0 | RETURN_NOT_OK(doc_key.DecodeFrom(tuple_id->binary_value())); |
1010 | 0 | return STATUS_FORMAT(Corruption, "ybctid $0 not found in indexed table", doc_key); |
1011 | 0 | } |
1012 | 1.13M | row.Clear(); |
1013 | 1.13M | RETURN_NOT_OK(table_iter_->NextRow(projection, &row)); |
1014 | 55.6M | } else { |
1015 | 55.6M | RETURN_NOT_OK(iter->NextRow(projection, &row)); |
1016 | 55.6M | } |
1017 | | |
1018 | | // Match the row with the where condition before adding to the row block. |
1019 | 56.8M | bool is_match = true; |
1020 | 56.8M | RETURN_NOT_OK(expr_exec.Exec(row, nullptr, &is_match)); |
1021 | 56.8M | if (is_match) { |
1022 | 56.7M | match_count++; |
1023 | 56.7M | if (request_.is_aggregate()) { |
1024 | 3.14M | RETURN_NOT_OK(EvalAggregate(row)); |
1025 | 53.6M | } else { |
1026 | 53.6M | RETURN_NOT_OK(PopulateResultSet(row, result_buffer)); |
1027 | 53.6M | ++fetched_rows; |
1028 | 53.6M | } |
1029 | 56.7M | } |
1030 | | |
1031 | | // Check if we are running out of time |
1032 | 56.8M | scan_time_exceeded = CoarseMonoClock::now() >= stop_scan; |
1033 | 56.8M | } |
1034 | | |
1035 | 3.46M | VLOG(1) << "Stopped iterator after " << match_count << " matches, " |
1036 | 4.23k | << fetched_rows << " rows fetched"; |
1037 | 3.46M | VLOG(1) << "Deadline is " << (2.18k scan_time_exceeded2.18k ? ""0 : "not "2.18k ) << "exceeded"; |
1038 | | |
1039 | 3.46M | if (request_.is_aggregate() && match_count > 018.1k ) { |
1040 | 16.2k | RETURN_NOT_OK(PopulateAggregate(row, result_buffer)); |
1041 | 16.2k | ++fetched_rows; |
1042 | 16.2k | } |
1043 | | |
1044 | 3.46M | if (PREDICT_FALSE(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms > 0) && request_.is_aggregate()1.14k ) { |
1045 | 425 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_pgsql_aggregate_read_ms); |
1046 | 425 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_pgsql_aggregate_read_ms)); |
1047 | 425 | } |
1048 | | |
1049 | 3.46M | RETURN_NOT_OK(SetPagingStateIfNecessary( |
1050 | 3.46M | iter, fetched_rows, row_count_limit, scan_time_exceeded, scan_schema, |
1051 | 3.46M | read_time, has_paging_state)); |
1052 | 3.46M | return fetched_rows; |
1053 | 3.46M | } |
1054 | | |
1055 | | Result<size_t> PgsqlReadOperation::ExecuteBatchYbctid(const YQLStorageIf& ql_storage, |
1056 | | CoarseTimePoint deadline, |
1057 | | const ReadHybridTime& read_time, |
1058 | | const Schema& schema, |
1059 | | faststring *result_buffer, |
1060 | 6.36k | HybridTime *restart_read_ht) { |
1061 | 6.36k | Schema projection; |
1062 | 6.36k | RETURN_NOT_OK(CreateProjection(schema, request_.column_refs(), &projection)); |
1063 | | |
1064 | 6.36k | QLTableRow row; |
1065 | 6.36k | size_t row_count = 0; |
1066 | 1.54M | for (const PgsqlBatchArgumentPB& batch_argument : request_.batch_arguments()) { |
1067 | | // Get the row. |
1068 | 1.54M | RETURN_NOT_OK(ql_storage.GetIterator(request_.stmt_id(), projection, schema, txn_op_context_, |
1069 | 1.54M | deadline, read_time, batch_argument.ybctid().value(), |
1070 | 1.54M | &table_iter_)); |
1071 | | |
1072 | 1.54M | if (VERIFY_RESULT(table_iter_->HasNext())) { |
1073 | 1.54M | row.Clear(); |
1074 | 1.54M | RETURN_NOT_OK(table_iter_->NextRow(projection, &row)); |
1075 | | |
1076 | | // Populate result set. |
1077 | 1.54M | RETURN_NOT_OK(PopulateResultSet(row, result_buffer)); |
1078 | 1.54M | response_.add_batch_orders(batch_argument.order()); |
1079 | 1.54M | row_count++; |
1080 | 1.54M | } |
1081 | 1.54M | } |
1082 | | |
1083 | | // Set status for this batch. |
1084 | | // Mark that all rows were processed, even if some of the ybctids were not found.
1085 | 6.36k | response_.set_batch_arg_count(request_.batch_arguments_size()); |
1086 | | |
1087 | 6.36k | return row_count; |
1088 | 6.36k | } |
1089 | | |
1090 | | Status PgsqlReadOperation::SetPagingStateIfNecessary(const YQLRowwiseIteratorIf* iter, |
1091 | | size_t fetched_rows, |
1092 | | const size_t row_count_limit, |
1093 | | const bool scan_time_exceeded, |
1094 | | const Schema* schema, |
1095 | | const ReadHybridTime& read_time, |
1096 | 3.46M | bool *has_paging_state) { |
1097 | 3.46M | *has_paging_state = false; |
1098 | 3.46M | if (!request_.return_paging_state()) { |
1099 | 3.23k | return Status::OK(); |
1100 | 3.23k | } |
1101 | | |
1102 | | // Set the paging state for next row. |
1103 | 3.46M | if (fetched_rows >= row_count_limit || scan_time_exceeded3.39M ) { |
1104 | 65.3k | SubDocKey next_row_key; |
1105 | 65.3k | RETURN_NOT_OK(iter->GetNextReadSubDocKey(&next_row_key)); |
1106 | | // When the "limit" number of rows are returned and we are asked to return the paging state, |
1107 | | // return the partition key and row key of the next row to read in the paging state if there are |
1108 | | // still more rows to read. Otherwise, leave the paging state empty which means we are done |
1109 | | // reading from this tablet. |
1110 | 65.3k | if (!next_row_key.doc_key().empty()) { |
1111 | 62.8k | const auto& keybytes = next_row_key.Encode(); |
1112 | 62.8k | PgsqlPagingStatePB* paging_state = response_.mutable_paging_state(); |
1113 | 62.8k | RSTATUS_DCHECK(schema != nullptr, IllegalState, "Missing schema"); |
1114 | 62.8k | if (schema->num_hash_key_columns() > 0) { |
1115 | 28.7k | paging_state->set_next_partition_key( |
1116 | 28.7k | PartitionSchema::EncodeMultiColumnHashValue(next_row_key.doc_key().hash())); |
1117 | 34.1k | } else { |
1118 | 34.1k | paging_state->set_next_partition_key(keybytes.ToStringBuffer()); |
1119 | 34.1k | } |
1120 | 62.8k | paging_state->set_next_row_key(keybytes.ToStringBuffer()); |
1121 | 62.8k | *has_paging_state = true; |
1122 | 62.8k | } |
1123 | 65.3k | } |
1124 | 3.46M | if (*has_paging_state) { |
1125 | 62.8k | if (FLAGS_pgsql_consistent_transactional_paging) { |
1126 | 62.8k | read_time.AddToPB(response_.mutable_paging_state()); |
1127 | 62.8k | } else { |
1128 | | // Using SingleTime will help avoid read restarts on second page and later but will |
1129 | | // potentially produce stale results on those pages. |
1130 | 17 | auto per_row_consistent_read_time = ReadHybridTime::SingleTime(read_time.read); |
1131 | 17 | per_row_consistent_read_time.AddToPB(response_.mutable_paging_state()); |
1132 | 17 | } |
1133 | 62.8k | } |
1134 | | |
1135 | 3.46M | return Status::OK(); |
1136 | 3.46M | } |
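In short, the function above only attaches a paging state when the scan stopped early and there is a concrete resume point; a condensed (illustrative) form of that decision:

    bool resume_needed = (fetched_rows >= row_count_limit || scan_time_exceeded) &&
                         !next_row_key.doc_key().empty();
    // When resume_needed, next_partition_key / next_row_key are filled in, and the read time is
    // carried over if FLAGS_pgsql_consistent_transactional_paging is set.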
1137 | | |
1138 | | Status PgsqlReadOperation::PopulateResultSet(const QLTableRow& table_row, |
1139 | 55.1M | faststring *result_buffer) { |
1140 | 55.1M | QLExprResult result; |
1141 | 656M | for (const PgsqlExpressionPB& expr : request_.targets()) { |
1142 | 656M | RETURN_NOT_OK(EvalExpr(expr, table_row, result.Writer())); |
1143 | 656M | RETURN_NOT_OK(pggate::WriteColumn(result.Value(), result_buffer)); |
1144 | 656M | } |
1145 | 55.1M | return Status::OK(); |
1146 | 55.1M | } |
1147 | | |
1148 | 41.2M | Status PgsqlReadOperation::GetTupleId(QLValue *result) const { |
1149 | | // Get row key and save to QLValue. |
1150 | | // TODO(neil) Check if we need to append a table_id and other info to TupleID. For example, we |
1151 | | // might need info to make sure the TupleId by itself is a valid reference to a specific row of |
1152 | | // a valid table. |
1153 | 41.2M | const Slice tuple_id = VERIFY_RESULT(table_iter_->GetTupleId()); |
1154 | 0 | result->set_binary_value(tuple_id.data(), tuple_id.size()); |
1155 | 41.2M | return Status::OK(); |
1156 | 41.2M | } |
1157 | | |
1158 | 3.14M | Status PgsqlReadOperation::EvalAggregate(const QLTableRow& table_row) { |
1159 | 3.14M | if (aggr_result_.empty()) { |
1160 | 16.1k | int column_count = request_.targets().size(); |
1161 | 16.1k | aggr_result_.resize(column_count); |
1162 | 16.1k | } |
1163 | | |
1164 | 3.14M | int aggr_index = 0; |
1165 | 3.14M | for (const PgsqlExpressionPB& expr : request_.targets()) { |
1166 | 3.14M | RETURN_NOT_OK(EvalExpr(expr, table_row, aggr_result_[aggr_index++].Writer())); |
1167 | 3.14M | } |
1168 | 3.14M | return Status::OK(); |
1169 | 3.14M | } |
1170 | | |
1171 | | Status PgsqlReadOperation::PopulateAggregate(const QLTableRow& table_row, |
1172 | 16.3k | faststring *result_buffer) { |
1173 | 16.3k | int column_count = request_.targets().size(); |
1174 | 32.6k | for (int rscol_index = 0; rscol_index < column_count; rscol_index++16.3k ) { |
1175 | 16.3k | RETURN_NOT_OK(pggate::WriteColumn(aggr_result_[rscol_index].Value(), result_buffer)); |
1176 | 16.3k | } |
1177 | 16.3k | return Status::OK(); |
1178 | 16.3k | } |
1179 | | |
1180 | 2.76M | Status PgsqlReadOperation::GetIntents(const Schema& schema, KeyValueWriteBatchPB* out) { |
1181 | 2.76M | if (request_.batch_arguments_size() > 0 && request_.has_ybctid_column_value()2.26k ) { |
1182 | 101k | for (const auto& batch_argument : request_.batch_arguments()) { |
1183 | 101k | SCHECK(batch_argument.has_ybctid(), InternalError, "ybctid batch argument is expected"); |
1184 | 101k | RETURN_NOT_OK(AddIntent(batch_argument.ybctid(), request_.wait_policy(), out)); |
1185 | 101k | } |
1186 | 2.76M | } else { |
1187 | 2.76M | AddIntent(VERIFY_RESULT(FetchEncodedDocKey(schema, request_)), request_.wait_policy(), out); |
1188 | 2.76M | } |
1189 | 2.76M | return Status::OK(); |
1190 | 2.76M | } |
1191 | | |
1192 | | } // namespace docdb |
1193 | | } // namespace yb |