/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/document/document_db.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include "yb/rocksdb/cache.h" |
24 | | #include "yb/rocksdb/comparator.h" |
25 | | #include "yb/rocksdb/db.h" |
26 | | #include "yb/rocksdb/table.h" |
27 | | #include "yb/rocksdb/util/coding.h" |
28 | | #include "yb/rocksdb/util/mutexlock.h" |
29 | | #include "yb/rocksdb/utilities/document_db.h" |
30 | | |
31 | | namespace rocksdb { |
32 | | |
33 | | // IMPORTANT NOTE: Secondary index column families should be very small and |
34 | | // generally fit in memory. Assume that accessing secondary index column |
35 | | // families is much faster than accessing primary index (data heap) column |
36 | | // family. Accessing a key (i.e. checking for existance) from a column family in |
37 | | // RocksDB is not much faster than accessing both key and value since they are |
38 | | // kept together and loaded from storage together. |
39 | | |
40 | | namespace { |
41 | | // < 0 <=> lhs < rhs |
42 | | // == 0 <=> lhs == rhs |
// > 0 <=> lhs > rhs
44 | | // TODO(icanadi) move this to JSONDocument? |
45 | 172 | int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) { |
46 | 172 | assert(lhs.IsObject() == false && rhs.IsObject() == false && |
47 | 172 | lhs.type() == rhs.type()); |
48 | | |
49 | 172 | switch (lhs.type()) { |
50 | 0 | case JSONDocument::kNull: |
51 | 0 | return 0; |
52 | 0 | case JSONDocument::kBool: |
53 | 0 | return static_cast<int>(lhs.GetBool()) - static_cast<int>(rhs.GetBool()); |
54 | 34 | case JSONDocument::kDouble: { |
55 | 34 | double res = lhs.GetDouble() - rhs.GetDouble(); |
56 | 34 | return res == 0.0 ? 0 : (res < 0.0 ? -1 : 1); |
57 | 0 | } |
58 | 77 | case JSONDocument::kInt64: { |
59 | 77 | int64_t res = lhs.GetInt64() - rhs.GetInt64(); |
60 | 59 | return res == 0 ? 0 : (res < 0 ? -1 : 1); |
61 | 0 | } |
62 | 61 | case JSONDocument::kString: |
63 | 61 | return Slice(lhs.GetString()).compare(Slice(rhs.GetString())); |
64 | 0 | default: |
65 | 0 | assert(false); |
66 | 172 | } |
67 | 0 | return 0; |
68 | 172 | } |
69 | | } // namespace |
70 | | |
// In-memory representation of a parsed query filter: for each constrained
// field, the (possibly half-open) interval of values that match. Built only
// via ParseFilter(); immutable afterwards.
class Filter {
 public:
  // returns nullptr on parse failure
  static Filter* ParseFilter(const JSONDocument& filter);

  // Interval of accepted values for a single field. A default-constructed
  // (null) bound means "unbounded" on that side.
  struct Interval {
    JSONDocument upper_bound;
    JSONDocument lower_bound;
    bool upper_inclusive;
    bool lower_inclusive;
    // Starts fully unbounded; tightened via UpdateUpperBound()/
    // UpdateLowerBound() while parsing $gt/$gte/$lt/$lte conditions.
    Interval()
        : upper_bound(),
          lower_bound(),
          upper_inclusive(false),
          lower_inclusive(false) {}
    // Fully-specified interval; used for equality conditions where both
    // bounds are the same value and inclusive.
    Interval(const JSONDocument& ub, const JSONDocument& lb, bool ui, bool li)
        : upper_bound(ub),
          lower_bound(lb),
          upper_inclusive(ui),
          lower_inclusive(li) {
    }

    // Replace the bound only if the new one is at least as restrictive.
    void UpdateUpperBound(const JSONDocument& ub, bool inclusive);
    void UpdateLowerBound(const JSONDocument& lb, bool inclusive);
  };

  // True iff document satisfies every field interval in this filter.
  bool SatisfiesFilter(const JSONDocument& document) const;
  // Returns the interval constraining |field|, or nullptr if the filter does
  // not constrain that field. The pointer stays valid for the Filter's
  // lifetime (intervals_ is constant after construction).
  const Interval* GetInterval(const std::string& field) const;

 private:
  explicit Filter(const JSONDocument& filter) : filter_(filter.Copy()) {
    assert(filter_.IsOwner());
  }

