YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/document/document_db.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include "yb/rocksdb/cache.h"
24
#include "yb/rocksdb/comparator.h"
25
#include "yb/rocksdb/db.h"
26
#include "yb/rocksdb/table.h"
27
#include "yb/rocksdb/util/coding.h"
28
#include "yb/rocksdb/util/mutexlock.h"
29
#include "yb/rocksdb/utilities/document_db.h"
30
31
namespace rocksdb {
32
33
// IMPORTANT NOTE: Secondary index column families should be very small and
34
// generally fit in memory. Assume that accessing secondary index column
35
// families is much faster than accessing primary index (data heap) column
36
// family. Accessing a key (i.e. checking for existence) from a column family in
37
// RocksDB is not much faster than accessing both key and value since they are
38
// kept together and loaded from storage together.
39
40
namespace {
41
// < 0   <=>  lhs < rhs
42
// == 0  <=>  lhs == rhs
43
// > 0   <=>  lhs == rhs
44
// TODO(icanadi) move this to JSONDocument?
45
172
int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) {
46
172
  assert(lhs.IsObject() == false && rhs.IsObject() == false &&
47
172
         lhs.type() == rhs.type());
48
49
172
  switch (lhs.type()) {
50
0
    case JSONDocument::kNull:
51
0
      return 0;
52
0
    case JSONDocument::kBool:
53
0
      return static_cast<int>(lhs.GetBool()) - static_cast<int>(rhs.GetBool());
54
34
    case JSONDocument::kDouble: {
55
34
      double res = lhs.GetDouble() - rhs.GetDouble();
56
34
      return res == 0.0 ? 0 : (res < 0.0 ? -1 : 1);
57
0
    }
58
77
    case JSONDocument::kInt64: {
59
77
      int64_t res = lhs.GetInt64() - rhs.GetInt64();
60
59
      return res == 0 ? 0 : (res < 0 ? -1 : 1);
61
0
    }
62
61
    case JSONDocument::kString:
63
61
      return Slice(lhs.GetString()).compare(Slice(rhs.GetString()));
64
0
    default:
65
0
      assert(false);
66
172
  }
67
0
  return 0;
68
172
}
69
}  // namespace
70
71
class Filter {
72
 public:
73
  // returns nullptr on parse failure
74
  static Filter* ParseFilter(const JSONDocument& filter);
75
76
  struct Interval {
77
    JSONDocument upper_bound;
78
    JSONDocument lower_bound;
79
    bool upper_inclusive;
80
    bool lower_inclusive;
81
    Interval()
82
        : upper_bound(),
83
          lower_bound(),
84
          upper_inclusive(false),
85
17
          lower_inclusive(false) {}
86
    Interval(const JSONDocument& ub, const JSONDocument& lb, bool ui, bool li)
87
        : upper_bound(ub),
88
          lower_bound(lb),
89
          upper_inclusive(ui),
90
5
          lower_inclusive(li) {
91
5
    }
92
93
    void UpdateUpperBound(const JSONDocument& ub, bool inclusive);
94
    void UpdateLowerBound(const JSONDocument& lb, bool inclusive);
95
  };
96
97
  bool SatisfiesFilter(const JSONDocument& document) const;
98
  const Interval* GetInterval(const std::string& field) const;
99
100
 private:
101
19
  explicit Filter(const JSONDocument& filter) : filter_(filter.Copy()) {
102
19
    assert(filter_.IsOwner());
103
19
  }
104
105
  // copied from the parameter
106
  const JSONDocument filter_;
107
  // constant after construction
108
  std::unordered_map<std::string, Interval> intervals_;
109
};
110
111
void Filter::Interval::UpdateUpperBound(const JSONDocument& ub,
112
13
                                        bool inclusive) {
113
13
  bool update = upper_bound.IsNull();
114
13
  if (!update) {
115
1
    int cmp = DocumentCompare(upper_bound, ub);
116
1
    update = (cmp > 0) || (cmp == 0 && !inclusive);
117
1
  }
118
13
  if (update) {
119
13
    upper_bound = ub;
120
13
    upper_inclusive = inclusive;
121
13
  }
122
13
}
123
124
void Filter::Interval::UpdateLowerBound(const JSONDocument& lb,
125
14
                                        bool inclusive) {
126
14
  bool update = lower_bound.IsNull();
127
14
  if (!update) {
128
1
    int cmp = DocumentCompare(lower_bound, lb);
129
1
    update = (cmp < 0) || (cmp == 0 && !inclusive);
130
1
  }
131
14
  if (update) {
132
14
    lower_bound = lb;
133
14
    lower_inclusive = inclusive;
134
14
  }
135
14
}
136
137
19
// Parses a filter document into per-field intervals. Accepted shapes:
//   {field: value}                          equality, i.e. [value, value]
//   {field: {$gt|$gte|$lt|$lte: value, ...}}  comparison operators
//   {$command: ...}                         skipped here (commands)
//
// Returns nullptr (the partially built Filter is released by unique_ptr) when:
//  * `filter` is not an object;
//  * a field's condition object is empty;
//  * a comparison operand is an object or an array;
//  * an operator other than $gt/$gte/$lt/$lte is used;
//  * two operands bounding the same side of one field have different types
//    (e.g. {$gt: 1, $gte: "a"}), which DocumentCompare() cannot order.
Filter* Filter::ParseFilter(const JSONDocument& filter) {
  if (filter.IsObject() == false) {
    return nullptr;
  }

  std::unique_ptr<Filter> f(new Filter(filter));

  for (const auto& items : f->filter_.Items()) {
    if (items.first.size() && items.first[0] == '$') {
      // fields starting with '$' are commands
      continue;
    }
    assert(f->intervals_.find(items.first) == f->intervals_.end());
    if (items.second.IsObject()) {
      if (items.second.Count() == 0) {
        // An empty condition object constrains nothing; treat as malformed.
        return nullptr;
      }
      Interval interval;
      for (const auto& condition : items.second.Items()) {
        const std::string& op = condition.first;
        const JSONDocument& operand = condition.second;
        if (operand.IsObject() || operand.IsArray()) {
          // comparison operators not defined on objects. invalid array
          return nullptr;
        }
        // comparison operators:
        const bool is_lower = op == "$gt" || op == "$gte";
        const bool is_upper = op == "$lt" || op == "$lte";
        if (!is_lower && !is_upper) {
          // TODO(icanadi) more logical operators
          return nullptr;
        }
        // Update*Bound() calls DocumentCompare(), which requires both sides to
        // have the same type; reject mixed-type bounds instead of asserting.
        const JSONDocument& bound =
            is_lower ? interval.lower_bound : interval.upper_bound;
        if (!bound.IsNull() && bound.type() != operand.type()) {
          return nullptr;
        }
        if (is_lower) {
          interval.UpdateLowerBound(operand, op == "$gte");
        } else {
          interval.UpdateUpperBound(operand, op == "$lte");
        }
      }
      f->intervals_.insert({items.first, interval});
    } else {
      // equality
      f->intervals_.insert(
          {items.first, Interval(items.second, items.second, true, true)});
    }
  }

  return f.release();
}
186
187
101
// Returns the interval constraining `field`, or nullptr if the filter does not
// mention it. The pointer into intervals_ stays valid for the Filter's
// lifetime, since intervals_ is only populated by ParseFilter().
const Filter::Interval* Filter::GetInterval(const std::string& field) const {
  const auto found = intervals_.find(field);
  return found == intervals_.end() ? nullptr : &found->second;
}
195
196
94
bool Filter::SatisfiesFilter(const JSONDocument& document) const {
197
112
  for (const auto& interval : intervals_) {
198
112
    if (!document.Contains(interval.first)) {
199
      // doesn't have the value, doesn't satisfy the filter
200
      // (we don't support null queries yet)
201
0
      return false;
202
0
    }
203
112
    auto value = document[interval.first];
204
112
    if (!interval.second.upper_bound.IsNull()) {
205
84
      if (value.type() != interval.second.upper_bound.type()) {
206
        // no cross-type queries yet
207
        // TODO(icanadi) do this at least for numbers!
208
0
        return false;
209
0
      }
210
84
      int cmp = DocumentCompare(interval.second.upper_bound, value);
211
84
      if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) {
212
        // bigger (or equal) than upper bound
213
17
        return false;
214
17
      }
215
95
    }
216
95
    if (!interval.second.lower_bound.IsNull()) {
217
86
      if (value.type() != interval.second.lower_bound.type()) {
218
        // no cross-type queries yet
219
0
        return false;
220
0
      }
221
86
      int cmp = DocumentCompare(interval.second.lower_bound, value);
222
86
      if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) {
223
        // smaller (or equal) than the lower bound
224
27
        return false;
225
27
      }
226
86
    }
