/Users/deen/code/yugabyte-db/src/yb/docdb/doc_boundary_values_extractor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rocksdb/db/dbformat.h" |
17 | | |
18 | | #include "yb/docdb/consensus_frontier.h" |
19 | | #include "yb/docdb/doc_key.h" |
20 | | #include "yb/docdb/value_type.h" |
21 | | |
22 | | #include "yb/gutil/casts.h" |
23 | | #include "yb/util/status_format.h" |
24 | | #include "yb/util/status_log.h" |
25 | | |
26 | | namespace yb { |
27 | | namespace docdb { |
28 | | |
29 | | Status GetDocHybridTime(const rocksdb::UserBoundaryValues& values, DocHybridTime* out); |
30 | | |
31 | | Status GetPrimitiveValue(const rocksdb::UserBoundaryValues& values, |
32 | | size_t index, |
33 | | PrimitiveValue* out); |
34 | | |
35 | | namespace { |
36 | | |
37 | | constexpr rocksdb::UserBoundaryTag kDocHybridTimeTag = 1; |
38 | | // The tags between kDocHybridTimeTag and kRangeComponentsStart are reserved for future use: |
39 | | // tag values are persistent, so the gap has to be set aside up front. |
40 | | constexpr rocksdb::UserBoundaryTag kRangeComponentsStart = 10; |
41 | | |
42 | | // Wrapper for UserBoundaryValue that stores DocHybridTime. |
43 | | class DocHybridTimeValue : public rocksdb::UserBoundaryValue { |
44 | | public: |
45 | 27.2M | explicit DocHybridTimeValue(Slice slice) { |
46 | 27.2M | memcpy(buffer_, slice.data(), slice.size()); |
47 | 27.2M | encoded_ = Slice(buffer_, slice.size()); |
48 | 27.2M | } |
49 | | |
50 | 27.1M | static CHECKED_STATUS Create(Slice data, rocksdb::UserBoundaryValuePtr* value) { |
51 | 27.1M | CHECK_NOTNULL(value); |
52 | 27.1M | if (data.size() > kMaxBytesPerEncodedHybridTime) { |
53 | 0 | return STATUS_SUBSTITUTE(Corruption, "Too big encoded doc hybrid time: $0", data.size()); |
54 | 0 | } |
55 | | |
56 | 27.1M | *value = std::make_shared<DocHybridTimeValue>(data); |
57 | 27.1M | return Status::OK(); |
58 | 27.1M | } |
59 | | |
60 | 27.2M | virtual ~DocHybridTimeValue() {} |
61 | | |
62 | 177M | rocksdb::UserBoundaryTag Tag() override { |
63 | 177M | return kDocHybridTimeTag; |
64 | 177M | } |
65 | | |
66 | 86.2k | Slice Encode() override { |
67 | 86.2k | return encoded_; |
68 | 86.2k | } |
69 | | |
70 | 54.0M | int CompareTo(const UserBoundaryValue& pre_rhs) override { |
71 | 54.0M | const auto* rhs = down_cast<const DocHybridTimeValue*>(&pre_rhs); |
72 | 54.0M | return -encoded_.compare(rhs->encoded_); |
73 | 54.0M | } |
74 | | |
75 | 27.1M | CHECKED_STATUS value(DocHybridTime* out) const { |
76 | 27.1M | CHECK_NOTNULL(out); |
77 | 27.1M | DocHybridTime result; |
78 | 27.1M | RETURN_NOT_OK(result.FullyDecodeFrom(encoded_)); |
79 | 27.1M | *out = std::move(result); |
80 | 27.1M | return Status::OK(); |
81 | 27.1M | } |
82 | | |
83 | | private: |
84 | | char buffer_[kMaxBytesPerEncodedHybridTime]; |
85 | | Slice encoded_; |
86 | | }; |
87 | | |
88 | | // Wrapper for UserBoundaryValue that stores PrimitiveValue with index. |
89 | | class PrimitiveBoundaryValue : public rocksdb::UserBoundaryValue { |
90 | | public: |
91 | 14.5M | explicit PrimitiveBoundaryValue(size_t index, Slice slice) : index_(index) { |
92 | 14.5M | buffer_.assign(slice.data(), slice.end()); |
93 | 14.5M | } |
94 | | |
95 | 14.4M | static CHECKED_STATUS Create(size_t index, Slice data, rocksdb::UserBoundaryValuePtr* value) { |
96 | 14.4M | CHECK_NOTNULL(value); |
97 | | |
98 | 14.4M | *value = std::make_shared<PrimitiveBoundaryValue>(index, data); |
99 | 14.4M | return Status::OK(); |
100 | 14.4M | } |
101 | | |
102 | 14.5M | virtual ~PrimitiveBoundaryValue() {} |
103 | | |
104 | 108M | static rocksdb::UserBoundaryTag TagForIndex(size_t index) { |
105 | 108M | return static_cast<uint32_t>(kRangeComponentsStart + index); |
106 | 108M | } |
107 | | |
108 | 87.9M | rocksdb::UserBoundaryTag Tag() override { |
109 | 87.9M | return TagForIndex(index_); |
110 | 87.9M | } |
111 | | |
112 | 29.0M | Slice Encode() override { |
113 | 29.0M | return const_cast<const PrimitiveBoundaryValue*>(this)->Encode(); |
114 | 29.0M | } |
115 | | |
116 | 72.0M | Slice Encode() const { |
117 | 72.0M | return Slice(buffer_.data(), buffer_.size()); |
118 | 72.0M | } |
119 | | |
120 | 14.4M | CHECKED_STATUS value(PrimitiveValue* out) const { |
121 | 14.4M | CHECK_NOTNULL(out); |
122 | 14.4M | PrimitiveValue result; |
123 | 14.4M | Slice temp = Encode(); |
124 | 14.4M | RETURN_NOT_OK(result.DecodeFromKey(&temp)); |
125 | 14.4M | if (!temp.empty()) { |
126 | 0 | return STATUS_SUBSTITUTE(Corruption, |
127 | 0 | "Extra data left after decoding: $0, remaining: $1", |
128 | 0 | Encode().ToDebugString(), |
129 | 0 | temp.size()); |
130 | 0 | } |
131 | 14.4M | *out = std::move(result); |
132 | 14.4M | return Status::OK(); |
133 | 14.4M | } |
134 | | |
135 | 28.8M | int CompareTo(const rocksdb::UserBoundaryValue& pre_rhs) override { |
136 | 28.8M | const auto* rhs = down_cast<const PrimitiveBoundaryValue*>(&pre_rhs); |
137 | 28.8M | return Encode().compare(rhs->Encode()); |
138 | 28.8M | } |
139 | | private: |
140 | | size_t index_; // Index of corresponding range component. |
141 | | boost::container::small_vector<uint8_t, 128> buffer_; |
142 | | }; |
143 | | |
144 | | class DocBoundaryValuesExtractor : public rocksdb::BoundaryValuesExtractor { |
145 | | public: |
146 | 178 | virtual ~DocBoundaryValuesExtractor() {} |
147 | | |
148 | | Status Decode(rocksdb::UserBoundaryTag tag, |
149 | | Slice data, |
150 | 51.8k | rocksdb::UserBoundaryValuePtr* value) override { |
151 | 51.8k | CHECK_NOTNULL(value); |
152 | 51.8k | if (tag == kDocHybridTimeTag) { |
153 | 12.5k | return DocHybridTimeValue::Create(data, value); |
154 | 12.5k | } |
155 | 39.3k | if (tag >= kRangeComponentsStart) { |
156 | 39.3k | return PrimitiveBoundaryValue::Create(tag - kRangeComponentsStart, data, value); |
157 | 39.3k | } |
158 | | |
159 | 18.4E | return STATUS_SUBSTITUTE(NotFound, "Unknown tag: $0", tag); |
160 | 18.4E | } |
161 | | |
162 | 7.59M | Status Extract(Slice user_key, Slice value, rocksdb::UserBoundaryValues* values) override { |
163 | 7.59M | if (docdb::IsInternalRecordKeyType(docdb::DecodeValueType(user_key))) { |
164 | | // Skipping internal DocDB records. |
165 | 570k | return Status::OK(); |
166 | 570k | } |
167 | | |
168 | 7.02M | CHECK_NOTNULL(values); |
169 | 7.02M | boost::container::small_vector<Slice, 20> slices; |
170 | 7.02M | auto user_key_copy = user_key; |
171 | 7.02M | RETURN_NOT_OK(SubDocKey::PartiallyDecode(&user_key_copy, &slices)); |
172 | 7.02M | size_t size = slices.size(); |
173 | 7.02M | if (size == 0) { |
174 | 0 | return STATUS(Corruption, "Key does not contain hybrid time", user_key.ToDebugString()); |
175 | 0 | } |
176 | | // Last one contains Doc Hybrid Time, so number of range is less by 1. |
177 | 7.02M | --size; |
178 | | |
179 | 7.02M | rocksdb::UserBoundaryValuePtr temp; |
180 | 7.02M | RETURN_NOT_OK(DocHybridTimeValue::Create(slices.back(), &temp)); |
181 | 7.02M | values->push_back(std::move(temp)); |
182 | | |
183 | 11.6M | for (size_t i = 0; i != size; ++i) { |
184 | 4.61M | RETURN_NOT_OK(PrimitiveBoundaryValue::Create(i, slices[i], &temp)); |
185 | 4.61M | values->push_back(std::move(temp)); |
186 | 4.61M | } |
187 | | |
188 | 7.02M | DCHECK(PerformSanityCheck(user_key, slices, *values)); |
189 | | |
190 | 7.02M | return Status::OK(); |
191 | 7.02M | } |
192 | | |
193 | 18.6k | rocksdb::UserFrontierPtr CreateFrontier() override { |
194 | 18.6k | return new docdb::ConsensusFrontier(); |
195 | 18.6k | } |
196 | | |
197 | | bool PerformSanityCheck(Slice user_key, |
198 | | const boost::container::small_vector_base<Slice>& slices, |
199 | 7.02M | const rocksdb::UserBoundaryValues& values) { |
200 | 7.02M | #ifndef NDEBUG |
201 | 7.02M | SubDocKey sub_doc_key; |
202 | 7.02M | CHECK_OK(sub_doc_key.FullyDecodeFrom(user_key)); |
203 | | |
204 | 7.02M | DocHybridTime doc_ht, doc_ht2; |
205 | 7.02M | Slice temp_slice = slices.back(); |
206 | 7.02M | CHECK_OK(doc_ht.DecodeFrom(&temp_slice)); |
207 | 7.02M | CHECK(temp_slice.empty()); |
208 | 7.02M | CHECK_EQ(sub_doc_key.doc_hybrid_time(), doc_ht); |
209 | 7.02M | CHECK_OK(GetDocHybridTime(values, &doc_ht2)); |
210 | 7.02M | CHECK_EQ(doc_ht, doc_ht2); |
211 | | |
212 | 7.02M | const auto& range_group = sub_doc_key.doc_key().range_group(); |
213 | 7.02M | CHECK_EQ(range_group.size(), slices.size() - 1); |
214 | | |
215 | 11.6M | for (size_t i = 0; i != range_group.size(); ++i) { |
216 | 4.61M | PrimitiveValue primitive_value, primitive_value2; |
217 | 4.61M | temp_slice = slices[i]; |
218 | 4.61M | CHECK_OK(primitive_value.DecodeFromKey(&temp_slice)); |
219 | 4.61M | CHECK(temp_slice.empty()); |
220 | 4.61M | CHECK_EQ(range_group[i], primitive_value); |
221 | 4.61M | CHECK_OK(GetPrimitiveValue(values, i, &primitive_value2)); |
222 | 4.61M | CHECK_EQ(range_group[i], primitive_value2); |
223 | 4.61M | } |
224 | 7.02M | #endif |
225 | 7.02M | return true; |
226 | 7.02M | } |
227 | | }; |
228 | | |
229 | | } // namespace |
230 | | |
231 | 430k | std::shared_ptr<rocksdb::BoundaryValuesExtractor> DocBoundaryValuesExtractorInstance() { |
232 | 430k | static std::shared_ptr<rocksdb::BoundaryValuesExtractor> instance = |
233 | 430k | std::make_shared<DocBoundaryValuesExtractor>(); |
234 | 430k | return instance; |
235 | 430k | } |
236 | | |
237 | | // Used in tests |
238 | | Status GetPrimitiveValue(const rocksdb::UserBoundaryValues& values, |
239 | | size_t index, |
240 | 4.61M | PrimitiveValue* out) { |
241 | 4.61M | auto value = rocksdb::UserValueWithTag(values, PrimitiveBoundaryValue::TagForIndex(index)); |
242 | 4.61M | if (!value) { |
243 | 0 | return STATUS_SUBSTITUTE(NotFound, "Not found value for index $0", index); |
244 | 0 | } |
245 | 4.61M | const auto* primitive_value = down_cast<PrimitiveBoundaryValue*>(value.get()); |
246 | 4.61M | return primitive_value->value(out); |
247 | 4.61M | } |
248 | | |
249 | 7.02M | Status GetDocHybridTime(const rocksdb::UserBoundaryValues& values, DocHybridTime* out) { |
250 | 7.02M | auto value = rocksdb::UserValueWithTag(values, kDocHybridTimeTag); |
251 | 7.02M | if (!value) { |
252 | 0 | return STATUS(NotFound, "Not found value for doc hybrid time"); |
253 | 0 | } |
254 | 7.02M | const auto* time_value = down_cast<DocHybridTimeValue*>(value.get()); |
255 | 7.02M | return time_value->value(out); |
256 | 7.02M | } |
257 | | |
258 | 6.47M | rocksdb::UserBoundaryTag TagForRangeComponent(size_t index) { |
259 | 6.47M | return PrimitiveBoundaryValue::TagForIndex(index); |
260 | 6.47M | } |
261 | | |
262 | | } // namespace docdb |
263 | | } // namespace yb |