  // copied from the parameter
  const JSONDocument filter_;
  // constant after construction
  std::unordered_map<std::string, Interval> intervals_;
};
110 | | |
111 | | void Filter::Interval::UpdateUpperBound(const JSONDocument& ub, |
112 | 13 | bool inclusive) { |
113 | 13 | bool update = upper_bound.IsNull(); |
114 | 13 | if (!update) { |
115 | 1 | int cmp = DocumentCompare(upper_bound, ub); |
116 | 1 | update = (cmp > 0) || (cmp == 0 && !inclusive); |
117 | 1 | } |
118 | 13 | if (update) { |
119 | 13 | upper_bound = ub; |
120 | 13 | upper_inclusive = inclusive; |
121 | 13 | } |
122 | 13 | } |
123 | | |
124 | | void Filter::Interval::UpdateLowerBound(const JSONDocument& lb, |
125 | 14 | bool inclusive) { |
126 | 14 | bool update = lower_bound.IsNull(); |
127 | 14 | if (!update) { |
128 | 1 | int cmp = DocumentCompare(lower_bound, lb); |
129 | 1 | update = (cmp < 0) || (cmp == 0 && !inclusive); |
130 | 1 | } |
131 | 14 | if (update) { |
132 | 14 | lower_bound = lb; |
133 | 14 | lower_inclusive = inclusive; |
134 | 14 | } |
135 | 14 | } |
136 | | |
// Parses a query filter object into a Filter. Supported per-field conditions
// are either a bare primitive value (equality, i.e. [v, v] inclusive) or an
// object of $gt/$gte/$lt/$lte comparisons. Top-level fields starting with '$'
// are commands and are skipped here. Returns nullptr on any unsupported or
// malformed construct; otherwise the caller owns the returned Filter.
Filter* Filter::ParseFilter(const JSONDocument& filter) {
  if (filter.IsObject() == false) {
    return nullptr;
  }

  std::unique_ptr<Filter> f(new Filter(filter));

  for (const auto& items : f->filter_.Items()) {
    if (items.first.size() && items.first[0] == '$') {
      // fields starting with '$' are commands
      continue;
    }
    // A JSON object cannot repeat a field, so each field is seen once.
    assert(f->intervals_.find(items.first) == f->intervals_.end());
    if (items.second.IsObject()) {
      if (items.second.Count() == 0) {
        // empty condition object -- malformed filter
        return nullptr;
      }
      Interval interval;
      for (const auto& condition : items.second.Items()) {
        if (condition.second.IsObject() || condition.second.IsArray()) {
          // comparison operators not defined on objects. invalid array
          return nullptr;
        }
        // comparison operators:
        if (condition.first == "$gt") {
          interval.UpdateLowerBound(condition.second, false);
        } else if (condition.first == "$gte") {
          interval.UpdateLowerBound(condition.second, true);
        } else if (condition.first == "$lt") {
          interval.UpdateUpperBound(condition.second, false);
        } else if (condition.first == "$lte") {
          interval.UpdateUpperBound(condition.second, true);
        } else {
          // TODO(icanadi) more logical operators
          return nullptr;
        }
      }
      f->intervals_.insert({items.first, interval});
    } else {
      // equality
      f->intervals_.insert(
          {items.first, Interval(items.second,
                                 items.second, true, true)});
    }
  }

  return f.release();
}
186 | | |
187 | 101 | const Filter::Interval* Filter::GetInterval(const std::string& field) const { |
188 | 101 | auto itr = intervals_.find(field); |
189 | 101 | if (itr == intervals_.end()) { |
190 | 0 | return nullptr; |
191 | 0 | } |
192 | | // we can do that since intervals_ is constant after construction |
193 | 101 | return &itr->second; |
194 | 101 | } |
195 | | |
196 | 94 | bool Filter::SatisfiesFilter(const JSONDocument& document) const { |
197 | 112 | for (const auto& interval : intervals_) { |
198 | 112 | if (!document.Contains(interval.first)) { |
199 | | // doesn't have the value, doesn't satisfy the filter |
200 | | // (we don't support null queries yet) |
201 | 0 | return false; |
202 | 0 | } |
203 | 112 | auto value = document[interval.first]; |
204 | 112 | if (!interval.second.upper_bound.IsNull()) { |
205 | 84 | if (value.type() != interval.second.upper_bound.type()) { |
206 | | // no cross-type queries yet |
207 | | // TODO(icanadi) do this at least for numbers! |
208 | 0 | return false; |
209 | 0 | } |
210 | 84 | int cmp = DocumentCompare(interval.second.upper_bound, value); |
211 | 84 | if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) { |
212 | | // bigger (or equal) than upper bound |
213 | 17 | return false; |
214 | 17 | } |
215 | 95 | } |
216 | 95 | if (!interval.second.lower_bound.IsNull()) { |
217 | 86 | if (value.type() != interval.second.lower_bound.type()) { |
218 | | // no cross-type queries yet |
219 | 0 | return false; |
220 | 0 | } |
221 | 86 | int cmp = DocumentCompare(interval.second.lower_bound, value); |
222 | 86 | if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) { |
223 | | // smaller (or equal) than the lower bound |
224 | 27 | return false; |
225 | 27 | } |
226 | 86 | } |
227 | 95 | } |
228 | 50 | return true; |
229 | 94 | } |
230 | | |
// Abstract interface of a secondary index. At write time an implementation
// maps a document to an index key; at query time it positions an iterator at
// the start of the interval of index keys that can satisfy a Filter and
// tells the query engine when to stop scanning.
class Index {
 public:
  Index() = default;
  virtual ~Index() {}

  // User-visible name of the index (used as the lookup key in DocumentDB).
  virtual const char* Name() const = 0;

  // Functions that are executed during write time
  // ---------------------------------------------
  // GetIndexKey() generates a key that will be used to index document and
  // returns the key though the second std::string* parameter
  virtual void GetIndexKey(const JSONDocument& document,
                           std::string* key) const = 0;
  // Keys generated with GetIndexKey() will be compared using this comparator.
  // It should be assumed that there will be a suffix added to the index key
  // according to IndexKey implementation
  virtual const Comparator* GetComparator() const = 0;

  // Functions that are executed during query time
  // ---------------------------------------------
  enum Direction {
    kForwards,
    kBackwards,
  };
  // Returns true if this index can provide some optimization for satisfying
  // filter. False otherwise
  virtual bool UsefulIndex(const Filter& filter) const = 0;
  // For every filter (assuming UsefulIndex()) there is a continuous interval of
  // keys in the index that satisfy the index conditions. That interval can be
  // three things:
  // * [A, B]
  // * [A, infinity>
  // * <-infinity, B]
  //
  // Query engine that uses this Index for optimization will access the interval
  // by first calling Position() and then iterating in the Direction (returned
  // by Position()) while ShouldContinueLooking() is true.
  // * For [A, B] interval Position() will Seek() to A and return kForwards.
  // ShouldContinueLooking() will be true until the iterator value gets beyond B
  // -- then it will return false
  // * For [A, infinity> Position() will Seek() to A and return kForwards.
  // ShouldContinueLooking() will always return true
  // * For <-infinity, B] Position() will Seek() to B and return kBackwards.
  // ShouldContinueLooking() will always return true (given that iterator is
  // advanced by calling Prev())
  virtual Direction Position(const Filter& filter,
                             Iterator* iterator) const = 0;
  virtual bool ShouldContinueLooking(const Filter& filter,
                                     const Slice& secondary_key,
                                     Direction direction) const = 0;

  // Static function that is executed when Index is created
  // ---------------------------------------------
  // Create Index from user-supplied description. Return nullptr on parse
  // failure.
  static Index* CreateIndexFromDescription(const JSONDocument& description,
                                           const std::string& name);