227
95
  }
228
50
  return true;
229
94
}
230
231
class Index {
232
 public:
233
5
  Index() = default;
234
5
  virtual ~Index() {}
235
236
  virtual const char* Name() const = 0;
237
238
  // Functions that are executed during write time
239
  // ---------------------------------------------
240
  // GetIndexKey() generates a key that will be used to index document and
241
  // returns the key though the second std::string* parameter
242
  virtual void GetIndexKey(const JSONDocument& document,
243
                           std::string* key) const = 0;
244
  // Keys generated with GetIndexKey() will be compared using this comparator.
245
  // It should be assumed that there will be a suffix added to the index key
246
  // according to IndexKey implementation
247
  virtual const Comparator* GetComparator() const = 0;
248
249
  // Functions that are executed during query time
250
  // ---------------------------------------------
251
  enum Direction {
252
    kForwards,
253
    kBackwards,
254
  };
255
  // Returns true if this index can provide some optimization for satisfying
256
  // filter. False otherwise
257
  virtual bool UsefulIndex(const Filter& filter) const = 0;
258
  // For every filter (assuming UsefulIndex()) there is a continuous interval of
259
  // keys in the index that satisfy the index conditions. That interval can be
260
  // three things:
261
  // * [A, B]
262
  // * [A, infinity>
263
  // * <-infinity, B]
264
  //
265
  // Query engine that uses this Index for optimization will access the interval
266
  // by first calling Position() and then iterating in the Direction (returned
267
  // by Position()) while ShouldContinueLooking() is true.
268
  // * For [A, B] interval Position() will Seek() to A and return kForwards.
269
  // ShouldContinueLooking() will be true until the iterator value gets beyond B
270
  // -- then it will return false
271
  // * For [A, infinity> Position() will Seek() to A and return kForwards.
272
  // ShouldContinueLooking() will always return true
273
  // * For <-infinity, B] Position() will Seek() to B and return kBackwards.
274
  // ShouldContinueLooking() will always return true (given that iterator is
275
  // advanced by calling Prev())
276
  virtual Direction Position(const Filter& filter,
277
                             Iterator* iterator) const = 0;
278
  virtual bool ShouldContinueLooking(const Filter& filter,
279
                                     const Slice& secondary_key,
280
                                     Direction direction) const = 0;
281
282
  // Static function that is executed when Index is created
283
  // ---------------------------------------------
284
  // Create Index from user-supplied description. Return nullptr on parse
285
  // failure.
286
  static Index* CreateIndexFromDescription(const JSONDocument& description,
287
                                           const std::string& name);
288
289
 private:
290
  // No copying allowed
291
  Index(const Index&);
292
  void operator=(const Index&);
293
};
294
295
// Encoding helper function
296
namespace {
297
5
// Column family name under which the secondary index `user_name` is stored.
std::string InternalSecondaryIndexName(const std::string& user_name) {
  std::string cf_name("index_");
  cf_name.append(user_name);
  return cf_name;
}
300
301
// One-byte type tags that prefix each encoded value (see EncodeJSONPrimitive).
// Don't change these: they are persisted in secondary indexes.
enum JSONPrimitivesEncoding : char {
  kNull = 1,
  kBool = 2,
  kDouble = 3,
  kInt64 = 4,
  kString = 5
};
309
310
// encodes simple JSON members (meaning string, integer, etc)
311
// the end result of this will be lexicographically compared to each other
312
182
bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) {
313
  // TODO(icanadi) revise this at some point, have a custom comparator
314
182
  switch (json.type()) {
315
0
    case JSONDocument::kNull:
316
0
      dst->push_back(kNull);
317
0
      break;
318
0
    case JSONDocument::kBool:
319
0
      dst->push_back(kBool);
320
0
      dst->push_back(static_cast<char>(json.GetBool()));
321
0
      break;
322
37
    case JSONDocument::kDouble:
323
37
      dst->push_back(kDouble);
324
37
      PutFixed64(dst, static_cast<uint64_t>(json.GetDouble()));
325
37
      break;
326
100
    case JSONDocument::kInt64:
327
100
      dst->push_back(kInt64);
328
100
      {
329
100
        auto val = json.GetInt64();
330
95
        dst->push_back((val < 0) ? '0' : '1');
331
100
        PutFixed64(dst, static_cast<uint64_t>(val));
332
100
      }
333
100
      break;
334
45
    case JSONDocument::kString:
335
45
      dst->push_back(kString);
336
45
      dst->append(json.GetString());
337
45
      break;
338
0
    default:
339
0
      return false;
340
182
  }
341
182
  return true;
342
182
}
343
344
}  // namespace
345
346
// format of the secondary key is:
347
// <secondary_key><primary_key><offset_of_primary_key uint32_t>
348
class IndexKey {
349
 public:
350
15
  IndexKey() : ok_(false) {}
351
71
  explicit IndexKey(const Slice& slice) {
352
71
    if (slice.size() < sizeof(uint32_t)) {
353
0
      ok_ = false;
354
0
      return;
355
0
    }
356
71
    uint32_t primary_key_offset =
357
71
        DecodeFixed32(slice.data() + slice.size() - sizeof(uint32_t));
358
71
    if (primary_key_offset >= slice.size() - sizeof(uint32_t)) {
359
0
      ok_ = false;
360
0
      return;
361
0
    }
362
71
    parts_[0] = Slice(slice.data(), primary_key_offset);
363
71
    parts_[1] = Slice(slice.data() + primary_key_offset,
364
71
                      slice.size() - primary_key_offset - sizeof(uint32_t));
365
71
    ok_ = true;
366
71
  }
367
70
  IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) {
368
70
    parts_[0] = secondary_key;
369
70
    parts_[1] = primary_key;
370
70
  }
