/Users/deen/code/yugabyte-db/src/yb/docdb/doc_reader_redis.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/doc_reader_redis.h" |
15 | | |
16 | | #include <string> |
17 | | #include <vector> |
18 | | |
19 | | #include "yb/common/hybrid_time.h" |
20 | | #include "yb/common/transaction.h" |
21 | | |
22 | | #include "yb/docdb/deadline_info.h" |
23 | | #include "yb/docdb/doc_key.h" |
24 | | #include "yb/docdb/doc_ttl_util.h" |
25 | | #include "yb/docdb/docdb-internal.h" |
26 | | #include "yb/docdb/docdb_rocksdb_util.h" |
27 | | #include "yb/docdb/docdb_types.h" |
28 | | #include "yb/docdb/intent_aware_iterator.h" |
29 | | #include "yb/docdb/subdocument.h" |
30 | | #include "yb/docdb/value.h" |
31 | | #include "yb/docdb/value_type.h" |
32 | | |
33 | | #include "yb/util/result.h" |
34 | | #include "yb/util/status.h" |
35 | | #include "yb/util/status_format.h" |
36 | | |
37 | | using std::vector; |
38 | | |
39 | | using yb::HybridTime; |
40 | | |
41 | | namespace yb { |
42 | | namespace docdb { |
43 | | |
44 | 4.93M | const SliceKeyBound& SliceKeyBound::Invalid() { |
45 | 4.93M | static SliceKeyBound result; |
46 | 4.93M | return result; |
47 | 4.93M | } |
48 | | |
49 | 0 | std::string SliceKeyBound::ToString() const { |
50 | 0 | if (!is_valid()) { |
51 | 0 | return "{ empty }"; |
52 | 0 | } |
53 | 0 | return Format("{ $0$1 $2 }", is_lower() ? ">" : "<", is_exclusive() ? "" : "=", |
54 | 0 | SubDocKey::DebugSliceToString(key_)); |
55 | 0 | } |
56 | | |
57 | 4.93M | const IndexBound& IndexBound::Empty() { |
58 | 4.93M | static IndexBound result; |
59 | 4.93M | return result; |
60 | 4.93M | } |
61 | | |
62 | | |
63 | | // ------------------------------------------------------------------------------------------------ |
64 | | // Standalone functions |
65 | | // ------------------------------------------------------------------------------------------------ |
66 | | |
67 | | namespace { |
68 | | |
69 | 10.8k | void SeekToLowerBound(const SliceKeyBound& lower_bound, IntentAwareIterator* iter) { |
70 | 10.8k | if (lower_bound.is_exclusive()) { |
71 | 1.76k | iter->SeekPastSubKey(lower_bound.key()); |
72 | 9.10k | } else { |
73 | 9.10k | iter->SeekForward(lower_bound.key()); |
74 | 9.10k | } |
75 | 10.8k | } |
76 | | |
77 | | // This function does not assume that object init_markers are present. If no init marker is present, |
78 | | // or if a tombstone is found at some level, it still looks for subkeys inside it if they have |
79 | | // larger timestamps. |
80 | | // |
81 | | // TODO(akashnil): ENG-1152: If object init markers were required, this read path may be optimized. |
82 | | // We look at all rocksdb keys with prefix = subdocument_key, and construct a subdocument out of |
83 | | // them, between the timestamp range high_ts and low_ts. |
84 | | // |
85 | | // The iterator is expected to be placed at the smallest key that is subdocument_key or later, and |
86 | | // after the function returns, the iterator should be placed completely outside the |
87 | | // subdocument_key prefix. However, if high_subkey is specified, the iterator is only guaranteed |
88 | | // to be positioned after the high_subkey and not necessarily outside the subdocument_key prefix. |
89 | | // num_values_observed is used for queries on indices, and keeps track of the number of primitive |
90 | | // values observed thus far. In a query with lower index bound k, ignore the first k primitive |
91 | | // values before building the subdocument. |
92 | | CHECKED_STATUS BuildSubDocument( |
93 | | IntentAwareIterator* iter, |
94 | | const GetRedisSubDocumentData& data, |
95 | | DocHybridTime low_ts, |
96 | 2.16M | int64* num_values_observed) { |
97 | 2.16M | VLOG(3) << "BuildSubDocument data: " << data << " read_time: " << iter->read_time() |
98 | 0 | << " low_ts: " << low_ts; |
99 | 4.01M | while (iter->valid()) { |
100 | 3.96M | if (data.deadline_info && data.deadline_info->CheckAndSetDeadlinePassed()3.69M ) { |
101 | 0 | return STATUS(Expired, "Deadline for query passed."); |
102 | 0 | } |
103 | | // Since we modify num_values_observed on recursive calls, we keep a local copy of the value. |
104 | 3.96M | int64 current_values_observed = *num_values_observed; |
105 | 3.96M | auto key_data = VERIFY_RESULT(iter->FetchKey()); |
106 | 0 | auto key = key_data.key; |
107 | 3.96M | const auto write_time = key_data.write_time; |
108 | 3.96M | VLOG(4) << "iter: " << SubDocKey::DebugSliceToString(key) |
109 | 0 | << ", key: " << SubDocKey::DebugSliceToString(data.subdocument_key); |
110 | 3.96M | DCHECK(key.starts_with(data.subdocument_key)) |
111 | 0 | << "iter: " << SubDocKey::DebugSliceToString(key) |
112 | 0 | << ", key: " << SubDocKey::DebugSliceToString(data.subdocument_key); |
113 | | |
114 | | // Key could be invalidated because we could move iterator, so back it up. |
115 | 3.96M | KeyBytes key_copy(key); |
116 | 3.96M | key = key_copy.AsSlice(); |
117 | 3.96M | rocksdb::Slice value = iter->value(); |
118 | | // Checking that IntentAwareIterator returns an entry with correct time. |
119 | 3.96M | DCHECK(key_data.same_transaction || |
120 | 0 | iter->read_time().global_limit >= write_time.hybrid_time()) |
121 | 0 | << "Bad key: " << SubDocKey::DebugSliceToString(key) |
122 | 0 | << ", global limit: " << iter->read_time().global_limit |
123 | 0 | << ", write time: " << write_time.hybrid_time(); |
124 | | |
125 | 3.96M | if (low_ts > write_time) { |
126 | 35 | VLOG(3) << "SeekPastSubKey: " << SubDocKey::DebugSliceToString(key)0 ; |
127 | 35 | iter->SeekPastSubKey(key); |
128 | 35 | continue; |
129 | 35 | } |
130 | 3.96M | Value doc_value; |
131 | 3.96M | RETURN_NOT_OK(doc_value.Decode(value)); |
132 | 3.96M | ValueType value_type = doc_value.value_type(); |
133 | 3.96M | if (key == data.subdocument_key) { |
134 | 2.12M | if (write_time == DocHybridTime::kMin) |
135 | 0 | return STATUS(Corruption, "No hybrid timestamp found on entry"); |
136 | | |
137 | | // We may need to update the TTL in individual columns. |
138 | 2.12M | if (write_time.hybrid_time() >= data.exp.write_ht) { |
139 | | // We want to keep the default TTL otherwise. |
140 | 2.12M | if (doc_value.ttl() != ValueControlFields::kMaxTtl) { |
141 | 134 | data.exp.write_ht = write_time.hybrid_time(); |
142 | 134 | data.exp.ttl = doc_value.ttl(); |
143 | 2.12M | } else if (data.exp.ttl.IsNegative()) { |
144 | 658 | data.exp.ttl = -data.exp.ttl; |
145 | 658 | } |
146 | 2.12M | } |
147 | | |
148 | | // If the hybrid time is kMin, then we must be using default TTL. |
149 | 2.12M | if (data.exp.write_ht == HybridTime::kMin) { |
150 | 280k | data.exp.write_ht = write_time.hybrid_time(); |
151 | 280k | } |
152 | | |
153 | | // Treat an expired value as a tombstone written at the same time as the original value. |
154 | 2.12M | if (HasExpiredTTL(data.exp.write_ht, data.exp.ttl, iter->read_time().read)) { |
155 | 84 | doc_value = Value::Tombstone(); |
156 | 84 | value_type = ValueType::kTombstone; |
157 | 84 | } |
158 | | |
159 | 2.12M | const bool is_collection = IsCollectionType(value_type); |
160 | | // We have found some key that matches our entire subdocument_key, i.e. we didn't skip ahead |
161 | | // to a lower level key (with optional object init markers). |
162 | 2.12M | if (is_collection || value_type == ValueType::kTombstone2.11M ) { |
163 | 12.2k | if (low_ts < write_time) { |
164 | 473 | low_ts = write_time; |
165 | 473 | } |
166 | 12.2k | if (is_collection) { |
167 | 11.4k | *data.result = SubDocument(value_type); |
168 | 11.4k | } |
169 | | |
170 | | // If the subkey lower bound filters out the key we found, we want to skip to the lower |
171 | | // bound. If it does not, we want to seek to the next key. This prevents an infinite loop |
172 | | // where the iterator keeps seeking to itself if the key we found matches the low subkey. |
173 | | // TODO: why aren't we doing this for arrays? |
174 | 12.2k | if (IsObjectType(value_type) && !data.low_subkey->CanInclude(key)11.4k ) { |
175 | | // Try to seek to the low_subkey for efficiency. |
176 | 10.8k | SeekToLowerBound(*data.low_subkey, iter); |
177 | 10.8k | } else { |
178 | 1.39k | VLOG(3) << "SeekPastSubKey: " << SubDocKey::DebugSliceToString(key)0 ; |
179 | 1.39k | iter->SeekPastSubKey(key); |
180 | 1.39k | } |
181 | 12.2k | continue; |
182 | 2.11M | } else if (IsPrimitiveValueType(value_type)) { |
183 | | // Choose the user supplied timestamp if present. |
184 | 2.11M | const UserTimeMicros user_timestamp = doc_value.user_timestamp(); |
185 | 2.11M | doc_value.mutable_primitive_value()->SetWriteTime( |
186 | 2.11M | user_timestamp == ValueControlFields::kInvalidUserTimestamp |
187 | 2.11M | ? write_time.hybrid_time().GetPhysicalValueMicros() |
188 | 2.11M | : doc_value.user_timestamp()0 ); |
189 | 2.11M | if (!data.high_index->CanInclude(current_values_observed)) { |
190 | 20 | iter->SeekOutOfSubDoc(&key_copy); |
191 | 20 | return Status::OK(); |
192 | 20 | } |
193 | 2.11M | if (data.low_index->CanInclude(*num_values_observed)) { |
194 | 2.11M | *data.result = SubDocument(doc_value.primitive_value()); |
195 | 2.11M | } |
196 | 2.11M | (*num_values_observed)++; |
197 | 2.11M | VLOG(3) << "SeekOutOfSubDoc: " << SubDocKey::DebugSliceToString(key)0 ; |
198 | 2.11M | iter->SeekOutOfSubDoc(&key_copy); |
199 | 2.11M | return Status::OK(); |
200 | 2.11M | } else { |
201 | 0 | return STATUS_FORMAT(Corruption, "Expected primitive value type, got $0", value_type); |
202 | 0 | } |
203 | 2.12M | } |
204 | 1.84M | SubDocument descendant{PrimitiveValue(ValueType::kInvalid)}; |
205 | | // TODO: what if the key we found is the same as before? |
206 | | // We'll get into an infinite recursion then. |
207 | 1.84M | { |
208 | 1.84M | IntentAwareIteratorPrefixScope prefix_scope(key, iter); |
209 | 1.84M | RETURN_NOT_OK(BuildSubDocument( |
210 | 1.84M | iter, data.Adjusted(key, &descendant), low_ts, |
211 | 1.84M | num_values_observed)); |
212 | | |
213 | 1.84M | } |
214 | 1.84M | if (descendant.value_type() == ValueType::kInvalid) { |
215 | | // The document was not found in this level (maybe a tombstone was encountered). |
216 | 143 | continue; |
217 | 143 | } |
218 | | |
219 | 1.84M | if (!data.low_subkey->CanInclude(key)) { |
220 | 11 | VLOG(3) << "Filtered by low_subkey: " << data.low_subkey->ToString() |
221 | 0 | << ", key: " << SubDocKey::DebugSliceToString(key); |
222 | | // The value provided is lower than what we are looking for, seek to the lower bound. |
223 | 11 | SeekToLowerBound(*data.low_subkey, iter); |
224 | 11 | continue; |
225 | 11 | } |
226 | | |
227 | | // We use num_values_observed as a conservative figure for lower bound and |
228 | | // current_values_observed for upper bound so we don't lose any data we should be including. |
229 | 1.84M | if (!data.low_index->CanInclude(*num_values_observed)) { |
230 | 42 | continue; |
231 | 42 | } |
232 | | |
233 | 1.84M | if (!data.high_subkey->CanInclude(key)) { |
234 | 4.91k | VLOG(3) << "Filtered by high_subkey: " << data.high_subkey->ToString() |
235 | 0 | << ", key: " << SubDocKey::DebugSliceToString(key); |
236 | | // We have encountered a subkey higher than our constraints, we should stop here. |
237 | 4.91k | return Status::OK(); |
238 | 4.91k | } |
239 | | |
240 | 1.83M | if (!data.high_index->CanInclude(current_values_observed)) { |
241 | 20 | return Status::OK(); |
242 | 20 | } |
243 | | |
244 | 1.83M | if (!IsObjectType(data.result->value_type())) { |
245 | 14 | *data.result = SubDocument(); |
246 | 14 | } |
247 | | |
248 | 1.83M | SubDocument* current = data.result; |
249 | 1.83M | size_t num_children; |
250 | 1.83M | RETURN_NOT_OK(current->NumChildren(&num_children)); |
251 | 1.83M | if (data.limit != 0 && num_children >= data.limit1.17M ) { |
252 | | // We have processed enough records. |
253 | 1.44k | return Status::OK(); |
254 | 1.44k | } |
255 | | |
256 | 1.83M | if (data.count_only) { |
257 | | // We need to only count the records that we found. |
258 | 134 | data.record_count++; |
259 | 1.83M | } else { |
260 | 1.83M | Slice temp = key; |
261 | 1.83M | temp.remove_prefix(data.subdocument_key.size()); |
262 | 1.83M | for (;;) { |
263 | 1.83M | PrimitiveValue child; |
264 | 1.83M | RETURN_NOT_OK(child.DecodeFromKey(&temp)); |
265 | 1.83M | if (temp.empty()) { |
266 | 1.83M | current->SetChild(child, std::move(descendant)); |
267 | 1.83M | break; |
268 | 1.83M | } |
269 | 75 | current = current->GetOrAddChild(child).first; |
270 | 75 | } |
271 | 1.83M | } |
272 | 1.83M | } |
273 | | |
274 | 50.4k | return Status::OK(); |
275 | 2.16M | } |
276 | | |
277 | | // If there is a key equal to key_without_ht + some timestamp, which is later than |
278 | | // max_overwrite_time, we update max_overwrite_time, and result_value (unless it is nullptr). |
279 | | // If there is a TTL with write time later than the write time in expiration, it is updated with |
280 | | // the new write time and TTL, unless its value is kMaxTtl. |
281 | | // When the TTL found is kMaxTtl and it is not a merge record, then it is assumed not to be |
282 | | // explicitly set. Because it does not override the default table ttl, exp, which was initialized |
283 | | // to the table ttl, is not updated. |
284 | | // Observe that exp updates based on the first record found, while max_overwrite_time updates |
285 | | // based on the first non-merge record found. |
286 | | // This should not be used for leaf nodes. - Why? Looks like it is already used for leaf nodes |
287 | | // also. |
288 | | // Note: it is the responsibility of the caller to make sure key_without_ht doesn't have a hybrid |
289 | | // time. |
290 | | // TODO: We could also check that the value is kTombstone or kObject type for sanity checking - ? |
291 | | // It could be a simple value as well, not necessarily kTombstone or kObject. |
292 | | Status FindLastWriteTime( |
293 | | IntentAwareIterator* iter, |
294 | | const Slice& key_without_ht, |
295 | | DocHybridTime* max_overwrite_time, |
296 | | Expiration* exp, |
297 | 892k | Value* result_value = nullptr) { |
298 | 892k | Slice value; |
299 | 892k | DocHybridTime doc_ht = *max_overwrite_time; |
300 | 892k | RETURN_NOT_OK(iter->FindLatestRecord(key_without_ht, &doc_ht, &value)); |
301 | 892k | if (!iter->valid()) { |
302 | 151k | return Status::OK(); |
303 | 151k | } |
304 | | |
305 | 741k | auto value_copy = value; |
306 | 741k | auto control_fields = VERIFY_RESULT(ValueControlFields::Decode(&value_copy)); |
307 | 0 | auto value_type = DecodeValueType(value_copy); |
308 | 741k | if (value_type == ValueType::kInvalid) { |
309 | 200k | return Status::OK(); |
310 | 200k | } |
311 | | |
312 | | // We update the expiration if and only if the write time is later than the write time |
313 | | // currently stored in expiration, and the record is not a regular record with default TTL. |
314 | | // This is done independently of whether the row is a TTL row. |
315 | | // In the case that the always_override flag is true, default TTL will not be preserved. |
316 | 541k | Expiration new_exp = *exp; |
317 | 541k | if (doc_ht.hybrid_time() >= exp->write_ht) { |
318 | | // We want to keep the default TTL otherwise. |
319 | 541k | if (control_fields.ttl != ValueControlFields::kMaxTtl || |
320 | 541k | control_fields.merge_flags == ValueControlFields::kTtlFlag541k || |
321 | 541k | exp->always_override541k ) { |
322 | 2.84k | new_exp.write_ht = doc_ht.hybrid_time(); |
323 | 2.84k | new_exp.ttl = control_fields.ttl; |
324 | 538k | } else if (exp->ttl.IsNegative()) { |
325 | 0 | new_exp.ttl = -new_exp.ttl; |
326 | 0 | } |
327 | 541k | } |
328 | | |
329 | | // If we encounter a TTL row, we assign max_overwrite_time to be the write time of the |
330 | | // original value/init marker. |
331 | 541k | if (control_fields.merge_flags == ValueControlFields::kTtlFlag) { |
332 | 180 | DocHybridTime new_ht; |
333 | 180 | RETURN_NOT_OK(iter->NextFullValue(&new_ht, &value)); |
334 | | |
335 | | // There could be a case where the TTL row exists, but the value has been |
336 | | // compacted away. Then, it is treated as a Tombstone written at the time |
337 | | // of the TTL row. |
338 | 180 | if (!iter->valid() && !new_exp.ttl.IsNegative()0 ) { |
339 | 0 | new_exp.ttl = -new_exp.ttl; |
340 | 180 | } else { |
341 | 180 | RETURN_NOT_OK(Value::DecodePrimitiveValueType(value)); |
342 | | // Because we still do not know whether we are seeking something expired, |
343 | | // we must take the max_overwrite_time as if the value were not expired. |
344 | 180 | doc_ht = new_ht; |
345 | 180 | } |
346 | 180 | } |
347 | | |
348 | 541k | if ((value_type == ValueType::kTombstone || value_type == ValueType::kInvalid540k ) && |
349 | 541k | !new_exp.ttl.IsNegative()684 ) { |
350 | 684 | new_exp.ttl = -new_exp.ttl; |
351 | 684 | } |
352 | 541k | *exp = new_exp; |
353 | | |
354 | 541k | if (doc_ht > *max_overwrite_time) { |
355 | 541k | *max_overwrite_time = doc_ht; |
356 | 541k | VLOG(4) << "Max overwritten time for " << key_without_ht.ToDebugHexString() << ": " |
357 | 0 | << *max_overwrite_time; |
358 | 541k | } |
359 | | |
360 | 541k | if (result_value) { |
361 | 539k | RETURN_NOT_OK(result_value->Decode(value)); |
362 | 539k | } |
363 | | |
364 | 541k | return Status::OK(); |
365 | 541k | } |
366 | | |
367 | | } // namespace |
368 | | |
369 | | yb::Status GetRedisSubDocument( |
370 | | const DocDB& doc_db, |
371 | | const GetRedisSubDocumentData& data, |
372 | | const rocksdb::QueryId query_id, |
373 | | const TransactionOperationContext& txn_op_context, |
374 | | CoarseTimePoint deadline, |
375 | 221k | const ReadHybridTime& read_time) { |
376 | 221k | auto iter = CreateIntentAwareIterator( |
377 | 221k | doc_db, BloomFilterMode::USE_BLOOM_FILTER, data.subdocument_key, query_id, |
378 | 221k | txn_op_context, deadline, read_time); |
379 | 221k | return GetRedisSubDocument(iter.get(), data, nullptr /* projection */, SeekFwdSuffices::kFalse); |
380 | 221k | } |
381 | | |
382 | | yb::Status GetRedisSubDocument( |
383 | | IntentAwareIterator *db_iter, |
384 | | const GetRedisSubDocumentData& data, |
385 | | const vector<PrimitiveValue>* projection, |
386 | 627k | const SeekFwdSuffices seek_fwd_suffices) { |
387 | | // TODO(dtxn) scan through all involved transactions first to cache statuses in a batch, |
388 | | // so during building subdocument we don't need to request them one by one. |
389 | | // TODO(dtxn) we need to restart read with scan_ht = commit_ht if some transaction was committed |
390 | | // at time commit_ht within [scan_ht; read_request_time + max_clock_skew). Also we need |
391 | | // to wait until time scan_ht = commit_ht has passed. |
392 | | // TODO(dtxn) for each scanned key (and its subkeys) we need to avoid *new* values committed at |
393 | | // ht <= scan_ht (or just ht < scan_ht?) |
394 | | // Question: what will break if we allow later commit at ht <= scan_ht ? Need to write down |
395 | | // detailed example. |
396 | 627k | *data.doc_found = false; |
397 | 627k | DOCDB_DEBUG_LOG("GetRedisSubDocument for key $0 @ $1", data.subdocument_key.ToDebugHexString(), |
398 | 627k | db_iter->read_time().ToString()); |
399 | | |
400 | | // The latest time at which any prefix of the given key was overwritten. |
401 | 627k | DocHybridTime max_overwrite_ht(DocHybridTime::kMin); |
402 | 627k | VLOG(4) << "GetRedisSubDocument(" << data << ")"0 ; |
403 | | |
404 | 627k | SubDocKey found_subdoc_key; |
405 | 627k | auto dockey_size = |
406 | 627k | VERIFY_RESULT(DocKey::EncodedSize(data.subdocument_key, DocKeyPart::kWholeDocKey)); |
407 | | |
408 | 0 | Slice key_slice(data.subdocument_key.data(), dockey_size); |
409 | | |
410 | | // First, check the descendants of the ID level for TTL or more recent writes. |
411 | 627k | IntentAwareIteratorPrefixScope prefix_scope(key_slice, db_iter); |
412 | 627k | if (seek_fwd_suffices) { |
413 | 0 | db_iter->SeekForward(key_slice); |
414 | 627k | } else { |
415 | 627k | db_iter->Seek(key_slice); |
416 | 627k | } |
417 | 627k | { |
418 | 627k | auto temp_key = data.subdocument_key; |
419 | 627k | temp_key.remove_prefix(dockey_size); |
420 | 892k | for (;;) { |
421 | 892k | auto decode_result = VERIFY_RESULT(SubDocKey::DecodeSubkey(&temp_key)); |
422 | 892k | if (!decode_result) { |
423 | 627k | break; |
424 | 627k | } |
425 | 265k | RETURN_NOT_OK(FindLastWriteTime(db_iter, key_slice, &max_overwrite_ht, &data.exp)); |
426 | 265k | key_slice = Slice(key_slice.data(), temp_key.data() - key_slice.data()); |
427 | 265k | } |
428 | 627k | } |
429 | | |
430 | | // By this point, key_slice is the DocKey and all the subkeys of subdocument_key. Check for |
431 | | // init-marker / tombstones at the top level; update max_overwrite_ht. |
432 | 627k | Value doc_value = Value(PrimitiveValue(ValueType::kInvalid)); |
433 | 627k | RETURN_NOT_OK(FindLastWriteTime(db_iter, key_slice, &max_overwrite_ht, &data.exp, &doc_value)); |
434 | | |
435 | 627k | const ValueType value_type = doc_value.value_type(); |
436 | | |
437 | 627k | if (data.return_type_only) { |
438 | 301k | *data.doc_found = value_type != ValueType::kInvalid && |
439 | 301k | !data.exp.ttl.IsNegative()258k ; |
440 | | // Check for expiration. |
441 | 301k | if (*data.doc_found && max_overwrite_ht != DocHybridTime::kMin258k ) { |
442 | 258k | *data.doc_found = |
443 | 258k | !HasExpiredTTL(data.exp.write_ht, data.exp.ttl, db_iter->read_time().read); |
444 | 258k | } |
445 | 301k | if (*data.doc_found) { |
446 | | // Observe that this will have the right type but not necessarily the right value. |
447 | 258k | *data.result = SubDocument(doc_value.primitive_value()); |
448 | 258k | } |
449 | 301k | return Status::OK(); |
450 | 301k | } |
451 | | |
452 | 326k | if (projection == nullptr) { |
453 | 326k | *data.result = SubDocument(ValueType::kInvalid); |
454 | 326k | int64 num_values_observed = 0; |
455 | 326k | IntentAwareIteratorPrefixScope prefix_scope(key_slice, db_iter); |
456 | 326k | RETURN_NOT_OK(BuildSubDocument(db_iter, data, max_overwrite_ht, |
457 | 326k | &num_values_observed)); |
458 | 326k | *data.doc_found = data.result->value_type() != ValueType::kInvalid; |
459 | 326k | if (*data.doc_found) { |
460 | 280k | if (value_type == ValueType::kRedisSet) { |
461 | 6 | RETURN_NOT_OK(data.result->ConvertToRedisSet()); |
462 | 280k | } else if (value_type == ValueType::kRedisTS) { |
463 | 10.8k | RETURN_NOT_OK(data.result->ConvertToRedisTS()); |
464 | 269k | } else if (value_type == ValueType::kRedisSortedSet) { |
465 | 12 | RETURN_NOT_OK(data.result->ConvertToRedisSortedSet()); |
466 | 269k | } else if (value_type == ValueType::kRedisList) { |
467 | 0 | RETURN_NOT_OK(data.result->ConvertToRedisList()); |
468 | 0 | } |
469 | 280k | } |
470 | 326k | return Status::OK(); |
471 | 326k | } |
472 | | // Seed key_bytes with the subdocument key. For each subkey in the projection, build subdocument |
473 | | // and reuse key_bytes while appending the subkey. |
474 | 0 | *data.result = SubDocument(); |
475 | 0 | KeyBytes key_bytes; |
476 | | // Preallocate some extra space to avoid allocation for small subkeys. |
477 | 0 | key_bytes.Reserve(data.subdocument_key.size() + kMaxBytesPerEncodedHybridTime + 32); |
478 | 0 | key_bytes.AppendRawBytes(data.subdocument_key); |
479 | 0 | const size_t subdocument_key_size = key_bytes.size(); |
480 | 0 | for (const PrimitiveValue& subkey : *projection) { |
481 | | // Append subkey to subdocument key. Reserve an extra kMaxBytesPerEncodedHybridTime + 1 bytes in |
482 | | // key_bytes to prevent the internal buffer from being reallocated and moved by SeekForward() |
483 | | // appending the hybrid time, thereby invalidating the buffer pointer saved by prefix_scope. |
484 | 0 | subkey.AppendToKey(&key_bytes); |
485 | 0 | key_bytes.Reserve(key_bytes.size() + kMaxBytesPerEncodedHybridTime + 1); |
486 | | // This seek is to initialize the iterator for BuildSubDocument call. |
487 | 0 | IntentAwareIteratorPrefixScope prefix_scope(key_bytes, db_iter); |
488 | 0 | db_iter->SeekForward(&key_bytes); |
489 | 0 | SubDocument descendant(ValueType::kInvalid); |
490 | 0 | int64 num_values_observed = 0; |
491 | 0 | RETURN_NOT_OK(BuildSubDocument( |
492 | 0 | db_iter, data.Adjusted(key_bytes, &descendant), max_overwrite_ht, |
493 | 0 | &num_values_observed)); |
494 | 0 | *data.doc_found = descendant.value_type() != ValueType::kInvalid; |
495 | 0 | data.result->SetChild(subkey, std::move(descendant)); |
496 | | |
497 | | // Restore subdocument key by truncating the appended subkey. |
498 | 0 | key_bytes.Truncate(subdocument_key_size); |
499 | 0 | } |
500 | | // Make sure the iterator is placed outside the whole document in the end. |
501 | 0 | key_bytes.Truncate(dockey_size); |
502 | 0 | db_iter->SeekOutOfSubDoc(&key_bytes); |
503 | 0 | return Status::OK(); |
504 | 0 | } |
505 | | |
506 | 0 | std::string GetRedisSubDocumentData::ToString() const { |
507 | 0 | return Format("{ subdocument_key: $0 exp.ttl: $1 exp.write_time: $2 return_type_only: $3 " |
508 | 0 | "low_subkey: $4 high_subkey: $5 }", |
509 | 0 | SubDocKey::DebugSliceToString(subdocument_key), exp.ttl, |
510 | 0 | exp.write_ht, return_type_only, low_subkey, high_subkey); |
511 | 0 | } |
512 | | |
513 | | |
514 | | } // namespace docdb |
515 | | } // namespace yb |