 private:
  // No copying allowed (declared but intentionally left undefined).
  Index(const Index&);
  void operator=(const Index&);
};
294 | | |
295 | | // Encoding helper function |
296 | | namespace { |
// Name of the RocksDB column family backing the secondary index |user_name|.
// The fixed "index_" prefix keeps index column families in their own
// namespace, apart from other column families.
std::string InternalSecondaryIndexName(const std::string& user_name) {
  std::string internal_name("index_");
  internal_name += user_name;
  return internal_name;
}
300 | | |
// One-byte type tag prepended to every value encoded by EncodeJSONPrimitive.
// Don't change these, they are persisted in secondary indexes
enum JSONPrimitivesEncoding : char {
  kNull = 0x1,
  kBool = 0x2,
  kDouble = 0x3,
  kInt64 = 0x4,
  kString = 0x5,
};
309 | | |
// encodes simple JSON members (meaning string, integer, etc)
// the end result of this will be lexicographically compared to each other
// Returns false for non-primitive (object/array) inputs. NOTE(review): some
// of these encodings do not appear to be order-preserving under bytewise
// comparison (doubles are truncated via static_cast, and PutFixed64's byte
// order is presumably little-endian -- confirm) -- this is what the TODO
// below refers to. The layout is persisted, so it must not be changed.
bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) {
  // TODO(icanadi) revise this at some point, have a custom comparator
  switch (json.type()) {
    case JSONDocument::kNull:
      dst->push_back(kNull);
      break;
    case JSONDocument::kBool:
      dst->push_back(kBool);
      dst->push_back(static_cast<char>(json.GetBool()));
      break;
    case JSONDocument::kDouble:
      dst->push_back(kDouble);
      // Truncates the fractional part (and negatives) -- see NOTE above.
      PutFixed64(dst, static_cast<uint64_t>(json.GetDouble()));
      break;
    case JSONDocument::kInt64:
      dst->push_back(kInt64);
      {
        auto val = json.GetInt64();
        // Sign byte first: '0' for negatives sorts before '1' for >= 0.
        dst->push_back((val < 0) ? '0' : '1');
        PutFixed64(dst, static_cast<uint64_t>(val));
      }
      break;
    case JSONDocument::kString:
      dst->push_back(kString);
      dst->append(json.GetString());
      break;
    default:
      // objects and arrays are not indexable primitives
      return false;
  }
  return true;
}
343 | | |
344 | | } // namespace |
345 | | |
346 | | // format of the secondary key is: |
347 | | // <secondary_key><primary_key><offset_of_primary_key uint32_t> |
// Composite secondary-index key. Wire format (see comment above):
// <secondary_key><primary_key><offset_of_primary_key uint32_t>
// The trailing fixed32 records where the primary key starts, so the two
// variable-length parts can be split apart again when reading.
class IndexKey {
 public:
  // Default: invalid key (ok() == false).
  IndexKey() : ok_(false) {}
  // Parses an encoded key (as produced by GetSliceParts()) back into its
  // secondary/primary parts; ok() reports whether parsing succeeded.
  // The slices point into |slice|, which must outlive this object.
  explicit IndexKey(const Slice& slice) {
    if (slice.size() < sizeof(uint32_t)) {
      ok_ = false;
      return;
    }
    uint32_t primary_key_offset =
        DecodeFixed32(slice.data() + slice.size() - sizeof(uint32_t));
    if (primary_key_offset >= slice.size() - sizeof(uint32_t)) {
      // offset points past the payload -- corrupt key
      ok_ = false;
      return;
    }
    parts_[0] = Slice(slice.data(), primary_key_offset);
    parts_[1] = Slice(slice.data() + primary_key_offset,
                      slice.size() - primary_key_offset - sizeof(uint32_t));
    ok_ = true;
  }
  // Builds a key from its two components; the slices must outlive this
  // object (no copy is made).
  IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) {
    parts_[0] = secondary_key;
    parts_[1] = primary_key;
  }

  // Serialized three-part form for WriteBatch::Put(). The result references
  // primary_key_offset_buf_, so it is valid only while this object lives.
  SliceParts GetSliceParts() {
    uint32_t primary_key_offset = static_cast<uint32_t>(parts_[0].size());
    EncodeFixed32(primary_key_offset_buf_, primary_key_offset);
    parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t));
    return SliceParts(parts_, 3);
  }

  const Slice& GetPrimaryKey() const { return parts_[1]; }
  const Slice& GetSecondaryKey() const { return parts_[0]; }

  bool ok() const { return ok_; }

 private:
  bool ok_;
  // 0 -- secondary key
  // 1 -- primary key
  // 2 -- primary key offset
  Slice parts_[3];
  char primary_key_offset_buf_[sizeof(uint32_t)];
};
392 | | |
// Index over a single document field, ordered bytewise on the field's
// EncodeJSONPrimitive() encoding. Documents missing the field are indexed
// under the encoded null value.
class SimpleSortedIndex : public Index {
 public:
  SimpleSortedIndex(const std::string& field, const std::string& name)
      : field_(field), name_(name) {}

  const char* Name() const override { return name_.c_str(); }