371
372
70
  SliceParts GetSliceParts() {
373
70
    uint32_t primary_key_offset = static_cast<uint32_t>(parts_[0].size());
374
70
    EncodeFixed32(primary_key_offset_buf_, primary_key_offset);
375
70
    parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t));
376
70
    return SliceParts(parts_, 3);
377
70
  }
378
379
63
  const Slice& GetPrimaryKey() const { return parts_[1]; }
380
71
  const Slice& GetSecondaryKey() const { return parts_[0]; }
381
382
71
  bool ok() const { return ok_; }
383
384
 private:
385
  bool ok_;
386
  // 0 -- secondary key
387
  // 1 -- primary key
388
  // 2 -- primary key offset
389
  Slice parts_[3];
390
  char primary_key_offset_buf_[sizeof(uint32_t)];
391
};
392
393
class SimpleSortedIndex : public Index {
394
 public:
395
  SimpleSortedIndex(const std::string& field, const std::string& name)
396
5
      : field_(field), name_(name) {}
397
398
1
  const char* Name() const override { return name_.c_str(); }
399
400
  virtual void GetIndexKey(const JSONDocument& document, std::string* key) const
401
94
      override {
402
94
    if (!document.Contains(field_)) {
403
0
      if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) {
404
0
        assert(false);
405
0
      }
406
94
    } else {
407
94
      if (!EncodeJSONPrimitive(document[field_], key)) {
408
0
        assert(false);
409
0
      }
410
94
    }
411
94
  }
412
0
  const Comparator* GetComparator() const override {
413
0
    return BytewiseComparator();
414
0
  }
415
416
15
  bool UsefulIndex(const Filter& filter) const override {
417
15
    return filter.GetInterval(field_) != nullptr;
418
15
  }
419
  // REQUIRES: UsefulIndex(filter) == true
420
  virtual Direction Position(const Filter& filter,
421
15
                             Iterator* iterator) const override {
422
15
    auto interval = filter.GetInterval(field_);
423
15
    assert(interval != nullptr);  // because index is useful
424
15
    Direction direction;
425
426
15
    const JSONDocument* limit;
427
15
    if (!interval->lower_bound.IsNull()) {
428
12
      limit = &(interval->lower_bound);
429
12
      direction = kForwards;
430
3
    } else {
431
3
      limit = &(interval->upper_bound);
432
3
      direction = kBackwards;
433
3
    }
434
435
15
    std::string encoded_limit;
436
15
    if (!EncodeJSONPrimitive(*limit, &encoded_limit)) {
437
0
      assert(false);
438
0
    }
439
15
    iterator->Seek(Slice(encoded_limit));
440
441
15
    return direction;
442
15
  }
443
  // REQUIRES: UsefulIndex(filter) == true
