YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/doc_pgsql_scanspec.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/doc_pgsql_scanspec.h"
15
16
#include <boost/optional/optional_io.hpp>
17
18
#include "yb/common/pgsql_protocol.pb.h"
19
#include "yb/common/schema.h"
20
21
#include "yb/docdb/doc_key.h"
22
#include "yb/docdb/doc_scanspec_util.h"
23
#include "yb/docdb/value_type.h"
24
25
#include "yb/rocksdb/db/compaction.h"
26
27
#include "yb/util/result.h"
28
#include "yb/util/status_format.h"
29
30
DECLARE_bool(disable_hybrid_scan);
31
32
namespace yb {
33
namespace docdb {
34
35
//--------------------------------------------------------------------------------------------------
36
extern rocksdb::UserBoundaryTag TagForRangeComponent(size_t index);
37
38
// TODO(neil) The following implementation is just a prototype. Need to complete the implementation
39
// and test accordingly.
40
class PgsqlRangeBasedFileFilter : public rocksdb::ReadFileFilter {
41
 public:
42
  PgsqlRangeBasedFileFilter(const std::vector<PrimitiveValue>& lower_bounds,
43
                            const std::vector<PrimitiveValue>& upper_bounds)
44
      : lower_bounds_(EncodePrimitiveValues(lower_bounds, upper_bounds.size())),
45
4.32M
        upper_bounds_(EncodePrimitiveValues(upper_bounds, lower_bounds.size())) {
46
4.32M
  }
47
48
  std::vector<KeyBytes> EncodePrimitiveValues(const std::vector<PrimitiveValue>& source,
49
8.65M
                                              size_t min_size) {
50
8.65M
    size_t size = source.size();
51
8.65M
    std::vector<KeyBytes> result(std::max(min_size, size));
52
13.7M
    for (size_t i = 0; i != size; ++i) {
53
5.10M
      if (source[i].value_type() != ValueType::kTombstone) {
54
5.10M
        source[i].AppendToKey(&result[i]);
55
5.10M
      }
56
5.10M
    }
57
8.65M
    return result;
58
8.65M
  }
59
60
2.91M
  bool Filter(const rocksdb::FdWithBoundaries& file) const override {
61
6.58M
    for (size_t i = 0; i != lower_bounds_.size(); ++i) {
62
3.67M
      const Slice lower_bound = lower_bounds_[i].AsSlice();
63
3.67M
      const Slice upper_bound = upper_bounds_[i].AsSlice();
64
65
3.67M
      rocksdb::UserBoundaryTag tag = TagForRangeComponent(i);
66
3.67M
      const Slice *smallest = file.smallest.user_value_with_tag(tag);
67
3.67M
      const Slice *largest = file.largest.user_value_with_tag(tag);
68
69
3.67M
      if (!GreaterOrEquals(&upper_bound, smallest) || !GreaterOrEquals(largest, &lower_bound)) {
70
890
        return false;
71
890
      }
72
3.67M
    }
73
2.90M
    return true;
74
2.91M
  }
75
76
7.34M
  bool GreaterOrEquals(const Slice *lhs, const Slice *rhs) const {
77
    // TODO(neil) Need to double check this NULL-equals-all logic or make the code clearer.
78
7.34M
    if (lhs == nullptr || rhs == nullptr) {
79
163k
      return true;
80
163k
    }
81
7.18M
    if (lhs->empty() || rhs->empty()) {
82
2.82M
      return true;
83
2.82M
    }
84
4.35M
    return lhs->compare(*rhs) >= 0;
85
4.35M
  }
86
87
 private:
88
  std::vector<KeyBytes> lower_bounds_;
89
  std::vector<KeyBytes> upper_bounds_;
90
};
91
92
//--------------------------------------------------------------------------------------------------
93
94
DocPgsqlScanSpec::DocPgsqlScanSpec(const Schema& schema,
                                   const rocksdb::QueryId query_id,
                                   const DocKey& doc_key,
                                   const boost::optional<int32_t> hash_code,
                                   const boost::optional<int32_t> max_hash_code,
                                   const DocKey& start_doc_key,
                                   bool is_forward_scan)
    : PgsqlScanSpec(nullptr),
      schema_(schema),
      query_id_(query_id),
      hashed_components_(nullptr),
      range_components_(nullptr),
      hash_code_(hash_code),
      max_hash_code_(max_hash_code),
      start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()),
      lower_doc_key_(doc_key.Encode()),
      is_forward_scan_(is_forward_scan) {
  // Encodes a copy of doc_key with its hash replaced by `hash`. If the copy has no hashed
  // components, a single `filler` component is added so the hash group is not empty.
  auto encode_with_hash = [&doc_key](int32_t hash, ValueType filler) {
    DocKey bounded_key(doc_key);
    bounded_key.set_hash(hash);
    if (bounded_key.hashed_group().empty()) {
      bounded_key.hashed_group().push_back(PrimitiveValue(filler));
    }
    return bounded_key.Encode();
  };

  // Both bounds begin as the encoded doc_key.
  upper_doc_key_ = lower_doc_key_;

  // The lower hash code only overrides the key when doc_key carries no hash of its own.
  if (hash_code && !doc_key.has_hash()) {
    lower_doc_key_ = encode_with_hash(*hash_code, ValueType::kLowest);
  }

  if (max_hash_code) {
    upper_doc_key_ = encode_with_hash(*max_hash_code, ValueType::kHighest);
  }

  // Append +inf as an extra component so the upper bound sorts after every key in the range.
  // The lower bound needs no suffix: dockey + suffix is already > dockey.
  upper_doc_key_.AppendValueTypeBeforeGroupEnd(ValueType::kHighest);
}
139
140
DocPgsqlScanSpec::DocPgsqlScanSpec(
    const Schema& schema,
    const rocksdb::QueryId query_id,
    std::reference_wrapper<const std::vector<PrimitiveValue>> hashed_components,
    std::reference_wrapper<const std::vector<PrimitiveValue>> range_components,
    const PgsqlConditionPB* condition,
    const boost::optional<int32_t> hash_code,
    const boost::optional<int32_t> max_hash_code,
    const PgsqlExpressionPB *where_expr,
    const DocKey& start_doc_key,
    bool is_forward_scan)
    : PgsqlScanSpec(where_expr),
      range_bounds_(condition ? new QLScanRange(schema, *condition) : nullptr),
      schema_(schema),
      query_id_(query_id),
      hashed_components_(&hashed_components.get()),
      range_components_(&range_components.get()),
      hash_code_(hash_code),
      max_hash_code_(max_hash_code),
      start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()),
      lower_doc_key_(bound_key(schema, true)),
      upper_doc_key_(bound_key(schema, false)),
      is_forward_scan_(is_forward_scan) {
  if (where_expr_) {
    // Should never get here until WHERE clause is supported.
    LOG(FATAL) << "DEVELOPERS: Add support for condition (where clause)";
  }

  if (range_bounds_) {
    range_bounds_indexes_ = range_bounds_->GetColIds();
  }

  // An exact list of range options is only built when the hash key is fixed (or there is no
  // hash key), the table has range columns, and the condition has IN-style range options.
  const bool hash_key_fixed =
      !hashed_components_->empty() || schema_.num_hash_key_columns() == 0;
  if (!hash_key_fixed || schema_.num_range_key_columns() == 0 || !range_bounds_ ||
      !range_bounds_->has_in_range_options()) {
    return;
  }

  DCHECK(condition);
  range_options_ =
      std::make_shared<std::vector<std::vector<PrimitiveValue>>>(schema_.num_range_key_columns());
  InitRangeOptions(*condition);

  if (!FLAGS_disable_hybrid_scan) {
    return;
  }

  // With hybrid scan disabled, range options are only valid if every range column has at least
  // one option; otherwise they are dropped altogether.
  bool has_unset_column = false;
  for (size_t col = 0; col < schema_.num_range_key_columns(); ++col) {
    if ((*range_options_)[col].empty()) {
      has_unset_column = true;
      break;
    }
  }
  if (has_unset_column) {
    range_options_ = nullptr;
  }
}
195
196
1.27k
void DocPgsqlScanSpec::InitRangeOptions(const PgsqlConditionPB& condition) {
197
1.27k
  size_t num_hash_cols = schema_.num_hash_key_columns();
198
1.27k
  switch (condition.op()) {
199
612
    case QLOperator::QL_OP_AND:
200
659
      for (const auto& operand : condition.operands()) {
201
659
        DCHECK(operand.has_condition());
202
659
        InitRangeOptions(operand.condition());
203
659
      }
204
612
      break;
205
206
7
    case QLOperator::QL_OP_EQUAL:
207
635
    case QLOperator::QL_OP_IN: {
208
635
      DCHECK_EQ(condition.operands_size(), 2);
209
      // Skip any condition where LHS is not a column (e.g. subscript columns: 'map[k] = v')
210
635
      if (condition.operands(0).expr_case() != PgsqlExpressionPB::kColumnId) {
211
0
        return;
212
0
      }
213
214
      // Skip any RHS expressions that are not evaluated yet.
215
635
      if (condition.operands(1).expr_case() != PgsqlExpressionPB::kValue) {
216
0
        return;
217
0
      }
218
219
635
      int col_idx = schema_.find_column_by_id(ColumnId(condition.operands(0).column_id()));
220
221
      // Skip any non-range columns.
222
635
      if (!schema_.is_range_column(col_idx)) {
223
0
        return;
224
0
      }
225
226
635
      SortingType sortingType = schema_.column(col_idx).sorting_type();
227
635
      range_options_indexes_.emplace_back(condition.operands(0).column_id());
228
229
635
      if (condition.op() == QL_OP_EQUAL) {
230
7
        auto pv = PrimitiveValue::FromQLValuePB(condition.operands(1).value(), sortingType);
231
7
        (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv));
232
628
      } else { // QL_OP_IN
233
628
        DCHECK_EQ(condition.op(), QL_OP_IN);
234
628
        DCHECK(condition.operands(1).value().has_list_value());
235
628
        const auto &options = condition.operands(1).value().list_value();
236
628
        int opt_size = options.elems_size();
237
628
        (*range_options_)[col_idx - num_hash_cols].reserve(opt_size);
238
239
        // IN arguments should have been de-duplicated and ordered ascendingly by the executor.
240
628
        bool is_reverse_order = is_forward_scan_ ^ (sortingType == SortingType::kAscending ||
241
628
            sortingType == SortingType::kAscendingNullsLast);
242
2.27k
        for (int i = 0; i < opt_size; i++) {
243
1.33k
          int elem_idx = is_reverse_order ? opt_size - i - 1 : i;
244
1.64k
          const auto &elem = options.elems(elem_idx);
245
1.64k
          auto pv = PrimitiveValue::FromQLValuePB(elem, sortingType);
246
1.64k
          (*range_options_)[col_idx - num_hash_cols].push_back(std::move(pv));
247
1.64k
        }
248
628
      }
249
250
635
      break;
251
635
    }
252
253
24
    default:
254
      // We don't support any other operators at this level.
255
24
      break;
256
1.27k
  }