  // Encodes the indexed field's value (or null if absent) into *key.
  virtual void GetIndexKey(const JSONDocument& document, std::string* key) const
      override {
    if (!document.Contains(field_)) {
      if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) {
        assert(false);
      }
    } else {
      if (!EncodeJSONPrimitive(document[field_], key)) {
        assert(false);
      }
    }
  }
  const Comparator* GetComparator() const override {
    return BytewiseComparator();
  }

  // Useful only when the filter actually constrains our field.
  bool UsefulIndex(const Filter& filter) const override {
    return filter.GetInterval(field_) != nullptr;
  }
  // REQUIRES: UsefulIndex(filter) == true
  // Seeks to the bounded end of the interval: the lower bound when one
  // exists (scan forwards), otherwise the upper bound (scan backwards).
  virtual Direction Position(const Filter& filter,
                             Iterator* iterator) const override {
    auto interval = filter.GetInterval(field_);
    assert(interval != nullptr);  // because index is useful
    Direction direction;

    const JSONDocument* limit;
    if (!interval->lower_bound.IsNull()) {
      limit = &(interval->lower_bound);
      direction = kForwards;
    } else {
      limit = &(interval->upper_bound);
      direction = kBackwards;
    }

    std::string encoded_limit;
    if (!EncodeJSONPrimitive(*limit, &encoded_limit)) {
      assert(false);
    }
    iterator->Seek(Slice(encoded_limit));

    return direction;
  }
  // REQUIRES: UsefulIndex(filter) == true
  // Compares the current secondary key bytewise against the encoded bound in
  // the scan direction to decide whether the scan may have more matches.
  virtual bool ShouldContinueLooking(
      const Filter& filter, const Slice& secondary_key,
      Index::Direction direction) const override {
    auto interval = filter.GetInterval(field_);
    assert(interval != nullptr);  // because index is useful
    if (direction == kForwards) {
      if (interval->upper_bound.IsNull()) {
        // continue looking, no upper bound
        return true;
      }
      std::string encoded_upper_bound;
      if (!EncodeJSONPrimitive(interval->upper_bound, &encoded_upper_bound)) {
        // should never fail: bounds were parsed from primitives
        // TODO(icanadi) store encoded upper and lower bounds in Filter*?
        assert(false);
      }
      // TODO(icanadi) we need to somehow decode this and use DocumentCompare()
      int compare = secondary_key.compare(Slice(encoded_upper_bound));
      // if (current key is bigger than upper bound) OR (current key is equal to
      // upper bound, but inclusive is false) THEN stop looking. otherwise,
      // continue
      return (compare > 0 ||
              (compare == 0 && interval->upper_inclusive == false))
                 ? false
                 : true;
    } else {
      assert(direction == kBackwards);
      if (interval->lower_bound.IsNull()) {
        // continue looking, no lower bound
        return true;
      }
      std::string encoded_lower_bound;
      if (!EncodeJSONPrimitive(interval->lower_bound, &encoded_lower_bound)) {
        // should never fail: bounds were parsed from primitives
        // TODO(icanadi) store encoded upper and lower bounds in Filter*?
        assert(false);
      }
      // TODO(icanadi) we need to somehow decode this and use DocumentCompare()
      int compare = secondary_key.compare(Slice(encoded_lower_bound));
      // if (current key is smaller than lower bound) OR (current key is equal
      // to lower bound, but inclusive is false) THEN stop looking. otherwise,
      // continue
      return (compare < 0 ||
              (compare == 0 && interval->lower_inclusive == false))
                 ? false
                 : true;
    }

    // unreachable: both branches above return
    assert(false);
    // this is here just so compiler doesn't complain
    return false;
  }

 private:
  std::string field_;
  std::string name_;
};
501 | | |
502 | | Index* Index::CreateIndexFromDescription(const JSONDocument& description, |
503 | 5 | const std::string& name) { |
504 | 5 | if (!description.IsObject() || description.Count() != 1) { |
505 | | // not supported yet |
506 | 0 | return nullptr; |
507 | 0 | } |
508 | 5 | const auto& field = *description.Items().begin(); |
509 | 5 | if (field.second.IsInt64() == false || field.second.GetInt64() != 1) { |
510 | | // not supported yet |
511 | 0 | return nullptr; |
512 | 0 | } |
513 | 5 | return new SimpleSortedIndex(field.first, name); |
514 | 5 | } |
515 | | |
516 | | class CursorWithFilterIndexed : public Cursor { |
517 | | public: |
518 | | CursorWithFilterIndexed(Iterator* primary_index_iter, |
519 | | Iterator* secondary_index_iter, const Index* index, |
520 | | const Filter* filter) |
521 | | : primary_index_iter_(primary_index_iter), |
522 | | secondary_index_iter_(secondary_index_iter), |
523 | | index_(index), |
524 | | filter_(filter), |
525 | | valid_(true), |
526 | 15 | current_json_document_(nullptr) { |
527 | 15 | assert(filter_.get() != nullptr); |
528 | 15 | direction_ = index->Position(*filter_.get(), secondary_index_iter_.get()); |
529 | 15 | UpdateIndexKey(); |
530 | 15 | AdvanceUntilSatisfies(); |
531 | 15 | } |
532 | | |
533 | 211 | bool Valid() const override { |
534 | 211 | return valid_ && secondary_index_iter_->Valid(); |
535 | 211 | } |
536 | 42 | void Next() override { |
537 | 42 | assert(Valid()); |
538 | 42 | Advance(); |
539 | 42 | AdvanceUntilSatisfies(); |
540 | 42 | } |
541 | | // temporary object. copy it if you want to use it |
542 | 77 | const JSONDocument& document() const override { |
543 | 77 | assert(Valid()); |
544 | 77 | return *current_json_document_; |
545 | 77 | } |
546 | 15 | Status status() const override { |
547 | 15 | if (!status_.ok()) { |
548 | 0 | return status_; |
549 | 0 | } |
550 | 15 | if (!primary_index_iter_->status().ok()) { |
551 | 0 | return primary_index_iter_->status(); |
552 | 0 | } |
553 | 15 | return secondary_index_iter_->status(); |
554 | 15 | } |
555 | | |
556 | | private: |
557 | 63 | void Advance() { |
558 | 63 | if (direction_ == Index::kForwards) { |
559 | 54 | secondary_index_iter_->Next(); |
560 | 9 | } else { |
561 | 9 | secondary_index_iter_->Prev(); |
562 | 9 | } |
563 | 63 | UpdateIndexKey(); |
564 | 63 | } |
565 | 57 | void AdvanceUntilSatisfies() { |
566 | 57 | bool found = false; |
567 | 78 | while (secondary_index_iter_->Valid() && |
568 | 71 | index_->ShouldContinueLooking( |
569 | 63 | *filter_.get(), index_key_.GetSecondaryKey(), direction_)) { |
570 | 63 | if (!UpdateJSONDocument()) { |
571 | | // corruption happened |
572 | 0 | return; |
573 | 0 | } |
574 | 63 | if (filter_->SatisfiesFilter(*current_json_document_)) { |
575 | | // we found satisfied! |
576 | 42 | found = true; |
577 | 42 | break; |
578 | 21 | } else { |
579 | | // doesn't satisfy :( |
580 | 21 | Advance(); |
581 | 21 | } |
582 | 63 | } |
583 | 57 | if (!found) { |
584 | 15 | valid_ = false; |
585 | 15 | } |
586 | 57 | } |
587 | | |
588 | 63 | bool UpdateJSONDocument() { |
589 | 63 | assert(secondary_index_iter_->Valid()); |
590 | 63 | primary_index_iter_->Seek(index_key_.GetPrimaryKey()); |
591 | 63 | if (!primary_index_iter_->Valid()) { |
592 | 0 | status_ = STATUS(Corruption, |
593 | 0 | "Inconsistency between primary and secondary index"); |
594 | 0 | valid_ = false; |
595 | 0 | return false; |
596 | 0 | } |
597 | 63 | current_json_document_.reset( |
598 | 63 | JSONDocument::Deserialize(primary_index_iter_->value())); |
599 | 63 | assert(current_json_document_->IsOwner()); |
600 | 63 | if (current_json_document_.get() == nullptr) { |
601 | 0 | status_ = STATUS(Corruption, "JSON deserialization failed"); |
602 | 0 | valid_ = false; |
603 | 0 | return false; |
604 | 0 | } |
605 | 63 | return true; |
606 | 63 | } |
607 | 78 | void UpdateIndexKey() { |
608 | 78 | if (secondary_index_iter_->Valid()) { |
609 | 71 | index_key_ = IndexKey(secondary_index_iter_->key()); |
610 | 71 | if (!index_key_.ok()) { |
611 | 0 | status_ = STATUS(Corruption, "Invalid index key"); |
612 | 0 | valid_ = false; |
613 | 0 | } |
614 | 71 | } |
615 | 78 | } |
616 | | std::unique_ptr<Iterator> primary_index_iter_; |
617 | | std::unique_ptr<Iterator> secondary_index_iter_; |
618 | | // we don't own index_ |
619 | | const Index* index_; |
620 | | Index::Direction direction_; |
621 | | std::unique_ptr<const Filter> filter_; |
622 | | bool valid_; |
623 | | IndexKey index_key_; |
624 | | std::unique_ptr<JSONDocument> current_json_document_; |
625 | | Status status_; |
626 | | }; |
627 | | |
// Cursor over every document in a column family, in iterator order, with no
// filtering. Takes ownership of the iterator.
class CursorFromIterator : public Cursor {
 public:
  explicit CursorFromIterator(Iterator* iter)
      : iter_(iter), current_json_document_(nullptr) {
    iter_->SeekToFirst();
    UpdateCurrentJSON();
  }