444
  virtual bool ShouldContinueLooking(
445
      const Filter& filter, const Slice& secondary_key,
446
71
      Index::Direction direction) const override {
447
71
    auto interval = filter.GetInterval(field_);
448
71
    assert(interval != nullptr);  // because index is useful
449
71
    if (direction == kForwards) {
450
62
      if (interval->upper_bound.IsNull()) {
451
        // continue looking, no upper bound
452
20
        return true;
453
20
      }
454
42
      std::string encoded_upper_bound;
455
42
      if (!EncodeJSONPrimitive(interval->upper_bound, &encoded_upper_bound)) {
456
        // uhm...?
457
        // TODO(icanadi) store encoded upper and lower bounds in Filter*?
458
0
        assert(false);
459
0
      }
460
      // TODO(icanadi) we need to somehow decode this and use DocumentCompare()
461
42
      int compare = secondary_key.compare(Slice(encoded_upper_bound));
462
      // if (current key is bigger than upper bound) OR (current key is equal to
463
      // upper bound, but inclusive is false) THEN stop looking. otherwise,
464
      // continue
465
42
      return (compare > 0 ||
466
35
              (compare == 0 && interval->upper_inclusive == false))
467
8
                 ? false
468
34
                 : true;
469
9
    } else {
470
9
      assert(direction == kBackwards);
471
9
      if (interval->lower_bound.IsNull()) {
472
        // continue looking, no lower bound
473
9
        return true;
474
9
      }
475
0
      std::string encoded_lower_bound;
476
0
      if (!EncodeJSONPrimitive(interval->lower_bound, &encoded_lower_bound)) {
477
        // uhm...?
478
        // TODO(icanadi) store encoded upper and lower bounds in Filter*?
479
0
        assert(false);
480
0
      }
481
      // TODO(icanadi) we need to somehow decode this and use DocumentCompare()
482
0
      int compare = secondary_key.compare(Slice(encoded_lower_bound));
483
      // if (current key is smaller than lower bound) OR (current key is equal
484
      // to lower bound, but inclusive is false) THEN stop looking. otherwise,
485
      // continue
486
0
      return (compare < 0 ||
487
0
              (compare == 0 && interval->lower_inclusive == false))
488
0
                 ? false
489
0
                 : true;
490
0
    }
491
492
0
    assert(false);
493
    // this is here just so compiler doesn't complain
494
0
    return false;
495
0
  }
496
497
 private:
498
  std::string field_;
499
  std::string name_;
