/Users/deen/code/yugabyte-db/src/yb/docdb/doc_pgsql_scanspec.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/doc_pgsql_scanspec.h" |
15 | | |
16 | | #include <boost/optional/optional_io.hpp> |
17 | | |
18 | | #include "yb/common/pgsql_protocol.pb.h" |
19 | | #include "yb/common/schema.h" |
20 | | |
21 | | #include "yb/docdb/doc_key.h" |
22 | | #include "yb/docdb/doc_scanspec_util.h" |
23 | | #include "yb/docdb/value_type.h" |
24 | | |
25 | | #include "yb/rocksdb/db/compaction.h" |
26 | | |
27 | | #include "yb/util/result.h" |
28 | | #include "yb/util/status_format.h" |
29 | | |
30 | | DECLARE_bool(disable_hybrid_scan); |
31 | | |
32 | | namespace yb { |
33 | | namespace docdb { |
34 | | |
35 | | //-------------------------------------------------------------------------------------------------- |
36 | | extern rocksdb::UserBoundaryTag TagForRangeComponent(size_t index); |
37 | | |
38 | | // TODO(neil) The following implementation is just a prototype. Need to complete the implementation |
39 | | // and test accordingly. |
40 | | class PgsqlRangeBasedFileFilter : public rocksdb::ReadFileFilter { |
41 | | public: |
42 | | PgsqlRangeBasedFileFilter(const std::vector<PrimitiveValue>& lower_bounds, |
43 | | const std::vector<PrimitiveValue>& upper_bounds) |
44 | | : lower_bounds_(EncodePrimitiveValues(lower_bounds, upper_bounds.size())), |
45 | 4.32M | upper_bounds_(EncodePrimitiveValues(upper_bounds, lower_bounds.size())) { |
46 | 4.32M | } |
47 | | |
48 | | std::vector<KeyBytes> EncodePrimitiveValues(const std::vector<PrimitiveValue>& source, |
49 | 8.65M | size_t min_size) { |
50 | 8.65M | size_t size = source.size(); |
51 | 8.65M | std::vector<KeyBytes> result(std::max(min_size, size)); |
52 | 13.7M | for (size_t i = 0; i != size; ++i) { |
53 | 5.10M | if (source[i].value_type() != ValueType::kTombstone) { |
54 | 5.10M | source[i].AppendToKey(&result[i]); |
55 | 5.10M | } |
56 | 5.10M | } |
57 | 8.65M | return result; |
58 | 8.65M | } |
59 | | |
60 | 2.91M | bool Filter(const rocksdb::FdWithBoundaries& file) const override { |
61 | 6.58M | for (size_t i = 0; i != lower_bounds_.size(); ++i) { |
62 | 3.67M | const Slice lower_bound = lower_bounds_[i].AsSlice(); |
63 | 3.67M | const Slice upper_bound = upper_bounds_[i].AsSlice(); |
64 | | |
65 | 3.67M | rocksdb::UserBoundaryTag tag = TagForRangeComponent(i); |
66 | 3.67M | const Slice *smallest = file.smallest.user_value_with_tag(tag); |
67 | 3.67M | const Slice *largest = file.largest.user_value_with_tag(tag); |
68 | | |
69 | 3.67M | if (!GreaterOrEquals(&upper_bound, smallest) || !GreaterOrEquals(largest, &lower_bound)) { |
70 | 890 | return false; |
71 | 890 | } |
72 | 3.67M | } |
73 | 2.90M | return true; |
74 | 2.91M | } |
75 | | |
76 | 7.34M | bool GreaterOrEquals(const Slice *lhs, const Slice *rhs) const { |
77 | | // TODO(neil) Need to double check this NULL-equals-all logic or make the code clearer. |
78 | 7.34M | if (lhs == nullptr || rhs == nullptr) { |
79 | 163k | return true; |
80 | 163k | } |
81 | 7.18M | if (lhs->empty() || rhs->empty()) { |
82 | 2.82M | return true; |
83 | 2.82M | } |
84 | 4.35M | return lhs->compare(*rhs) >= 0; |
85 | 4.35M | } |
86 | | |
87 | | private: |
88 | | std::vector<KeyBytes> lower_bounds_; |
89 | | std::vector<KeyBytes> upper_bounds_; |
90 | | }; |
91 | | |
92 | | //-------------------------------------------------------------------------------------------------- |
93 | | |
94 | | DocPgsqlScanSpec::DocPgsqlScanSpec(const Schema& schema, |
95 | | const rocksdb::QueryId query_id, |
96 | | const DocKey& doc_key, |
97 | | const boost::optional<int32_t> hash_code, |
98 | | const boost::optional<int32_t> max_hash_code, |
99 | | const DocKey& start_doc_key, |
100 | | bool is_forward_scan) |
101 | | : PgsqlScanSpec(nullptr), |
102 | | schema_(schema), |
103 | | query_id_(query_id), |
104 | | hashed_components_(nullptr), |
105 | | range_components_(nullptr), |
106 | | hash_code_(hash_code), |
107 | | max_hash_code_(max_hash_code), |
108 | | start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()), |
109 | | lower_doc_key_(doc_key.Encode()), |
110 | 3.00M | is_forward_scan_(is_forward_scan) { |
111 | | |
112 | | // Compute lower and upper doc_key. |
113 | | // We add +inf as an extra component to make sure this is greater than all keys in range. |
114 | | // For lower bound, this is true already, because dockey + suffix is > dockey |
115 | 3.00M | upper_doc_key_ = lower_doc_key_; |
116 | | |
117 | 3.00M | if (hash_code && !doc_key.has_hash()) { |
118 | 32.5k | DocKey lower_doc_key = DocKey(doc_key); |
119 | 32.5k | lower_doc_key.set_hash(*hash_code); |
120 | 32.5k | if (lower_doc_key.hashed_group().empty()) { |
121 | 32.5k | lower_doc_key.hashed_group() |
122 | 32.5k | .push_back(PrimitiveValue(ValueType::kLowest)); |
123 | 32.5k | } |
124 | 32.5k | lower_doc_key_ = lower_doc_key.Encode(); |
125 | 32.5k | } |
126 | | |
127 | 3.00M | if (max_hash_code) { |
128 | 128k | DocKey upper_doc_key = DocKey(doc_key); |
129 | 128k | upper_doc_key.set_hash(*max_hash_code); |
130 | 128k | if (upper_doc_key.hashed_group().empty()) { |
131 | 428 | upper_doc_key.hashed_group() |
132 | 428 | .push_back(PrimitiveValue(ValueType::kHighest)); |
133 | 428 | } |
134 | 128k | upper_doc_key_ = upper_doc_key.Encode(); |
135 | 128k | } |
136 | | |
137 | 3.00M | upper_doc_key_.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
138 | 3.00M | } |
139 | | |
140 | | DocPgsqlScanSpec::DocPgsqlScanSpec( |
141 | | const Schema& schema, |
142 | | const rocksdb::QueryId query_id, |
143 | | std::reference_wrapper<const std::vector<PrimitiveValue>> hashed_components, |
144 | | std::reference_wrapper<const std::vector<PrimitiveValue>> range_components, |
145 | | const PgsqlConditionPB* condition, |
146 | | const boost::optional<int32_t> hash_code, |
147 | | const boost::optional<int32_t> max_hash_code, |
148 | | const PgsqlExpressionPB *where_expr, |
149 | | const DocKey& start_doc_key, |
150 | | bool is_forward_scan) |
151 | | : PgsqlScanSpec(where_expr), |
152 | | range_bounds_(condition ? new QLScanRange(schema, *condition) : nullptr), |
153 | | schema_(schema), |
154 | | query_id_(query_id), |
155 | | hashed_components_(&hashed_components.get()), |
156 | | range_components_(&range_components.get()), |
157 | | hash_code_(hash_code), |
158 | | max_hash_code_(max_hash_code), |
159 | | start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()), |
160 | | lower_doc_key_(bound_key(schema, true)), |
161 | | upper_doc_key_(bound_key(schema, false)), |
162 | 1.32M | is_forward_scan_(is_forward_scan) { |
163 | 1.32M | if (where_expr_) { |
164 | | // Should never get here until WHERE clause is supported. |
165 | 0 | LOG(FATAL) << "DEVELOPERS: Add support for condition (where clause)"; |
166 | 0 | } |
167 | | |
168 | 1.32M | if (range_bounds_) { |
169 | 4.72k | range_bounds_indexes_ = range_bounds_->GetColIds(); |
170 | 4.72k | } |
171 | | |
172 | | // If the hash key is fixed and we have range columns with IN condition, try to construct the |
173 | | // exact list of range options to scan for. |
174 | 1.32M | if ((!hashed_components_->empty() || schema_.num_hash_key_columns() == 0) && |
175 | 1.31M | schema_.num_range_key_columns() > 0 && |
176 | 1.31M | range_bounds_ && range_bounds_->has_in_range_options()) { |
177 | 612 | DCHECK(condition); |
178 | 612 | range_options_ = |
179 | 612 | std::make_shared<std::vector<std::vector<PrimitiveValue>>>(schema_.num_range_key_columns()); |
180 | 612 | InitRangeOptions(*condition); |
181 | | |
182 | 612 | if (FLAGS_disable_hybrid_scan) { |
183 | | // Range options are only valid if all |
184 | | // range columns are set (i.e. have one or more options) |
185 | | // when hybrid scan is disabled |
186 | 0 | for (size_t i = 0; i < schema_.num_range_key_columns(); i++) { |
187 | 0 | if ((*range_options_)[i].empty()) { |
188 | 0 | range_options_ = nullptr; |
189 | 0 | break; |
190 | 0 | } |
191 | 0 | } |
192 | 0 | } |
193 | 612 | } |
194 | 1.32M | } |
195 | | |
196 | 1.27k | void DocPgsqlScanSpec::InitRangeOptions(const PgsqlConditionPB& condition) { |
197 | 1.27k | size_t num_hash_cols = schema_.num_hash_key_columns(); |
198 | 1.27k | switch (condition.op()) { |
199 | 612 | case QLOperator::QL_OP_AND: |
200 | 659 | for (const auto& operand : condition.operands()) { |
201 | 659 | DCHECK(operand.has_condition()); |
202 | 659 | InitRangeOptions(operand.condition()); |
203 | 659 | } |
204 | 612 | break; |
205 | | |
206 | 7 | case QLOperator::QL_OP_EQUAL: |
207 | 635 | case QLOperator::QL_OP_IN: { |
208 | 635 | DCHECK_EQ(condition.operands_size(), 2); |
209 | | // Skip any condition where LHS is not a column (e.g. subscript columns: 'map[k] = v') |
210 | 635 | if (condition.operands(0).expr_case() != PgsqlExpressionPB::kColumnId) { |
211 | 0 | return; |
212 | 0 | } |
213 | | |
214 | | // Skip any RHS expressions that are not evaluated yet. |
215 | 635 | if (condition.operands(1).expr_case() != PgsqlExpressionPB::kValue) { |
216 | 0 | return; |
217 | 0 | } |
218 | | |
219 | 635 | int col_idx = schema_.find_column_by_id(ColumnId(condition.operands(0).column_id())); |
220 | | |
221 | | // Skip any non-range columns. |
222 | 635 | if (!schema_.is_range_column(col_idx)) { |
223 | 0 | return; |
224 | 0 | } |
225 | | |
226 | 635 | SortingType sortingType = schema_.column(col_idx).sorting_type(); |
227 | 635 | range_options_indexes_.emplace_back(condition.operands(0).column_id()); |
228 | | |
229 | 635 | if (condition.op() == QL_OP_EQUAL) { |
230 | 7 | auto pv = PrimitiveValue::FromQLValuePB(condition.operands(1).value(), sortingType); |
231 | 7 | (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv)); |
232 | 628 | } else { // QL_OP_IN |
233 | 628 | DCHECK_EQ(condition.op(), QL_OP_IN); |
234 | 628 | DCHECK(condition.operands(1).value().has_list_value()); |
235 | 628 | const auto &options = condition.operands(1).value().list_value(); |
236 | 628 | int opt_size = options.elems_size(); |
237 | 628 | (*range_options_)[col_idx - num_hash_cols].reserve(opt_size); |
238 | | |
239 | | // IN arguments should have been de-duplicated and ordered ascendingly by the executor. |
240 | 628 | bool is_reverse_order = is_forward_scan_ ^ (sortingType == SortingType::kAscending || |
241 | 628 | sortingType == SortingType::kAscendingNullsLast); |
242 | 2.27k | for (int i = 0; i < opt_size; i++) { |
243 | 1.33k | int elem_idx = is_reverse_order ? opt_size - i - 1 : i; |
244 | 1.64k | const auto &elem = options.elems(elem_idx); |
245 | 1.64k | auto pv = PrimitiveValue::FromQLValuePB(elem, sortingType); |
246 | 1.64k | (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv)); |
247 | 1.64k | } |
248 | 628 | } |
249 | | |
250 | 635 | break; |
251 | 635 | } |
252 | | |
253 | 24 | default: |
254 | | // We don't support any other operators at this level. |
255 | 24 | break; |
256 | 1.27k | } |
257 | 1.27k | } |
258 | | |
259 | 2.64M | KeyBytes DocPgsqlScanSpec::bound_key(const Schema& schema, const bool lower_bound) const { |
260 | 2.64M | KeyBytes result; |
261 | 2.64M | auto encoder = DocKeyEncoder(&result).Schema(schema); |
262 | | |
263 | 2.64M | bool has_hash_columns = schema_.num_hash_key_columns() > 0; |
264 | 2.64M | bool hash_components_unset = has_hash_columns && hashed_components_->empty(); |
265 | 2.64M | if (hash_components_unset) { |
266 | | // use lower bound hash code if set in request (for scans using token) |
267 | 11.8k | if (lower_bound && hash_code_) { |
268 | 5.92k | encoder.HashAndRange(*hash_code_, |
269 | 5.92k | {PrimitiveValue(ValueType::kLowest)}, |
270 | 5.92k | {PrimitiveValue(ValueType::kLowest)}); |
271 | 5.92k | } |
272 | | // use upper bound hash code if set in request (for scans using token) |
273 | 11.8k | if (!lower_bound) { |
274 | 5.92k | if (max_hash_code_) { |
275 | 742 | encoder.HashAndRange(*max_hash_code_, |
276 | 742 | {PrimitiveValue(ValueType::kHighest)}, |
277 | 742 | {PrimitiveValue(ValueType::kHighest)}); |
278 | 5.18k | } else { |
279 | 5.18k | result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
280 | 5.18k | } |
281 | 5.92k | } |
282 | 11.8k | return result; |
283 | 11.8k | } |
284 | | |
285 | 2.63M | if (has_hash_columns) { |
286 | 1.85M | uint16_t hash = lower_bound |
287 | 930k | ? hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::min()) |
288 | 929k | : max_hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::max()); |
289 | | |
290 | 1.85M | encoder.HashAndRange(hash, *hashed_components_, range_components(lower_bound)); |
291 | 773k | } else { |
292 | | // If no hash columns use default hash code (0). |
293 | 773k | encoder.Hash(false, 0, *hashed_components_).Range(range_components(lower_bound)); |
294 | 773k | } |
295 | 2.63M | return result; |
296 | 2.63M | } |
297 | | |
298 | 11.2M | std::vector<PrimitiveValue> DocPgsqlScanSpec::range_components(const bool lower_bound) const { |
299 | 11.2M | return GetRangeKeyScanSpec(schema_, range_components_, range_bounds_.get(), lower_bound); |
300 | 11.2M | } |
301 | | |
302 | | // Return inclusive lower/upper range doc key considering the start_doc_key. |
303 | 8.65M | Result<KeyBytes> DocPgsqlScanSpec::Bound(const bool lower_bound) const { |
304 | 8.65M | if (start_doc_key_.empty()) { |
305 | 4.30M | return lower_bound ? lower_doc_key_ : upper_doc_key_; |
306 | 8.60M | } |
307 | | |
308 | | // When paging state is present, start_doc_key_ should have been provided, and the scan starting |
309 | | // point should be start_doc_key_ instead of the initial bounds. |
310 | 51.0k | if (start_doc_key_ < lower_doc_key_ || start_doc_key_ > upper_doc_key_) { |
311 | 0 | return STATUS_FORMAT(Corruption, "Invalid start_doc_key: $0. Range: $1, $2", |
312 | 0 | start_doc_key_, lower_doc_key_, upper_doc_key_); |
313 | 0 | } |
314 | | |
315 | | // Paging state + forward scan. |
316 | 51.0k | if (is_forward_scan_) { |
317 | 23.6k | return lower_bound ? start_doc_key_ : upper_doc_key_; |
318 | 47.3k | } |
319 | | |
320 | | // Paging state + reverse scan. |
321 | 3.75k | if (lower_bound) { |
322 | 0 | return lower_doc_key_; |
323 | 0 | } |
324 | | |
325 | | // If using start_doc_key_ as upper bound append +inf as extra component to ensure it includes |
326 | | // the target start_doc_key itself (dockey + suffix < dockey + kHighest). |
327 | | // For lower bound, this is true already, because dockey + suffix is > dockey. |
328 | 3.75k | KeyBytes result = start_doc_key_; |
329 | 3.75k | result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest); |
330 | 3.75k | return result; |
331 | 3.75k | } |
332 | | |
333 | 4.32M | std::shared_ptr<rocksdb::ReadFileFilter> DocPgsqlScanSpec::CreateFileFilter() const { |
334 | 4.32M | auto lower_bound = range_components(true); |
335 | 4.32M | auto upper_bound = range_components(false); |
336 | 4.32M | if (lower_bound.empty() && upper_bound.empty()) { |
337 | 0 | return std::shared_ptr<rocksdb::ReadFileFilter>(); |
338 | 4.32M | } else { |
339 | 4.32M | return std::make_shared<PgsqlRangeBasedFileFilter>(std::move(lower_bound), |
340 | 4.32M | std::move(upper_bound)); |
341 | 4.32M | } |
342 | 4.32M | } |
343 | | |
344 | 4.33M | Result<KeyBytes> DocPgsqlScanSpec::LowerBound() const { |
345 | 4.33M | return Bound(true /* lower_bound */); |
346 | 4.33M | } |
347 | | |
348 | 4.32M | Result<KeyBytes> DocPgsqlScanSpec::UpperBound() const { |
349 | 4.32M | return Bound(false /* upper_bound */); |
350 | 4.32M | } |
351 | | |
352 | 2.68M | const DocKey& DocPgsqlScanSpec::DefaultStartDocKey() { |
353 | 2.68M | static const DocKey result; |
354 | 2.68M | return result; |
355 | 2.68M | } |
356 | | |
357 | | } // namespace docdb |
358 | | } // namespace yb |