  bool Valid() const override { return status_.ok() && iter_->Valid(); }
  void Next() override {
    iter_->Next();
    UpdateCurrentJSON();
  }
  // Valid only until the next call to Next(); copy to keep.
  const JSONDocument& document() const override {
    assert(Valid());
    return *current_json_document_;
  };
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    }
    return iter_->status();
  }

  // not part of public Cursor interface
  Slice key() const { return iter_->key(); }

 private:
  // Deserialize the document at the current position; a failure is recorded
  // in status_ (which makes Valid() false) rather than thrown.
  void UpdateCurrentJSON() {
    if (Valid()) {
      current_json_document_.reset(JSONDocument::Deserialize(iter_->value()));
      if (current_json_document_.get() == nullptr) {
        status_ = STATUS(Corruption, "JSON deserialization failed");
      }
    }
  }

  Status status_;
  std::unique_ptr<Iterator> iter_;
  std::unique_ptr<JSONDocument> current_json_document_;
};
669 | | |
670 | | class CursorWithFilter : public Cursor { |
671 | | public: |
672 | | CursorWithFilter(Cursor* base_cursor, const Filter* filter) |
673 | 4 | : base_cursor_(base_cursor), filter_(filter) { |
674 | 4 | assert(filter_.get() != nullptr); |
675 | 4 | SeekToNextSatisfies(); |
676 | 4 | } |
677 | 32 | bool Valid() const override { return base_cursor_->Valid(); } |
678 | 8 | void Next() override { |
679 | 8 | assert(Valid()); |
680 | 8 | base_cursor_->Next(); |
681 | 8 | SeekToNextSatisfies(); |
682 | 8 | } |
683 | 10 | const JSONDocument& document() const override { |
684 | 10 | assert(Valid()); |
685 | 10 | return base_cursor_->document(); |
686 | 10 | } |
687 | 12 | Status status() const override { return base_cursor_->status(); } |
688 | | |
689 | | private: |
690 | 12 | void SeekToNextSatisfies() { |
691 | 35 | for (; base_cursor_->Valid(); base_cursor_->Next()) { |
692 | 31 | if (filter_->SatisfiesFilter(base_cursor_->document())) { |
693 | 8 | break; |
694 | 8 | } |
695 | 31 | } |
696 | 12 | } |
697 | | std::unique_ptr<Cursor> base_cursor_; |
698 | | std::unique_ptr<const Filter> filter_; |
699 | | }; |
700 | | |
// Cursor representing a failed query: never valid, only reports the error
// status it was constructed with.
class CursorError : public Cursor {
 public:
  explicit CursorError(Status s) : s_(s) { assert(!s.ok()); }
  Status status() const override { return s_; }
  bool Valid() const override { return false; }
  void Next() override {}
  // Never reachable in correct use (Valid() is always false).
  const JSONDocument& document() const override {
    assert(false);
    // compiler complains otherwise
    return trash_;
  }