500
};
501
502
// Builds an Index from a description of the form {"<field>": 1}, which yields
// a SimpleSortedIndex on <field>. Any other shape is not supported yet and
// yields nullptr. The caller owns the returned Index.
Index* Index::CreateIndexFromDescription(const JSONDocument& description,
                                         const std::string& name) {
  const bool single_field =
      description.IsObject() && description.Count() == 1;
  if (!single_field) {
    return nullptr;  // not supported yet
  }
  const auto& field = *description.Items().begin();
  const bool supported_spec =
      field.second.IsInt64() && field.second.GetInt64() == 1;
  if (!supported_spec) {
    return nullptr;  // not supported yet
  }
  return new SimpleSortedIndex(field.first, name);
}
515
516
class CursorWithFilterIndexed : public Cursor {
517
 public:
518
  CursorWithFilterIndexed(Iterator* primary_index_iter,
519
                          Iterator* secondary_index_iter, const Index* index,
520
                          const Filter* filter)
521
      : primary_index_iter_(primary_index_iter),
522
        secondary_index_iter_(secondary_index_iter),
523
        index_(index),
524
        filter_(filter),
525
        valid_(true),
526
15
        current_json_document_(nullptr) {
527
15
    assert(filter_.get() != nullptr);
528
15
    direction_ = index->Position(*filter_.get(), secondary_index_iter_.get());
529
15
    UpdateIndexKey();
530
15
    AdvanceUntilSatisfies();
531
15
  }
532
533
211
  bool Valid() const override {
534
211
    return valid_ && secondary_index_iter_->Valid();
535
211
  }
536
42
  void Next() override {
537
42
    assert(Valid());
538
42
    Advance();
539
42
    AdvanceUntilSatisfies();
540
42
  }
541
  // temporary object. copy it if you want to use it
542
77
  const JSONDocument& document() const override {
543
77
    assert(Valid());
544
77
    return *current_json_document_;
545
77
  }
546
15
  Status status() const override {
547
15
    if (!status_.ok()) {
548
0
      return status_;
549
0
    }
550
15
    if (!primary_index_iter_->status().ok()) {
551
0
      return primary_index_iter_->status();
552
0
    }
553
15
    return secondary_index_iter_->status();
554
15
  }
555
556
 private:
557
63
  void Advance() {
558
63
    if (direction_ == Index::kForwards) {
559
54
      secondary_index_iter_->Next();
560
9
    } else {
561
9
      secondary_index_iter_->Prev();
562
9
    }
563
63
    UpdateIndexKey();
564
63
  }
565
57
  void AdvanceUntilSatisfies() {
566
57
    bool found = false;
567
78
    while (secondary_index_iter_->Valid() &&
568
71
           index_->ShouldContinueLooking(
569
63
               *filter_.get(), index_key_.GetSecondaryKey(), direction_)) {
570
63
      if (!UpdateJSONDocument()) {
571
        // corruption happened
572
0
        return;
573
0
      }
574
63
      if (filter_->SatisfiesFilter(*current_json_document_)) {
575
        // we found satisfied!
576
42
        found = true;
577
42
        break;
578
21
      } else {
579
        // doesn't satisfy :(
580
21
        Advance();
581
21
      }
582
63
    }
583
57
    if (!found) {
584
15
      valid_ = false;
585
15
    }
586
57
  }
587
588
63
  bool UpdateJSONDocument() {
589
63
    assert(secondary_index_iter_->Valid());
590
63
    primary_index_iter_->Seek(index_key_.GetPrimaryKey());
591
63
    if (!primary_index_iter_->Valid()) {
592
0
      status_ = STATUS(Corruption,
593
0
          "Inconsistency between primary and secondary index");
594
0
      valid_ = false;
595
0
      return false;
596
0
    }
597
63
    current_json_document_.reset(
598
63
        JSONDocument::Deserialize(primary_index_iter_->value()));
599
63
    assert(current_json_document_->IsOwner());
600
63
    if (current_json_document_.get() == nullptr) {
601
0
      status_ = STATUS(Corruption, "JSON deserialization failed");
602
0
      valid_ = false;
603
0
      return false;
604
0
    }
605
63
    return true;
606
63
  }
607
78
  void UpdateIndexKey() {
608
78
    if (secondary_index_iter_->Valid()) {
609
71
      index_key_ = IndexKey(secondary_index_iter_->key());
610
71
      if (!index_key_.ok()) {
611
0
        status_ = STATUS(Corruption, "Invalid index key");
612
0
        valid_ = false;
613
0
      }
614
71
    }
615
78
  }
616
  std::unique_ptr<Iterator> primary_index_iter_;
617
  std::unique_ptr<Iterator> secondary_index_iter_;
618
  // we don't own index_
619
  const Index* index_;
620
  Index::Direction direction_;
621
  std::unique_ptr<const Filter> filter_;
622
  bool valid_;
623
  IndexKey index_key_;
624
  std::unique_ptr<JSONDocument> current_json_document_;
625
  Status status_;
626
};
627
628
class CursorFromIterator : public Cursor {
629
 public:
630
  explicit CursorFromIterator(Iterator* iter)
631
9
      : iter_(iter), current_json_document_(nullptr) {
632
9
    iter_->SeekToFirst();
633
9
    UpdateCurrentJSON();
634
9
  }
635
636
158
  bool Valid() const override { return status_.ok() && iter_->Valid(); }
637
32
  void Next() override {
638
32
    iter_->Next();
639
32
    UpdateCurrentJSON();
640
32
  }
641
43
  const JSONDocument& document() const override {
642
43
    assert(Valid());
643
43
    return *current_json_document_;
644
43
  };
645
16
  Status status() const override {
646
16
    if (!status_.ok()) {
647
0
      return status_;
648
0
    }
649
16
    return iter_->status();
650
16
  }
651
652
  // not part of public Cursor interface
653
0
  Slice key() const { return iter_->key(); }
654
655
 private:
656
41
  void UpdateCurrentJSON() {
657
41
    if (Valid()) {
658
32
      current_json_document_.reset(JSONDocument::Deserialize(iter_->value()));
659
32
      if (current_json_document_.get() == nullptr) {
660
0
        status_ = STATUS(Corruption, "JSON deserialization failed");
661
0
      }
662
32
    }
663
41
  }
664
665
  Status status_;
666
  std::unique_ptr<Iterator> iter_;
667
  std::unique_ptr<JSONDocument> current_json_document_;
668
};
669
670
class CursorWithFilter : public Cursor {
671
 public:
672
  CursorWithFilter(Cursor* base_cursor, const Filter* filter)
673
4
      : base_cursor_(base_cursor), filter_(filter) {
674
4
    assert(filter_.get() != nullptr);
675
4
    SeekToNextSatisfies();
676
4
  }
677
32
  bool Valid() const override { return base_cursor_->Valid(); }
678
8
  void Next() override {
679
8
    assert(Valid());
680
8
    base_cursor_->Next();
681
8
    SeekToNextSatisfies();
682
8
  }
683
10
  const JSONDocument& document() const override {
684
10
    assert(Valid());
685
10
    return base_cursor_->document();
686
10
  }
687
12
  Status status() const override { return base_cursor_->status(); }
688
689
 private:
690
12
  void SeekToNextSatisfies() {
691
35
    for (; base_cursor_->Valid(); base_cursor_->Next()) {
692
31
      if (filter_->SatisfiesFilter(base_cursor_->document())) {
693
8
        break;
694
8
      }
695
31
    }
696
12
  }
697
  std::unique_ptr<Cursor> base_cursor_;
698
  std::unique_ptr<const Filter> filter_;
699
};
700
701
class CursorError : public Cursor {
702
 public:
703
0
  explicit CursorError(Status s) : s_(s) { assert(!s.ok()); }
704
0
  Status status() const override { return s_; }
705
0
  bool Valid() const override { return false; }
706
0
  void Next() override {}
707
0
  const JSONDocument& document() const override {
708
0
    assert(false);
709
    // compiler complains otherwise
710
0
    return trash_;
711
0
  }
712
713
 private:
714
  Status s_;
715
  JSONDocument trash_;
716
};
717
718
class DocumentDBImpl : public DocumentDB {
719
 public:
720
  DocumentDBImpl(
721
      DB* db, ColumnFamilyHandle* primary_key_column_family,
722
      const std::vector<std::pair<Index*, ColumnFamilyHandle*>>& indexes,
723
      const Options& rocksdb_options)
724
      : DocumentDB(db),
725
        primary_key_column_family_(primary_key_column_family),
726
3
        rocksdb_options_(rocksdb_options) {
727
1
    for (const auto& index : indexes) {
728
1
      name_to_index_.insert(
729
1
          {index.first->Name(), IndexColumnFamily(index.first, index.second)});
730
1
    }
731
3
  }
732
733
3
  ~DocumentDBImpl() {
734
4
    for (auto& iter : name_to_index_) {
735
4
      delete iter.second.index;
736
4
      delete iter.second.column_family;
737
4
    }
738
3
    delete primary_key_column_family_;
739
3
  }
740
741
  virtual Status CreateIndex(const WriteOptions& write_options,
742
4
                             const IndexDescriptor& index) override {
743
4
    auto index_obj =
744
4
        Index::CreateIndexFromDescription(*index.description, index.name);
745
4
    if (index_obj == nullptr) {
746
0
      return STATUS(InvalidArgument, "Failed parsing index description");
747
0
    }
748
749
4
    ColumnFamilyHandle* cf_handle;
750
4
    Status s =
751
4
        CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_),
752
4
                           InternalSecondaryIndexName(index.name), &cf_handle);
753
4
    if (!s.ok()) {
754
0
      delete index_obj;
755
0
      return s;
756
0
    }
757
758
4
    MutexLock l(&write_mutex_);
759
760
4
    std::unique_ptr<CursorFromIterator> cursor(new CursorFromIterator(
761
4
        DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_)));
762
763
4
    WriteBatch batch;
764
4
    for (; cursor->Valid(); cursor->Next()) {
765
0
      std::string secondary_index_key;
766
0
      index_obj->GetIndexKey(cursor->document(), &secondary_index_key);
767
0
      IndexKey index_key(Slice(secondary_index_key), cursor->key());
768
0
      batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts());
769
0
    }
770
771
4
    if (!cursor->status().ok()) {
772
0
      delete index_obj;
773
0
      return cursor->status();
774
0
    }
775
776
4
    {
777
4
      MutexLock l_nti(&name_to_index_mutex_);
778
4
      name_to_index_.insert(
779
4
          {index.name, IndexColumnFamily(index_obj, cf_handle)});
780
4
    }
781
782
4
    return DocumentDB::Write(write_options, &batch);
783
4
  }