257
1.27k
}
258
259
2.64M
// Builds the encoded lower (lower_bound == true) or upper (lower_bound == false) doc key bound
// from the hash/range components and the optional hash code limits.
KeyBytes DocPgsqlScanSpec::bound_key(const Schema& schema, const bool lower_bound) const {
  KeyBytes result;
  auto encoder = DocKeyEncoder(&result).Schema(schema);

  const bool has_hash_columns = schema_.num_hash_key_columns() > 0;

  if (!has_hash_columns) {
    // If no hash columns use default hash code (0).
    encoder.Hash(false, 0, *hashed_components_).Range(range_components(lower_bound));
    return result;
  }

  if (!hashed_components_->empty()) {
    // Hash values are known: bound the hash code by the requested limits (or the full domain).
    uint16_t hash = lower_bound
        ? hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::min())
        : max_hash_code_.get_value_or(std::numeric_limits<DocKeyHash>::max());
    encoder.HashAndRange(hash, *hashed_components_, range_components(lower_bound));
    return result;
  }

  // Hash columns exist but no hash values were given. Use the hash codes from the request, if
  // any (scans using token); otherwise leave the lower bound empty, and make the upper bound +inf.
  if (lower_bound) {
    if (hash_code_) {
      encoder.HashAndRange(*hash_code_,
                           {PrimitiveValue(ValueType::kLowest)},
                           {PrimitiveValue(ValueType::kLowest)});
    }
  } else if (max_hash_code_) {
    encoder.HashAndRange(*max_hash_code_,
                         {PrimitiveValue(ValueType::kHighest)},
                         {PrimitiveValue(ValueType::kHighest)});
  } else {
    result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest);
  }
  return result;
}
297
298
11.2M
std::vector<PrimitiveValue> DocPgsqlScanSpec::range_components(const bool lower_bound) const {
299
11.2M
  return GetRangeKeyScanSpec(schema_, range_components_, range_bounds_.get(), lower_bound);
300
11.2M
}
301
302
// Return inclusive lower/upper range doc key considering the start_doc_key.
303
8.65M
Result<KeyBytes> DocPgsqlScanSpec::Bound(const bool lower_bound) const {
304
8.65M
  if (start_doc_key_.empty()) {
305
4.30M
    return lower_bound ? lower_doc_key_ : upper_doc_key_;
306
8.60M
  }
307
308
  // When paging state is present, start_doc_key_ should have been provided, and the scan starting
309
  // point should be start_doc_key_ instead of the initial bounds.
310
51.0k
  if (start_doc_key_ < lower_doc_key_ || start_doc_key_ > upper_doc_key_) {
311
0
    return STATUS_FORMAT(Corruption, "Invalid start_doc_key: $0. Range: $1, $2",
312
0
                         start_doc_key_, lower_doc_key_, upper_doc_key_);
313
0
  }
314
315
  // Paging state + forward scan.
316
51.0k
  if (is_forward_scan_) {
317
23.6k
    return lower_bound ? start_doc_key_ : upper_doc_key_;
318
47.3k
  }
319
320
  // Paging state + reverse scan.
321
3.75k
  if (lower_bound) {
322
0
    return lower_doc_key_;
323
0
  }
324
325
  // If using start_doc_key_ as upper bound append +inf as extra component to ensure it includes
326
  // the target start_doc_key itself (dockey + suffix < dockey + kHighest).
327
  // For lower bound, this is true already, because dockey + suffix is > dockey.
328
3.75k
  KeyBytes result = start_doc_key_;
329
3.75k
  result.AppendValueTypeBeforeGroupEnd(ValueType::kHighest);
330
3.75k
  return result;
331
3.75k
}
332
333
4.32M
std::shared_ptr<rocksdb::ReadFileFilter> DocPgsqlScanSpec::CreateFileFilter() const {
334
4.32M
  auto lower_bound = range_components(true);
335
4.32M
  auto upper_bound = range_components(false);
336
4.32M
  if (lower_bound.empty() && upper_bound.empty()) {
337
0
    return std::shared_ptr<rocksdb::ReadFileFilter>();
338
4.32M
  } else {
339
4.32M
    return std::make_shared<PgsqlRangeBasedFileFilter>(std::move(lower_bound),
340
4.32M
                                                       std::move(upper_bound));
341
4.32M
  }
342
4.32M
}
343
344
4.33M
// Inclusive lower doc key bound of the scan (delegates to Bound()).
Result<KeyBytes> DocPgsqlScanSpec::LowerBound() const {
  constexpr bool kLowerBound = true;
  return Bound(kLowerBound);
}
347
348
4.32M
// Inclusive upper doc key bound of the scan (delegates to Bound() with lower_bound == false).
Result<KeyBytes> DocPgsqlScanSpec::UpperBound() const {
  constexpr bool kLowerBound = false;
  return Bound(kLowerBound);
}
351
352
2.68M
// Returns a reference to a single, process-wide default-constructed DocKey. It is created lazily
// on first call as a function-local static.
const DocKey& DocPgsqlScanSpec::DefaultStartDocKey() {
  static const DocKey kDefaultDocKey;
  return kDefaultDocKey;
}
356
357
}  // namespace docdb
358
}  // namespace yb