 private:
  Status s_;
  JSONDocument trash_;
};
717 | | |
718 | | class DocumentDBImpl : public DocumentDB { |
719 | | public: |
  // Takes ownership of |primary_key_column_family| and of every
  // Index*/ColumnFamilyHandle* pair in |indexes| (all deleted in the
  // destructor). |db| is handed to the DocumentDB base class.
  DocumentDBImpl(
      DB* db, ColumnFamilyHandle* primary_key_column_family,
      const std::vector<std::pair<Index*, ColumnFamilyHandle*>>& indexes,
      const Options& rocksdb_options)
      : DocumentDB(db),
        primary_key_column_family_(primary_key_column_family),
        rocksdb_options_(rocksdb_options) {
    for (const auto& index : indexes) {
      name_to_index_.insert(
          {index.first->Name(), IndexColumnFamily(index.first, index.second)});
    }
  }
732 | | |
  // Frees every owned index object and column family handle, plus the
  // primary key column family handle.
  ~DocumentDBImpl() {
    for (auto& iter : name_to_index_) {
      delete iter.second.index;
      delete iter.second.column_family;
    }
    delete primary_key_column_family_;
  }
740 | | |
741 | | virtual Status CreateIndex(const WriteOptions& write_options, |
742 | 4 | const IndexDescriptor& index) override { |
743 | 4 | auto index_obj = |
744 | 4 | Index::CreateIndexFromDescription(*index.description, index.name); |
745 | 4 | if (index_obj == nullptr) { |
746 | 0 | return STATUS(InvalidArgument, "Failed parsing index description"); |
747 | 0 | } |
748 | | |
749 | 4 | ColumnFamilyHandle* cf_handle; |
750 | 4 | Status s = |
751 | 4 | CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_), |
752 | 4 | InternalSecondaryIndexName(index.name), &cf_handle); |
753 | 4 | if (!s.ok()) { |
754 | 0 | delete index_obj; |
755 | 0 | return s; |
756 | 0 | } |
757 | | |
758 | 4 | MutexLock l(&write_mutex_); |
759 | | |
760 | 4 | std::unique_ptr<CursorFromIterator> cursor(new CursorFromIterator( |
761 | 4 | DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_))); |
762 | | |
763 | 4 | WriteBatch batch; |
764 | 4 | for (; cursor->Valid(); cursor->Next()) { |
765 | 0 | std::string secondary_index_key; |
766 | 0 | index_obj->GetIndexKey(cursor->document(), &secondary_index_key); |
767 | 0 | IndexKey index_key(Slice(secondary_index_key), cursor->key()); |
768 | 0 | batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts()); |
769 | 0 | } |
770 | | |
771 | 4 | if (!cursor->status().ok()) { |
772 | 0 | delete index_obj; |
773 | 0 | return cursor->status(); |
774 | 0 | } |
775 | | |
776 | 4 | { |
777 | 4 | MutexLock l_nti(&name_to_index_mutex_); |
778 | 4 | name_to_index_.insert( |
779 | 4 | {index.name, IndexColumnFamily(index_obj, cf_handle)}); |
780 | 4 | } |
781 | | |
782 | 4 | return DocumentDB::Write(write_options, &batch); |
783 | 4 | } |
784 | | |
785 | 2 | Status DropIndex(const std::string& name) override { |
786 | 2 | MutexLock l(&write_mutex_); |
787 | | |
788 | 2 | auto index_iter = name_to_index_.find(name); |
789 | 2 | if (index_iter == name_to_index_.end()) { |
790 | 1 | return STATUS(InvalidArgument, "No such index"); |
791 | 1 | } |
792 | | |
793 | 1 | Status s = DropColumnFamily(index_iter->second.column_family); |
794 | 1 | if (!s.ok()) { |
795 | 0 | return s; |
796 | 0 | } |
797 | | |
798 | 1 | delete index_iter->second.index; |
799 | 1 | delete index_iter->second.column_family; |
800 | | |
801 | | // remove from name_to_index_ |
802 | 1 | { |
803 | 1 | MutexLock l_nti(&name_to_index_mutex_); |
804 | 1 | name_to_index_.erase(index_iter); |
805 | 1 | } |
806 | | |
807 | 1 | return Status::OK(); |
808 | 1 | } |
809 | | |
  // Inserts |document|, which must be an object containing a kPrimaryKey
  // field that is a non-null string or int64. Rejects duplicates of an
  // existing primary key. The primary write and one entry per registered
  // secondary index go into a single WriteBatch, so they are applied
  // atomically.
  virtual Status Insert(const WriteOptions& options,
                        const JSONDocument& document) override {
    WriteBatch batch;

    if (!document.IsObject()) {
      return STATUS(InvalidArgument, "Document not an object");
    }
    if (!document.Contains(kPrimaryKey)) {
      return STATUS(InvalidArgument, "No primary key");
    }
    auto primary_key = document[kPrimaryKey];
    if (primary_key.IsNull() ||
        (!primary_key.IsString() && !primary_key.IsInt64())) {
      return STATUS(InvalidArgument,
                    "Primary key format error");
    }
    std::string encoded_document;
    document.Serialize(&encoded_document);
    std::string primary_key_encoded;
    if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
      // previous call should be guaranteed to pass because of all primary_key
      // conditions checked before
      assert(false);
    }
    Slice primary_key_slice(primary_key_encoded);

    // Lock now, since we're starting DB operations
    MutexLock l(&write_mutex_);
    // check if there is already a document with the same primary key
    std::string value;
    Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_,
                               primary_key_slice, &value);
    if (!s.IsNotFound()) {
      // Either the key already exists (duplicate) or the read itself failed.
      return s.ok() ? STATUS(InvalidArgument, "Duplicate primary key!") : s;
    }

    batch.Put(primary_key_column_family_, primary_key_slice, encoded_document);

    // One entry per secondary index; the value is empty because the index
    // key itself embeds both the secondary and primary keys.
    for (const auto& iter : name_to_index_) {
      std::string secondary_index_key;
      iter.second.index->GetIndexKey(document, &secondary_index_key);
      IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
      batch.Put(iter.second.column_family, index_key.GetSliceParts(),
                SliceParts());
    }

    return DocumentDB::Write(options, &batch);
  }
858 | | |
  // Deletes every document matching |query| (a filter specification, see
  // ConstructFilterCursor), together with all of its secondary-index
  // entries, in one atomic WriteBatch.
  virtual Status Remove(const ReadOptions& read_options,
                        const WriteOptions& write_options,
                        const JSONDocument& query) override {
    MutexLock l(&write_mutex_);
    std::unique_ptr<Cursor> cursor(
        ConstructFilterCursor(read_options, nullptr, query));

    WriteBatch batch;
    for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) {
      const auto& document = cursor->document();
      // Stored documents must be objects with a valid primary key;
      // anything else indicates on-disk corruption.
      if (!document.IsObject()) {
        return STATUS(Corruption, "Document corruption");
      }
      if (!document.Contains(kPrimaryKey)) {
        return STATUS(Corruption, "Document corruption");
      }
      auto primary_key = document[kPrimaryKey];
      if (primary_key.IsNull() ||
          (!primary_key.IsString() && !primary_key.IsInt64())) {
        return STATUS(Corruption, "Document corruption");
      }

      // TODO(icanadi) Instead of doing this, just get primary key encoding from
      // cursor, as it already has this information
      std::string primary_key_encoded;
      if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
        // previous call should be guaranteed to pass because of all primary_key
        // conditions checked before
        assert(false);
      }
      Slice primary_key_slice(primary_key_encoded);
      // Delete the primary record ...
      batch.Delete(primary_key_column_family_, primary_key_slice);

      // ... and the matching entry in every secondary index.
      for (const auto& iter : name_to_index_) {
        std::string secondary_index_key;
        iter.second.index->GetIndexKey(document, &secondary_index_key);
        IndexKey index_key(Slice(secondary_index_key), primary_key_slice);
        batch.Delete(iter.second.column_family, index_key.GetSliceParts());
      }
    }

    // If the scan itself failed, abort without writing anything.
    if (!cursor->status().ok()) {
      return cursor->status();
    }

    return DocumentDB::Write(write_options, &batch);
  }