784
785
2
  Status DropIndex(const std::string& name) override {
786
2
    MutexLock l(&write_mutex_);
787
788
2
    auto index_iter = name_to_index_.find(name);
789
2
    if (index_iter == name_to_index_.end()) {
790
1
      return STATUS(InvalidArgument, "No such index");
791
1
    }
792
793
1
    Status s = DropColumnFamily(index_iter->second.column_family);
794
1
    if (!s.ok()) {
795
0
      return s;
796
0
    }
797
798
1
    delete index_iter->second.index;
799
1
    delete index_iter->second.column_family;
800
801
    // remove from name_to_index_
802
1
    {
803
1
      MutexLock l_nti(&name_to_index_mutex_);
804
1
      name_to_index_.erase(index_iter);
805
1
    }
806
807
1
    return Status::OK();
808
1
  }
809
810
  virtual Status Insert(const WriteOptions& options,
811
18
                        const JSONDocument& document) override {
812
18
    WriteBatch batch;
813
814
18
    if (!document.IsObject()) {
815
0
      return STATUS(InvalidArgument, "Document not an object");
816
0
    }
817
18
    if (!document.Contains(kPrimaryKey)) {
818
0
      return STATUS(InvalidArgument, "No primary key");
819
0
    }
820
18
    auto primary_key = document[kPrimaryKey];
821
18
    if (primary_key.IsNull() ||
822
18
        (!primary_key.IsString() && !primary_key.IsInt64())) {
823
0
      return STATUS(InvalidArgument,
824
0
          "Primary key format error");
825
0
    }
826
18
    std::string encoded_document;
827
18
    document.Serialize(&encoded_document);
828
18
    std::string primary_key_encoded;
829
18
    if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
830
      // previous call should be guaranteed to pass because of all primary_key
831
      // conditions checked before
832
0
      assert(false);
833
0
    }
834
18
    Slice primary_key_slice(primary_key_encoded);
835
836
    // Lock now, since we're starting DB operations
837
18
    MutexLock l(&write_mutex_);
838
    // check if there is already a document with the same primary key
839
18
    std::string value;
840
18
    Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_,
841
18
                               primary_key_slice, &value);
842
18
    if (!s.IsNotFound()) {
843
1
      return s.ok() ? STATUS(InvalidArgument, "Duplicate primary key!") : s;
844
1
    }
845
846
17
    batch.Put(primary_key_column_family_, primary_key_slice, encoded_document);
847
848
43
    for (const auto& iter : name_to_index_) {
849
43
      std::string secondary_index_key;
850
43
      iter.second.index->GetIndexKey(document, &secondary_index_key);
851
43
      IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
852
43
      batch.Put(iter.second.column_family, index_key.GetSliceParts(),
853
43
                SliceParts());
854
43
    }
855
856
17
    return DocumentDB::Write(options, &batch);
857
17
  }
858
859
  virtual Status Remove(const ReadOptions& read_options,
860
                        const WriteOptions& write_options,
861
2
                        const JSONDocument& query) override {
862
2
    MutexLock l(&write_mutex_);
863
2
    std::unique_ptr<Cursor> cursor(
864
2
        ConstructFilterCursor(read_options, nullptr, query));
865
866
2
    WriteBatch batch;
867
9
    for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) {
868
7
      const auto& document = cursor->document();
869
7
      if (!document.IsObject()) {
870
0
        return STATUS(Corruption, "Document corruption");
871
0
      }
872
7
      if (!document.Contains(kPrimaryKey)) {
873
0
        return STATUS(Corruption, "Document corruption");
874
0
      }
875
7
      auto primary_key = document[kPrimaryKey];
876
7
      if (primary_key.IsNull() ||
877
7
          (!primary_key.IsString() && !primary_key.IsInt64())) {
878
0
        return STATUS(Corruption, "Document corruption");
879
0
      }
880
881
      // TODO(icanadi) Instead of doing this, just get primary key encoding from
882
      // cursor, as it already has this information
883
7
      std::string primary_key_encoded;
884
7
      if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
885
        // previous call should be guaranteed to pass because of all primary_key
886
        // conditions checked before
887
0
        assert(false);
888
0
      }
889
7
      Slice primary_key_slice(primary_key_encoded);
890
7
      batch.Delete(primary_key_column_family_, primary_key_slice);
891
892
15
      for (const auto& iter : name_to_index_) {
893
15
        std::string secondary_index_key;
894
15
        iter.second.index->GetIndexKey(document, &secondary_index_key);
895
15
        IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
896
15
        batch.Delete(iter.second.column_family, index_key.GetSliceParts());
897
15
      }
898
7
    }
899
900
2
    if (!cursor->status().ok()) {
901
0
      return cursor->status();
902
0
    }
903
904
2
    return DocumentDB::Write(write_options, &batch);
905
2
  }
