/Users/deen/code/yugabyte-db/src/yb/docdb/doc_pgsql_scanspec.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/doc_pgsql_scanspec.h" |
15 | | |
16 | | #include <boost/optional/optional_io.hpp> |
17 | | |
18 | | #include "yb/common/pgsql_protocol.pb.h" |
19 | | #include "yb/common/schema.h" |
20 | | |
21 | | #include "yb/docdb/doc_key.h" |
22 | | #include "yb/docdb/doc_scanspec_util.h" |
23 | | #include "yb/docdb/value_type.h" |
24 | | |
25 | | #include "yb/rocksdb/db/compaction.h" |
26 | | |
27 | | #include "yb/util/result.h" |
28 | | #include "yb/util/status_format.h" |
29 | | |
30 | | DECLARE_bool(disable_hybrid_scan); |
31 | | |
32 | | namespace yb { |
33 | | namespace docdb { |
34 | | |
35 | | //-------------------------------------------------------------------------------------------------- |
36 | | extern rocksdb::UserBoundaryTag TagForRangeComponent(size_t index); |
37 | | |
38 | | // TODO(neil) The following implementation is just a prototype. Need to complete the implementation |
39 | | // and test accordingly. |
40 | | class PgsqlRangeBasedFileFilter : public rocksdb::ReadFileFilter { |
41 | | public: |
42 | | PgsqlRangeBasedFileFilter(const std::vector<PrimitiveValue>& lower_bounds, |
43 | | const std::vector<PrimitiveValue>& upper_bounds) |
44 | | : lower_bounds_(EncodePrimitiveValues(lower_bounds, upper_bounds.size())), |
45 | 11.6M | upper_bounds_(EncodePrimitiveValues(upper_bounds, lower_bounds.size())) { |
46 | 11.6M | } |
47 | | |
48 | | std::vector<KeyBytes> EncodePrimitiveValues(const std::vector<PrimitiveValue>& source, |
49 | 23.3M | size_t min_size) { |
50 | 23.3M | size_t size = source.size(); |
51 | 23.3M | std::vector<KeyBytes> result(std::max(min_size, size)); |
52 | 37.4M | for (size_t i = 0; i != size; ++i14.0M ) { |
53 | 14.0M | if (source[i].value_type() != ValueType::kTombstone) { |
54 | 14.0M | source[i].AppendToKey(&result[i]); |
55 | 14.0M | } |
56 | 14.0M | } |
57 | 23.3M | return result; |
58 | 23.3M | } |
59 | | |
60 | 9.20M | bool Filter(const rocksdb::FdWithBoundaries& file) const override { |
61 | 20.6M | for (size_t i = 0; i != lower_bounds_.size(); ++i11.4M ) { |
62 | 11.4M | const Slice lower_bound = lower_bounds_[i].AsSlice(); |
63 | 11.4M | const Slice upper_bound = upper_bounds_[i].AsSlice(); |
64 | | |
65 | 11.4M | rocksdb::UserBoundaryTag tag = TagForRangeComponent(i); |
66 | 11.4M | const Slice *smallest = file.smallest.user_value_with_tag(tag); |
67 | 11.4M | const Slice *largest = file.largest.user_value_with_tag(tag); |
68 | | |
69 | 11.4M | if (!GreaterOrEquals(&upper_bound, smallest) || !GreaterOrEquals(largest, &lower_bound)11.4M ) { |
70 | 2.34k | return false; |
71 | 2.34k | } |
72 | 11.4M | } |
73 | 9.20M | return true; |
74 | 9.20M | } |
75 | | |
76 | 22.9M | bool GreaterOrEquals(const Slice *lhs, const Slice *rhs) const { |
77 | | // TODO(neil) Need to double check this NULL-equals-all logic or make the code clearer. |
78 | 22.9M | if (lhs == nullptr || rhs == nullptr20.4M ) { |
79 | 4.82M | return true; |
80 | 4.82M | } |
81 | 18.0M | if (lhs->empty() || rhs->empty()18.0M ) { |
82 | 6.79M | return true; |
83 | 6.79M | } |
84 | 11.2M | return lhs->compare(*rhs) >= 0; |
85 | 18.0M | } |
86 | | |
87 | | private: |
88 | | std::vector<KeyBytes> lower_bounds_; |
89 | | std::vector<KeyBytes> upper_bounds_; |
90 | | }; |
91 | | |
92 | | //-------------------------------------------------------------------------------------------------- |
93 | | |
94 | | DocPgsqlScanSpec::DocPgsqlScanSpec(const Schema& schema, |
95 | | const rocksdb::QueryId query_id, |
96 | | const DocKey& doc_key, |
97 | | const boost::optional<int32_t> hash_code, |
98 | | const boost::optional<int32_t> max_hash_code, |
99 | | const DocKey& start_doc_key, |
100 | | bool is_forward_scan) |
101 | | : PgsqlScanSpec(nullptr), |
102 | | schema_(schema), |
103 | | query_id_(query_id), |
104 | | hashed_components_(nullptr), |
105 | | range_components_(nullptr), |
106 | | hash_code_(hash_code), |
107 | | max_hash_code_(max_hash_code), |
108 | | start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()), |
109 | | lower_doc_key_(doc_key.Encode()), |
110 | 8.65M | is_forward_scan_(is_forward_scan) { |
111 | | |
112 | | // Compute lower and upper doc_key. |
113 | | // We add +inf as an extra component to make sure this is greater than all keys in range. |
114 | | // For lower bound, this is true already, because dockey + suffix is > dockey |
115 | 8.65M | upper_doc_key_ = lower_doc_key_; |
116 | | |
117 | 8.65M | if (hash_code && !doc_key.has_hash()289k ) { |
118 | 92.4k | DocKey lower_doc_key = DocKey(doc_key); |
119 | 92.4k | lower_doc_key.set_hash(*hash_code); |
120 | 92.5k | if (lower_doc_key.hashed_group().empty()92.4k ) { |
121 | 92.5k | lower_doc_key.hashed_group() |
122 | 92.5k | .push_back(PrimitiveValue(ValueType::kLowest)); |
123 | 92.5k | } |
124 | 92.4k | lower_doc_key_ = lower_doc_key.Encode(); |
125 | 92.4k | } |
126 | | |
127 | 8.65M | if (max_hash_code) { |
128 | 210k | DocKey upper_doc_key = DocKey(doc_key); |
129 | 210k | upper_doc_key.set_hash(*max_hash_code); |
130 | 210k | if (upper_doc_key.hashed_group().empty()) { |
131 | 13.4k | upper_doc_key.hashed_group() |
132 | 13.4k | .push_back(PrimitiveValue(ValueType::kHighest)); |
133 | 13.4k | } |
134 | 210k | upper_doc_key_ = upper_doc_key.Encode(); |
135 | 210k | } |
136 | | |
137 | 8.65M | upper_doc_key_.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
138 | 8.65M | } |
139 | | |
140 | | DocPgsqlScanSpec::DocPgsqlScanSpec( |
141 | | const Schema& schema, |
142 | | const rocksdb::QueryId query_id, |
143 | | std::reference_wrapper<const std::vector<PrimitiveValue>> hashed_components, |
144 | | std::reference_wrapper<const std::vector<PrimitiveValue>> range_components, |
145 | | const PgsqlConditionPB* condition, |
146 | | const boost::optional<int32_t> hash_code, |
147 | | const boost::optional<int32_t> max_hash_code, |
148 | | const PgsqlExpressionPB *where_expr, |
149 | | const DocKey& start_doc_key, |
150 | | bool is_forward_scan, |
151 | | const DocKey& lower_doc_key, |
152 | | const DocKey& upper_doc_key) |
153 | | : PgsqlScanSpec(where_expr), |
154 | | range_bounds_(condition ? new QLScanRange(schema, *condition) : nullptr), |
155 | | schema_(schema), |
156 | | query_id_(query_id), |
157 | | hashed_components_(&hashed_components.get()), |
158 | | range_components_(&range_components.get()), |
159 | | hash_code_(hash_code), |
160 | | max_hash_code_(max_hash_code), |
161 | | start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()), |
162 | | lower_doc_key_(lower_doc_key.Encode()), |
163 | | upper_doc_key_(upper_doc_key.Encode()), |
164 | 3.03M | is_forward_scan_(is_forward_scan) { |
165 | | |
166 | 3.03M | auto lower_bound_key = bound_key(schema, true); |
167 | 3.03M | lower_doc_key_ = lower_bound_key > lower_doc_key_ |
168 | 3.03M | || lower_doc_key.empty()568k |
169 | 3.03M | ? lower_bound_key3.03M : lower_doc_key_1.94k ; |
170 | | |
171 | 3.03M | auto upper_bound_key = bound_key(schema, false); |
172 | 3.03M | upper_doc_key_ = upper_bound_key < upper_doc_key_ |
173 | 3.03M | || upper_doc_key.empty()2.87M |
174 | 18.4E | ? upper_bound_key3.03M : upper_doc_key_; |
175 | | |
176 | 3.03M | if (where_expr_) { |
177 | | // Should never get here until WHERE clause is supported. |
178 | 0 | LOG(FATAL) << "DEVELOPERS: Add support for condition (where clause)"; |
179 | 0 | } |
180 | | |
181 | 3.03M | if (range_bounds_) { |
182 | 13.2k | range_bounds_indexes_ = range_bounds_->GetColIds(); |
183 | 13.2k | } |
184 | | |
185 | | // If the hash key is fixed and we have range columns with IN condition, try to construct the |
186 | | // exact list of range options to scan for. |
187 | 3.03M | if ((!hashed_components_->empty() || schema_.num_hash_key_columns() == 01.19M ) && |
188 | 3.03M | schema_.num_range_key_columns() > 03.02M && |
189 | 3.03M | range_bounds_3.02M && range_bounds_->has_in_range_options()13.2k ) { |
190 | 678 | DCHECK(condition); |
191 | 678 | range_options_ = |
192 | 678 | std::make_shared<std::vector<std::vector<PrimitiveValue>>>(schema_.num_range_key_columns()); |
193 | 678 | InitRangeOptions(*condition); |
194 | | |
195 | 678 | if (FLAGS_disable_hybrid_scan) { |
196 | | // Range options are only valid if all |
197 | | // range columns are set (i.e. have one or more options) |
198 | | // when hybrid scan is disabled |
199 | 0 | for (size_t i = 0; i < schema_.num_range_key_columns(); i++) { |
200 | 0 | if ((*range_options_)[i].empty()) { |
201 | 0 | range_options_ = nullptr; |
202 | 0 | break; |
203 | 0 | } |
204 | 0 | } |
205 | 0 | } |
206 | 678 | } |
207 | 3.03M | } |
208 | | |
209 | 1.43k | void DocPgsqlScanSpec::InitRangeOptions(const PgsqlConditionPB& condition) { |
210 | 1.43k | size_t num_hash_cols = schema_.num_hash_key_columns(); |
211 | 1.43k | switch (condition.op()) { |
212 | 678 | case QLOperator::QL_OP_AND: |
213 | 753 | for (const auto& operand : condition.operands()) { |
214 | 753 | DCHECK(operand.has_condition()); |
215 | 753 | InitRangeOptions(operand.condition()); |
216 | 753 | } |
217 | 678 | break; |
218 | | |
219 | 15 | case QLOperator::QL_OP_EQUAL: |
220 | 727 | case QLOperator::QL_OP_IN: { |
221 | 727 | DCHECK_EQ(condition.operands_size(), 2); |
222 | | // Skip any condition where LHS is not a column (e.g. subscript columns: 'map[k] = v') |
223 | 727 | if (condition.operands(0).expr_case() != PgsqlExpressionPB::kColumnId) { |
224 | 0 | return; |
225 | 0 | } |
226 | | |
227 | | // Skip any RHS expressions that are not evaluated yet. |
228 | 727 | if (condition.operands(1).expr_case() != PgsqlExpressionPB::kValue) { |
229 | 0 | return; |
230 | 0 | } |
231 | | |
232 | 727 | int col_idx = schema_.find_column_by_id(ColumnId(condition.operands(0).column_id())); |
233 | | |
234 | | // Skip any non-range columns. |
235 | 727 | if (!schema_.is_range_column(col_idx)) { |
236 | 0 | return; |
237 | 0 | } |
238 | | |
239 | 727 | SortingType sortingType = schema_.column(col_idx).sorting_type(); |
240 | 727 | range_options_indexes_.emplace_back(condition.operands(0).column_id()); |
241 | | |
242 | 727 | if (condition.op() == QL_OP_EQUAL) { |
243 | 15 | auto pv = PrimitiveValue::FromQLValuePB(condition.operands(1).value(), sortingType); |
244 | 15 | (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv)); |
245 | 712 | } else { // QL_OP_IN |
246 | 712 | DCHECK_EQ(condition.op(), QL_OP_IN); |
247 | 712 | DCHECK(condition.operands(1).value().has_list_value()); |
248 | 712 | const auto &options = condition.operands(1).value().list_value(); |
249 | 712 | int opt_size = options.elems_size(); |
250 | 712 | (*range_options_)[col_idx - num_hash_cols].reserve(opt_size); |
251 | | |
252 | | // IN arguments should have been de-duplicated and ordered ascendingly by the executor. |
253 | 712 | bool is_reverse_order = is_forward_scan_ ^ (sortingType == SortingType::kAscending || |
254 | 712 | sortingType == SortingType::kAscendingNullsLast); |
255 | 2.59k | for (int i = 0; i < opt_size; i++1.88k ) { |
256 | 1.88k | int elem_idx = is_reverse_order ? opt_size - i - 1412 : i1.46k ; |
257 | 1.88k | const auto &elem = options.elems(elem_idx); |
258 | 1.88k | auto pv = PrimitiveValue::FromQLValuePB(elem, sortingType); |
259 | 1.88k | (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv)); |
260 | 1.88k | } |
261 | 712 | } |
262 | | |
263 | 727 | break; |
264 | 727 | } |
265 | | |
266 | 26 | default: |
267 | | // We don't support any other operators at this level. |
268 | 26 | break; |
269 | 1.43k | } |
270 | 1.43k | } |
271 | | |
272 | 6.06M | KeyBytes DocPgsqlScanSpec::bound_key(const Schema& schema, const bool lower_bound) const { |
273 | 6.06M | KeyBytes result; |
274 | 6.06M | auto encoder = DocKeyEncoder(&result).Schema(schema); |
275 | | |
276 | 6.06M | bool has_hash_columns = schema_.num_hash_key_columns() > 0; |
277 | 6.06M | bool hash_components_unset = has_hash_columns && hashed_components_->empty()3.70M ; |
278 | 6.06M | if (hash_components_unset) { |
279 | | // use lower bound hash code if set in request (for scans using token) |
280 | 17.0k | if (lower_bound && hash_code_8.51k ) { |
281 | 8.51k | encoder.HashAndRange(*hash_code_, |
282 | 8.51k | {PrimitiveValue(ValueType::kLowest)}, |
283 | 8.51k | {PrimitiveValue(ValueType::kLowest)}); |
284 | 8.51k | } |
285 | | // use upper bound hash code if set in request (for scans using token) |
286 | 17.0k | if (!lower_bound) { |
287 | 8.51k | if (max_hash_code_) { |
288 | 1.00k | encoder.HashAndRange(*max_hash_code_, |
289 | 1.00k | {PrimitiveValue(ValueType::kHighest)}, |
290 | 1.00k | {PrimitiveValue(ValueType::kHighest)}); |
291 | 7.50k | } else { |
292 | 7.50k | result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
293 | 7.50k | } |
294 | 8.51k | } |
295 | 17.0k | return result; |
296 | 17.0k | } |
297 | | |
298 | 6.05M | if (has_hash_columns) { |
299 | 3.68M | uint16_t hash = lower_bound |
300 | 3.68M | ? hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::min())1.84M |
301 | 3.68M | : max_hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::max())1.84M ; |
302 | | |
303 | 3.68M | encoder.HashAndRange(hash, *hashed_components_, range_components(lower_bound)); |
304 | 3.68M | } else { |
305 | | // If no hash columns use default hash code (0). |
306 | 2.36M | encoder.Hash(false, 0, *hashed_components_).Range(range_components(lower_bound)); |
307 | 2.36M | } |
308 | 6.05M | return result; |
309 | 6.06M | } |
310 | | |
311 | 29.4M | std::vector<PrimitiveValue> DocPgsqlScanSpec::range_components(const bool lower_bound) const { |
312 | 29.4M | return GetRangeKeyScanSpec(schema_, range_components_, range_bounds_.get(), lower_bound); |
313 | 29.4M | } |
314 | | |
315 | | // Return inclusive lower/upper range doc key considering the start_doc_key. |
316 | 23.3M | Result<KeyBytes> DocPgsqlScanSpec::Bound(const bool lower_bound) const { |
317 | 23.3M | if (start_doc_key_.empty()) { |
318 | 23.2M | return lower_bound ? lower_doc_key_11.6M : upper_doc_key_11.6M ; |
319 | 23.2M | } |
320 | | |
321 | | // When paging state is present, start_doc_key_ should have been provided, and the scan starting |
322 | | // point should be start_doc_key_ instead of the initial bounds. |
323 | 96.9k | if (start_doc_key_ < lower_doc_key_ || start_doc_key_ > upper_doc_key_90.0k ) { |
324 | 0 | return STATUS_FORMAT(Corruption, "Invalid start_doc_key: $0. Range: $1, $2", |
325 | 0 | start_doc_key_, lower_doc_key_, upper_doc_key_); |
326 | 0 | } |
327 | | |
328 | | // Paging state + forward scan. |
329 | 96.9k | if (is_forward_scan_) { |
330 | 90.0k | return lower_bound ? start_doc_key_45.0k : upper_doc_key_45.0k ; |
331 | 90.0k | } |
332 | | |
333 | | // Paging state + reverse scan. |
334 | 6.91k | if (lower_bound) { |
335 | 5 | return lower_doc_key_; |
336 | 5 | } |
337 | | |
338 | | // If using start_doc_key_ as upper bound append +inf as extra component to ensure it includes |
339 | | // the target start_doc_key itself (dockey + suffix < dockey + kHighest). |
340 | | // For lower bound, this is true already, because dockey + suffix is > dockey. |
341 | 6.90k | KeyBytes result = start_doc_key_; |
342 | 6.90k | result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
343 | 6.90k | return result; |
344 | 6.91k | } |
345 | | |
346 | 11.6M | std::shared_ptr<rocksdb::ReadFileFilter> DocPgsqlScanSpec::CreateFileFilter() const { |
347 | 11.6M | auto lower_bound = range_components(true); |
348 | 11.6M | auto upper_bound = range_components(false); |
349 | 11.6M | if (lower_bound.empty() && upper_bound.empty()11.0M ) { |
350 | 0 | return std::shared_ptr<rocksdb::ReadFileFilter>(); |
351 | 11.6M | } else { |
352 | 11.6M | return std::make_shared<PgsqlRangeBasedFileFilter>(std::move(lower_bound), |
353 | 11.6M | std::move(upper_bound)); |
354 | 11.6M | } |
355 | 11.6M | } |
356 | | |
357 | 11.6M | Result<KeyBytes> DocPgsqlScanSpec::LowerBound() const { |
358 | 11.6M | return Bound(true /* lower_bound */); |
359 | 11.6M | } |
360 | | |
361 | 11.6M | Result<KeyBytes> DocPgsqlScanSpec::UpperBound() const { |
362 | 11.6M | return Bound(false /* upper_bound */); |
363 | 11.6M | } |
364 | | |
365 | 7.79M | const DocKey& DocPgsqlScanSpec::DefaultStartDocKey() { |
366 | 7.79M | static const DocKey result; |
367 | 7.79M | return result; |
368 | 7.79M | } |
369 | | |
370 | | } // namespace docdb |
371 | | } // namespace yb |