906 | | |
  // Applies |updates| to every document matching |filter|. Only the "$set"
  // command is supported: for each matched document a new document is built
  // that keeps the old fields, overriding those named in "$set". The primary
  // key may not be changed. The rewritten documents and any changed
  // secondary-index entries are committed atomically in one WriteBatch.
  virtual Status Update(const ReadOptions& read_options,
                        const WriteOptions& write_options,
                        const JSONDocument& filter,
                        const JSONDocument& updates) override {
    MutexLock l(&write_mutex_);
    std::unique_ptr<Cursor> cursor(
        ConstructFilterCursor(read_options, nullptr, filter));

    if (!updates.IsObject()) {
        return STATUS(Corruption, "Bad update document format");
    }
    WriteBatch batch;
    for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) {
      const auto& old_document = cursor->document();
      JSONDocument new_document(old_document);
      if (!new_document.IsObject()) {
        return STATUS(Corruption, "Document corruption");
      }
      // TODO(icanadi) Make this nicer, something like class Filter
      for (const auto& update : updates.Items()) {
        if (update.first == "$set") {
          JSONDocumentBuilder builder;
          bool res __attribute__((unused)) = builder.WriteStartObject();
          assert(res);
          // First build an object holding only the "$set" overrides,
          // rejecting any attempt to rewrite the primary key.
          for (const auto& itr : update.second.Items()) {
            if (itr.first == kPrimaryKey) {
              return STATUS(NotSupported, "Please don't change primary key");
            }
            res = builder.WriteKeyValue(itr.first, itr.second);
            assert(res);
          }
          res = builder.WriteEndObject();
          assert(res);
          JSONDocument update_document = builder.GetJSONDocument();
          builder.Reset();
          res = builder.WriteStartObject();
          assert(res);
          // Then merge: keep each existing field, substituting the override
          // value when "$set" names that field. Note: fields that appear
          // only in "$set" (not in the document) are not added.
          for (const auto& itr : new_document.Items()) {
            if (update_document.Contains(itr.first)) {
              res = builder.WriteKeyValue(itr.first,
                                          update_document[itr.first]);
            } else {
              res = builder.WriteKeyValue(itr.first, new_document[itr.first]);
            }
            assert(res);
          }
          res = builder.WriteEndObject();
          assert(res);
          new_document = builder.GetJSONDocument();
          assert(new_document.IsOwner());
        } else {
          // TODO(icanadi) more commands
          return STATUS(InvalidArgument, "Can't understand update command");
        }
      }

      // TODO(icanadi) reuse some of this code
      if (!new_document.Contains(kPrimaryKey)) {
        return STATUS(Corruption, "Corrupted document -- primary key missing");
      }
      auto primary_key = new_document[kPrimaryKey];
      if (primary_key.IsNull() ||
          (!primary_key.IsString() && !primary_key.IsInt64())) {
        // This will happen when document on storage doesn't have primary key,
        // since we don't support any update operations on primary key. That's
        // why this is corruption error
        return STATUS(Corruption, "Corrupted document -- primary key missing");
      }
      std::string encoded_document;
      new_document.Serialize(&encoded_document);
      std::string primary_key_encoded;
      if (!EncodeJSONPrimitive(primary_key, &primary_key_encoded)) {
        // previous call should be guaranteed to pass because of all primary_key
        // conditions checked before
        assert(false);
      }
      Slice primary_key_slice(primary_key_encoded);
      // Overwrite the primary record with the merged document.
      batch.Put(primary_key_column_family_, primary_key_slice,
                encoded_document);

      // Rewrite only the secondary-index entries whose key actually changed.
      for (const auto& iter : name_to_index_) {
        std::string old_key, new_key;
        iter.second.index->GetIndexKey(old_document, &old_key);
        iter.second.index->GetIndexKey(new_document, &new_key);
        if (old_key == new_key) {
          // don't need to update this secondary index
          continue;
        }

        IndexKey old_index_key(Slice(old_key), primary_key_slice);
        IndexKey new_index_key(Slice(new_key), primary_key_slice);

        batch.Delete(iter.second.column_family, old_index_key.GetSliceParts());
        batch.Put(iter.second.column_family, new_index_key.GetSliceParts(),
                  SliceParts());
      }
    }

    // If the scan itself failed, abort without writing anything.
    if (!cursor->status().ok()) {
      return cursor->status();
    }

    return DocumentDB::Write(write_options, &batch);
  }
1011 | | |
1012 | | virtual Cursor* Query(const ReadOptions& read_options, |
1013 | 15 | const JSONDocument& query) override { |
1014 | 15 | Cursor* cursor = nullptr; |
1015 | | |
1016 | 15 | if (!query.IsArray()) { |
1017 | 0 | return new CursorError( |
1018 | 0 | STATUS(InvalidArgument, "Query has to be an array")); |
1019 | 0 | } |
1020 | | |
1021 | | // TODO(icanadi) support index "_id" |
1022 | 29 | for (size_t i = 0; i < query.Count(); ++i) { |
1023 | 14 | const auto& command_doc = query[i]; |
1024 | 14 | if (command_doc.Count() != 1) { |
1025 | | // there can be only one key-value pair in each of array elements. |
1026 | | // key is the command and value are the params |
1027 | 0 | delete cursor; |
1028 | 0 | return new CursorError(STATUS(InvalidArgument, "Invalid query")); |
1029 | 0 | } |
1030 | 14 | const auto& command = *command_doc.Items().begin(); |
1031 | | |
1032 | 14 | if (command.first == "$filter") { |
1033 | 14 | cursor = ConstructFilterCursor(read_options, cursor, command.second); |
1034 | 0 | } else { |
1035 | | // only filter is supported for now |
1036 | 0 | delete cursor; |
1037 | 0 | return new CursorError(STATUS(InvalidArgument, "Invalid query")); |
1038 | 0 | } |
1039 | 14 | } |
1040 | | |
1041 | 15 | if (cursor == nullptr) { |
1042 | 1 | cursor = new CursorFromIterator( |
1043 | 1 | DocumentDB::NewIterator(read_options, primary_key_column_family_)); |
1044 | 1 | } |
1045 | | |
1046 | 15 | return cursor; |
1047 | 15 | } |
1048 | | |
  // RocksDB functions
  // The raw key-value API is deliberately disabled on DocumentDB — these
  // overrides return NotSupported / nullptr, presumably so callers cannot
  // bypass the document API and desynchronize the secondary indexes
  // (NOTE(review): rationale inferred; confirm against the public header).
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     std::string* value) override {
    return STATUS(NotSupported, "");
  }
  virtual Status Get(const ReadOptions& options, const Slice& key,
                     std::string* value) override {
    return STATUS(NotSupported, "");
  }
  virtual Status Write(const WriteOptions& options,
                       WriteBatch* updates) override {
    return STATUS(NotSupported, "");
  }
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override {
    return nullptr;
  }
  Iterator* NewIterator(const ReadOptions& options) override {
    return nullptr;
  }