906
907
  virtual Status Update(const ReadOptions& read_options,
908
                        const WriteOptions& write_options,
909
                        const JSONDocument& filter,
910
3
                        const JSONDocument& updates) override {
911
3
    MutexLock l(&write_mutex_);
912
3
    std::unique_ptr<Cursor> cursor(
913
3
        ConstructFilterCursor(read_options, nullptr, filter));
914
915
3
    if (!updates.IsObject()) {
916
0
        return STATUS(Corruption, "Bad update document format");
917
0
    }
918
3
    WriteBatch batch;
919
9
    for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) {
920
6
      const auto& old_document = cursor->document();
921
6
      JSONDocument new_document(old_document);
922
6
      if (!new_document.IsObject()) {
923
0
        return STATUS(Corruption, "Document corruption");
924
0
      }
925
      // TODO(icanadi) Make this nicer, something like class Filter
926
10
      for (const auto& update : updates.Items()) {
927
10
        if (update.first == "$set") {
928
10
          JSONDocumentBuilder builder;
929
10
          bool res __attribute__((unused)) = builder.WriteStartObject();
930
10
          assert(res);
931
14
          for (const auto& itr : update.second.Items()) {
932
14
            if (itr.first == kPrimaryKey) {
933
0
              return STATUS(NotSupported, "Please don't change primary key");
934
0
            }
935
14
            res = builder.WriteKeyValue(itr.first, itr.second);
936
14
            assert(res);
937
14
          }
938
10
          res = builder.WriteEndObject();
939
10
          assert(res);
940
10
          JSONDocument update_document = builder.GetJSONDocument();
941
10
          builder.Reset();
942
10
          res = builder.WriteStartObject();
943
10
          assert(res);
944
40
          for (const auto& itr : new_document.Items()) {
945
40
            if (update_document.Contains(itr.first)) {
946
14
              res = builder.WriteKeyValue(itr.first,
947
14
                                          update_document[itr.first]);
948
26
            } else {
949
26
              res = builder.WriteKeyValue(itr.first, new_document[itr.first]);
950
26
            }
951
40
            assert(res);
952
40
          }
953
10
          res = builder.WriteEndObject();
954
10
          assert(res);
955
10
          new_document = builder.GetJSONDocument();
956
10
          assert(new_document.IsOwner());
957
0
        } else {
958
          // TODO(icanadi) more commands
959
0
          return STATUS(InvalidArgument, "Can't understand update command");
960
0
        }
961
10
      }
962
963
      // TODO(icanadi) reuse some of this code
964
6
      if (!new_document.Contains(kPrimaryKey)) {
965
0
        return STATUS(Corruption, "Corrupted document -- primary key missing");
966
0
      }
967
6
      auto primary_key = new_document[kPrimaryKey];
968
6
      if (primary_key.IsNull() ||
969
6
          (!primary_key.IsString() && !primary_key.IsInt64())) {
970
        // This will happen when document on storage doesn't have primary key,
971
        // since we don't support any update operations on primary key. That's
972
        // why this is corruption error
973
0
        return STATUS(Corruption, "Corrupted document -- primary key missing");
974
0
      }
975
6
      std::string encoded_document;
976
6
      new_document.Serialize(&encoded_document);
977
6
      std::string primary_key_encoded;
978
6
      if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
979
        // previous call should be guaranteed to pass because of all primary_key
980
        // conditions checked before
981
0
        assert(false);
982
0
      }
983
6
      Slice primary_key_slice(primary_key_encoded);
984
6
      batch.Put(primary_key_column_family_, primary_key_slice,
985
6
                encoded_document);
986
987
18
      for (const auto& iter : name_to_index_) {
988
18
        std::string old_key, new_key;
989
18
        iter.second.index->GetIndexKey(old_document, &old_key);
990
18
        iter.second.index->GetIndexKey(new_document, &new_key);
991
18
        if (old_key == new_key) {
992
          // don't need to update this secondary index
993
12
          continue;
994
12
        }
995
996
6
        IndexKey old_index_key(Slice(old_key), primary_key_slice);
997
6
        IndexKey new_index_key(Slice(new_key), primary_key_slice);
998
999
6
        batch.Delete(iter.second.column_family, old_index_key.GetSliceParts());
1000
6
        batch.Put(iter.second.column_family, new_index_key.GetSliceParts(),
1001
6
                  SliceParts());
1002
6
      }
1003
6
    }
1004
1005
3
    if (!cursor->status().ok()) {
1006
0
      return cursor->status();
1007
0
    }
1008
1009
3
    return DocumentDB::Write(write_options, &batch);
1010
3
  }
1011
1012
  virtual Cursor* Query(const ReadOptions& read_options,
1013
15
                        const JSONDocument& query) override {
1014
15
    Cursor* cursor = nullptr;
1015
1016
15
    if (!query.IsArray()) {
1017
0
      return new CursorError(
1018
0
          STATUS(InvalidArgument, "Query has to be an array"));
1019
0
    }
1020
1021
    // TODO(icanadi) support index "_id"
1022
29
    for (size_t i = 0; i < query.Count(); ++i) {
1023
14
      const auto& command_doc = query[i];
1024
14
      if (command_doc.Count() != 1) {
1025
        // there can be only one key-value pair in each of array elements.
1026
        // key is the command and value are the params
1027
0
        delete cursor;
1028
0
        return new CursorError(STATUS(InvalidArgument, "Invalid query"));
1029
0
      }
1030
14
      const auto& command = *command_doc.Items().begin();
1031
1032
14
      if (command.first == "$filter") {
1033
14
        cursor = ConstructFilterCursor(read_options, cursor, command.second);
1034
0
      } else {
1035
        // only filter is supported for now
1036
0
        delete cursor;
1037
0
        return new CursorError(STATUS(InvalidArgument, "Invalid query"));
1038
0
      }
1039
14
    }
1040
1041
15
    if (cursor == nullptr) {
1042
1
      cursor = new CursorFromIterator(
1043
1
          DocumentDB::NewIterator(read_options, primary_key_column_family_));
1044
1
    }
1045
1046
15
    return cursor;
1047
15
  }
1048
1049
  // RocksDB functions
1050
  virtual Status Get(const ReadOptions& options,
1051
                     ColumnFamilyHandle* column_family, const Slice& key,
1052
0
                     std::string* value) override {
1053
0
    return STATUS(NotSupported, "");
1054
0
  }
1055
  virtual Status Get(const ReadOptions& options, const Slice& key,
1056
0
                     std::string* value) override {
1057
0
    return STATUS(NotSupported, "");
1058
0
  }
1059
  virtual Status Write(const WriteOptions& options,
1060
0
                       WriteBatch* updates) override {
1061
0
    return STATUS(NotSupported, "");
1062
0
  }
1063
  virtual Iterator* NewIterator(const ReadOptions& options,
1064
0
                                ColumnFamilyHandle* column_family) override {
1065
0
    return nullptr;
1066
0
  }
1067
0
  Iterator* NewIterator(const ReadOptions& options) override {
1068
0
    return nullptr;
1069
0
  }
1070
1071
 private:
1072
  Cursor* ConstructFilterCursor(ReadOptions read_options, Cursor* cursor,
1073
19
                                const JSONDocument& query) {
1074
19
    std::unique_ptr<const Filter> filter(Filter::ParseFilter(query));
1075
19
    if (filter.get() == nullptr) {
1076
0
      return new CursorError(STATUS(InvalidArgument, "Invalid query"));
1077
0
    }
1078
1079
19
    IndexColumnFamily tmp_storage(nullptr, nullptr);
1080
1081
19
    if (cursor == nullptr) {
1082
19
      IndexColumnFamily* index_column_family = nullptr;
1083
19
      if (query.Contains("$index") && query["$index"].IsString()) {
1084
15
        {
1085
15
          auto index_name = query["$index"];
1086
15
          MutexLock l(&name_to_index_mutex_);
1087
15
          auto index_iter = name_to_index_.find(index_name.GetString());
1088
15
          if (index_iter != name_to_index_.end()) {
1089
15
            tmp_storage = index_iter->second;
1090
15
            index_column_family = &tmp_storage;
1091
0
          } else {
1092
0
            return new CursorError(
1093
0
                STATUS(InvalidArgument, "Index does not exist"));
1094
0
          }
1095
19
        }
1096
19
      }
1097
1098
19
      if (index_column_family != nullptr &&
1099
15
          index_column_family->index->UsefulIndex(*filter.get())) {
1100
15
        std::vector<Iterator*> iterators;
1101
15
        Status s = DocumentDB::NewIterators(
1102
15
            read_options,
1103
15
            {primary_key_column_family_, index_column_family->column_family},
1104
15
            &iterators);
1105
15
        if (!s.ok()) {
1106
0
          delete cursor;
1107
0
          return new CursorError(s);
1108
0
        }
1109
15
        assert(iterators.size() == 2);
1110
15
        return new CursorWithFilterIndexed(iterators[0], iterators[1],
1111
15
                                           index_column_family->index,
1112
15
                                           filter.release());
1113
4
      } else {
1114
4
        return new CursorWithFilter(
1115
4
            new CursorFromIterator(DocumentDB::NewIterator(
1116
4
                read_options, primary_key_column_family_)),
1117
4
            filter.release());
1118
4
      }
1119
0
    } else {
1120
0
      return new CursorWithFilter(cursor, filter.release());
1121
0
    }
1122
0
    assert(false);
1123
0
    return nullptr;
1124
0
  }
1125
1126
  // currently, we lock and serialize all writes to rocksdb. reads are not
1127
  // locked and always get consistent view of the database. we should optimize
1128
  // locking in the future
1129
  port::Mutex write_mutex_;
1130
  port::Mutex name_to_index_mutex_;
1131
  const char* kPrimaryKey = "_id";
1132
  struct IndexColumnFamily {
1133
    IndexColumnFamily(Index* _index, ColumnFamilyHandle* _column_family)
1134
24
        : index(_index), column_family(_column_family) {}
1135
    Index* index;
1136
    ColumnFamilyHandle* column_family;
1137
  };
1138
1139
1140
  // name_to_index_ protected:
1141
  // 1) when writing -- 1. lock write_mutex_, 2. lock name_to_index_mutex_
1142
  // 2) when reading -- lock name_to_index_mutex_ OR write_mutex_
1143
  std::unordered_map<std::string, IndexColumnFamily> name_to_index_;
1144
  ColumnFamilyHandle* primary_key_column_family_;
1145
  Options rocksdb_options_;
1146
};
1147
1148
namespace {
1149
3
Options GetRocksDBOptionsFromOptions(const DocumentDBOptions& options) {
1150
3
  Options rocksdb_options;
1151
3
  rocksdb_options.max_background_compactions = options.background_threads - 1;
1152
3
  rocksdb_options.max_background_flushes = 1;
1153
3
  rocksdb_options.write_buffer_size = options.memtable_size;
1154
3
  rocksdb_options.max_write_buffer_number = 6;
1155
3
  BlockBasedTableOptions table_options;
1156
3
  table_options.block_cache = NewLRUCache(options.cache_size);
1157
3
  rocksdb_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1158
3
  return rocksdb_options;
1159
3
}
1160
}  // namespace
1161
1162
// Opens the RocksDB database at |name| and returns a DocumentDBImpl in |*db|.
// The database is created if missing. It uses the default column family for
// documents plus one column family per entry of |indexes|. With |read_only|
// the database is opened via DB::OpenForReadOnly.
// Every index description is parsed before anything is opened. An invalid
// description therefore yields InvalidArgument and leaves no resources
// behind, instead of storing a null Index that every write would dereference.
Status DocumentDB::Open(const DocumentDBOptions& options,
                        const std::string& name,
                        const std::vector<DocumentDB::IndexDescriptor>& indexes,
                        DocumentDB** db, bool read_only) {
  std::vector<std::unique_ptr<Index>> parsed_indexes;
  parsed_indexes.reserve(indexes.size());
  for (const auto& index : indexes) {
    parsed_indexes.emplace_back(
        Index::CreateIndexFromDescription(*index.description, index.name));
    if (parsed_indexes.back() == nullptr) {
      return STATUS(InvalidArgument, "Failed parsing index description");
    }
  }

  Options rocksdb_options = GetRocksDBOptionsFromOptions(options);
  rocksdb_options.create_if_missing = true;

  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(ColumnFamilyDescriptor(
      kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
  for (const auto& index : indexes) {
    column_families.emplace_back(InternalSecondaryIndexName(index.name),
                                 ColumnFamilyOptions(rocksdb_options));
  }
  std::vector<ColumnFamilyHandle*> handles;
  DB* base_db;
  Status s;
  if (read_only) {
    s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families,
                            &handles, &base_db);
  } else {
    s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles,
                 &base_db);
  }
  if (!s.ok()) {
    // parsed_indexes releases the already-built Index objects.
    return s;
  }

  std::vector<std::pair<Index*, ColumnFamilyHandle*>> index_cf(indexes.size());
  assert(handles.size() == indexes.size() + 1);
  for (size_t i = 0; i < indexes.size(); ++i) {
    // handles[0] is the default (document) column family; index i uses i + 1.
    // Ownership of both pointers moves to DocumentDBImpl.
    index_cf[i] = {parsed_indexes[i].release(), handles[i + 1]};
  }
  *db = new DocumentDBImpl(base_db, handles[0], index_cf, rocksdb_options);
  return Status::OK();
}
1200
1201
}  // namespace rocksdb
1202
#endif  // ROCKSDB_LITE