1070 | | |
1071 | | private: |
1072 | | Cursor* ConstructFilterCursor(ReadOptions read_options, Cursor* cursor, |
1073 | 19 | const JSONDocument& query) { |
1074 | 19 | std::unique_ptr<const Filter> filter(Filter::ParseFilter(query)); |
1075 | 19 | if (filter.get() == nullptr) { |
1076 | 0 | return new CursorError(STATUS(InvalidArgument, "Invalid query")); |
1077 | 0 | } |
1078 | | |
1079 | 19 | IndexColumnFamily tmp_storage(nullptr, nullptr); |
1080 | | |
1081 | 19 | if (cursor == nullptr) { |
1082 | 19 | IndexColumnFamily* index_column_family = nullptr; |
1083 | 19 | if (query.Contains("$index") && query["$index"].IsString()) { |
1084 | 15 | { |
1085 | 15 | auto index_name = query["$index"]; |
1086 | 15 | MutexLock l(&name_to_index_mutex_); |
1087 | 15 | auto index_iter = name_to_index_.find(index_name.GetString()); |
1088 | 15 | if (index_iter != name_to_index_.end()) { |
1089 | 15 | tmp_storage = index_iter->second; |
1090 | 15 | index_column_family = &tmp_storage; |
1091 | 0 | } else { |
1092 | 0 | return new CursorError( |
1093 | 0 | STATUS(InvalidArgument, "Index does not exist")); |
1094 | 0 | } |
1095 | 19 | } |
1096 | 19 | } |
1097 | | |
1098 | 19 | if (index_column_family != nullptr && |
1099 | 15 | index_column_family->index->UsefulIndex(*filter.get())) { |
1100 | 15 | std::vector<Iterator*> iterators; |
1101 | 15 | Status s = DocumentDB::NewIterators( |
1102 | 15 | read_options, |
1103 | 15 | {primary_key_column_family_, index_column_family->column_family}, |
1104 | 15 | &iterators); |
1105 | 15 | if (!s.ok()) { |
1106 | 0 | delete cursor; |
1107 | 0 | return new CursorError(s); |
1108 | 0 | } |
1109 | 15 | assert(iterators.size() == 2); |
1110 | 15 | return new CursorWithFilterIndexed(iterators[0], iterators[1], |
1111 | 15 | index_column_family->index, |
1112 | 15 | filter.release()); |
1113 | 4 | } else { |
1114 | 4 | return new CursorWithFilter( |
1115 | 4 | new CursorFromIterator(DocumentDB::NewIterator( |
1116 | 4 | read_options, primary_key_column_family_)), |
1117 | 4 | filter.release()); |
1118 | 4 | } |
1119 | 0 | } else { |
1120 | 0 | return new CursorWithFilter(cursor, filter.release()); |
1121 | 0 | } |
1122 | 0 | assert(false); |
1123 | 0 | return nullptr; |
1124 | 0 | } |
1125 | | |
  // currently, we lock and serialize all writes to rocksdb. reads are not
  // locked and always get consistent view of the database. we should optimize
  // locking in the future
  port::Mutex write_mutex_;
  // Guards name_to_index_ for readers that don't hold write_mutex_
  // (see the locking protocol comment below).
  port::Mutex name_to_index_mutex_;
  // JSON field name that uniquely identifies a document.
  const char* kPrimaryKey = "_id";
  // Pairs an Index (secondary-key extraction logic) with the column family
  // holding its entries. Plain aggregate; does not own either pointer —
  // DropIndex deletes them explicitly.
  struct IndexColumnFamily {
    IndexColumnFamily(Index* _index, ColumnFamilyHandle* _column_family)
        : index(_index), column_family(_column_family) {}
    Index* index;
    ColumnFamilyHandle* column_family;
  };


  // name_to_index_ protected:
  // 1) when writing -- 1. lock write_mutex_, 2. lock name_to_index_mutex_
  // 2) when reading -- lock name_to_index_mutex_ OR write_mutex_
  std::unordered_map<std::string, IndexColumnFamily> name_to_index_;
  // Column family storing primary-key -> serialized-document records.
  ColumnFamilyHandle* primary_key_column_family_;
  Options rocksdb_options_;
1146 | | }; |
1147 | | |
1148 | | namespace { |
1149 | 3 | Options GetRocksDBOptionsFromOptions(const DocumentDBOptions& options) { |
1150 | 3 | Options rocksdb_options; |
1151 | 3 | rocksdb_options.max_background_compactions = options.background_threads - 1; |
1152 | 3 | rocksdb_options.max_background_flushes = 1; |
1153 | 3 | rocksdb_options.write_buffer_size = options.memtable_size; |
1154 | 3 | rocksdb_options.max_write_buffer_number = 6; |
1155 | 3 | BlockBasedTableOptions table_options; |
1156 | 3 | table_options.block_cache = NewLRUCache(options.cache_size); |
1157 | 3 | rocksdb_options.table_factory.reset(NewBlockBasedTableFactory(table_options)); |
1158 | 3 | return rocksdb_options; |
1159 | 3 | } |
1160 | | } // namespace |
1161 | | |
// Opens (creating if missing) a DocumentDB at |name| with the given set of
// secondary indexes. Each index lives in its own column family, named via
// InternalSecondaryIndexName; the default column family stores the primary
// key -> document records. On success *db owns the underlying DB and all
// column family handles.
Status DocumentDB::Open(const DocumentDBOptions& options,
                        const std::string& name,
                        const std::vector<DocumentDB::IndexDescriptor>& indexes,
                        DocumentDB** db, bool read_only) {
  Options rocksdb_options = GetRocksDBOptionsFromOptions(options);
  rocksdb_options.create_if_missing = true;

  // Column family 0 is the default (primary data) CF, followed by one CF
  // per declared secondary index, in |indexes| order.
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(ColumnFamilyDescriptor(
      kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
  for (const auto& index : indexes) {
    column_families.emplace_back(InternalSecondaryIndexName(index.name),
                                 ColumnFamilyOptions(rocksdb_options));
  }
  std::vector<ColumnFamilyHandle*> handles;
  DB* base_db;
  Status s;
  if (read_only) {
    s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families,
                            &handles, &base_db);
  } else {
    s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles,
                 &base_db);
  }
  if (!s.ok()) {
    return s;
  }

  // Rebuild the in-memory Index objects and pair each with its handle;
  // handles[0] is the default CF, handles[i+1] matches indexes[i].
  std::vector<std::pair<Index*, ColumnFamilyHandle*>> index_cf(indexes.size());
  assert(handles.size() == indexes.size() + 1);
  for (size_t i = 0; i < indexes.size(); ++i) {
    auto index = Index::CreateIndexFromDescription(*indexes[i].description,
                                                   indexes[i].name);
    index_cf[i] = {index, handles[i + 1]};
  }
  *db = new DocumentDBImpl(base_db, handles[0], index_cf, rocksdb_options);
  return Status::OK();
}
1200 | | |
1201 | | } // namespace rocksdb |
1202 | | #endif // ROCKSDB_LITE |