/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/merge_helper.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/db/merge_helper.h" |
22 | | |
23 | | #include <stdio.h> |
24 | | |
25 | | #include <string> |
26 | | |
27 | | #include "yb/rocksdb/compaction_filter.h" |
28 | | #include "yb/rocksdb/comparator.h" |
29 | | #include "yb/rocksdb/merge_operator.h" |
30 | | #include "yb/rocksdb/table/internal_iterator.h" |
31 | | #include "yb/rocksdb/util/perf_context_imp.h" |
32 | | #include "yb/rocksdb/util/statistics.h" |
33 | | |
34 | | #include "yb/util/stats/perf_step_timer.h" |
35 | | |
36 | | namespace rocksdb { |
37 | | |
38 | | // TODO(agiardullo): Clean up merge callsites to use this func |
39 | | Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, |
40 | | const std::deque<std::string>& operands, |
41 | | const MergeOperator* merge_operator, |
42 | | Statistics* statistics, Env* env, |
43 | 40.1k | Logger* logger, std::string* result) { |
44 | 40.1k | if (operands.size() == 0) { |
45 | 0 | result->assign(value->cdata(), value->size()); |
46 | 0 | return Status::OK(); |
47 | 0 | } |
48 | | |
49 | 40.1k | if (merge_operator == nullptr) { |
50 | 0 | return STATUS(NotSupported, "Provide a merge_operator when opening DB"); |
51 | 0 | } |
52 | | |
53 | | // Setup to time the merge |
54 | 40.1k | StopWatchNano timer(env, statistics != nullptr); |
55 | 40.1k | PERF_TIMER_GUARD(merge_operator_time_nanos); |
56 | | |
57 | | // Do the merge |
58 | 40.1k | bool success = |
59 | 40.1k | merge_operator->FullMerge(key, value, operands, result, logger); |
60 | | |
61 | 40.1k | RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanosSafe()); |
62 | | |
63 | 40.1k | if (!success) { |
64 | 0 | RecordTick(statistics, NUMBER_MERGE_FAILURES); |
65 | 0 | return STATUS(Corruption, "Error: Could not perform merge."); |
66 | 0 | } |
67 | | |
68 | 40.1k | return Status::OK(); |
69 | 40.1k | } |
70 | | |
71 | | // PRE: iter points to the first merge type entry |
72 | | // POST: iter points to the first entry beyond the merge process (or the end) |
73 | | // keys_, operands_ are updated to reflect the merge result. |
74 | | // keys_ stores the list of keys encountered while merging. |
75 | | // operands_ stores the list of merge operands encountered while merging. |
76 | | // keys_[i] corresponds to operands_[i] for each i. |
77 | | Status MergeHelper::MergeUntil(InternalIterator* iter, |
78 | | const SequenceNumber stop_before, |
79 | 131k | const bool at_bottom) { |
80 | | // Get a copy of the internal key, before it's invalidated by iter->Next() |
81 | | // Also maintain the list of merge operands seen. |
82 | 131k | assert(HasOperator()); |
83 | 131k | keys_.clear(); |
84 | 131k | operands_.clear(); |
85 | 131k | assert(user_merge_operator_); |
86 | 131k | bool first_key = true; |
87 | | |
88 | | // We need to parse the internal key again as the parsed key is |
89 | | // backed by the internal key! |
90 | | // Assume no internal key corruption as it has been successfully parsed |
91 | | // by the caller. |
92 | | // original_key_is_iter variable is just caching the information: |
93 | | // original_key_is_iter == (iter->key().ToString() == original_key) |
94 | 131k | bool original_key_is_iter = true; |
95 | 131k | std::string original_key = iter->key().ToString(); |
96 | | // Important: |
97 | | // orig_ikey is backed by original_key if keys_.empty() |
98 | | // orig_ikey is backed by keys_.back() if !keys_.empty() |
99 | 131k | ParsedInternalKey orig_ikey; |
100 | 131k | ParseInternalKey(original_key, &orig_ikey); |
101 | | |
102 | 131k | Status s; |
103 | 131k | bool hit_the_next_user_key = false; |
104 | 424k | for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { |
105 | 418k | ParsedInternalKey ikey; |
106 | 418k | assert(keys_.size() == operands_.size()); |
107 | | |
108 | 418k | if (!ParseInternalKey(iter->key(), &ikey)) { |
109 | | // stop at corrupted key |
110 | 0 | if (assert_valid_internal_key_) { |
111 | 0 | assert(!"Corrupted internal key not expected."); |
112 | 0 | return STATUS(Corruption, "Corrupted internal key not expected."); |
113 | 0 | } |
114 | 0 | break; |
115 | 418k | } else if (first_key) { |
116 | 131k | assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); |
117 | 131k | first_key = false; |
118 | 286k | } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) { |
119 | | // hit a different user key, stop right here |
120 | 109k | hit_the_next_user_key = true; |
121 | 109k | break; |
122 | 177k | } else if (stop_before && ikey.sequence <= stop_before) { |
123 | | // hit an entry that's visible by the previous snapshot, can't touch that |
124 | 26 | break; |
125 | 26 | } |
126 | | |
127 | | // At this point we are guaranteed that we need to process this key. |
128 | | |
129 | 308k | assert(IsValueType(ikey.type)); |
130 | 308k | if (ikey.type != kTypeMerge) { |
131 | 16.3k | if (ikey.type != kTypeValue && ikey.type != kTypeDeletion) { |
132 | | // Merges operands can only be used with puts and deletions, single |
133 | | // deletions are not supported. |
134 | 0 | assert(false); |
135 | | // release build doesn't have asserts, so we return error status |
136 | 0 | return STATUS(InvalidArgument, |
137 | 0 | " Merges operands can only be used with puts and deletions, single " |
138 | 0 | "deletions are not supported."); |
139 | 0 | } |
140 | | |
141 | | // hit a put/delete |
142 | | // => merge the put value or a nullptr with operands_ |
143 | | // => store result in operands_.back() (and update keys_.back()) |
144 | | // => change the entry type to kTypeValue for keys_.back() |
145 | | // We are done! Success! |
146 | | |
147 | | // If there are no operands, just return the Status::OK(). That will cause |
148 | | // the compaction iterator to write out the key we're currently at, which |
149 | | // is the put/delete we just encountered. |
150 | 16.3k | if (keys_.empty()) { |
151 | 1 | return Status::OK(); |
152 | 1 | } |
153 | | |
154 | | // TODO(noetzli) If the merge operator returns false, we are currently |
155 | | // (almost) silently dropping the put/delete. That's probably not what we |
156 | | // want. |
157 | 16.3k | const Slice val = iter->value(); |
158 | 13.8k | const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; |
159 | 16.3k | std::string merge_result; |
160 | 16.3k | s = TimedFullMerge(ikey.user_key, val_ptr, operands_, |
161 | 16.3k | user_merge_operator_, stats_, env_, logger_, |
162 | 16.3k | &merge_result); |
163 | | |
164 | | // We store the result in keys_.back() and operands_.back() |
165 | | // if nothing went wrong (i.e.: no operand corruption on disk) |
166 | 16.3k | if (s.ok()) { |
167 | | // The original key encountered |
168 | 16.3k | original_key = std::move(keys_.back()); |
169 | 16.3k | orig_ikey.type = kTypeValue; |
170 | 16.3k | UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); |
171 | 16.3k | keys_.clear(); |
172 | 16.3k | operands_.clear(); |
173 | 16.3k | keys_.emplace_front(std::move(original_key)); |
174 | 16.3k | operands_.emplace_front(std::move(merge_result)); |
175 | 16.3k | } |
176 | | |
177 | | // move iter to the next entry |
178 | 16.3k | iter->Next(); |
179 | 16.3k | return s; |
180 | 292k | } else { |
181 | | // hit a merge |
182 | | // => if there is a compaction filter, apply it. |
183 | | // => merge the operand into the front of the operands_ list |
184 | | // if not filtered |
185 | | // => then continue because we haven't yet seen a Put/Delete. |
186 | | // |
187 | | // Keep queuing keys and operands until we either meet a put / delete |
188 | | // request or later did a partial merge. |
189 | | |
190 | 292k | Slice value_slice = iter->value(); |
191 | | // add an operand to the list if: |
192 | | // 1) it's included in one of the snapshots. in that case we *must* write |
193 | | // it out, no matter what compaction filter says |
194 | | // 2) it's not filtered by a compaction filter |
195 | 292k | if (ikey.sequence <= latest_snapshot_ || |
196 | 292k | !FilterMerge(orig_ikey.user_key, value_slice)) { |
197 | 292k | if (original_key_is_iter) { |
198 | | // this is just an optimization that saves us one memcpy |
199 | 131k | keys_.push_front(std::move(original_key)); |
200 | 160k | } else { |
201 | 160k | keys_.push_front(iter->key().ToString()); |
202 | 160k | } |
203 | 292k | if (keys_.size() == 1) { |
204 | | // we need to re-anchor the orig_ikey because it was anchored by |
205 | | // original_key before |
206 | 131k | ParseInternalKey(keys_.back(), &orig_ikey); |
207 | 131k | } |
208 | 292k | operands_.push_front(value_slice.ToString()); |
209 | 292k | } |
210 | 292k | } |
211 | 308k | } |
212 | | |
213 | 115k | if (operands_.size() == 0) { |
214 | | // we filtered out all the merge operands |
215 | 4 | return Status::OK(); |
216 | 4 | } |
217 | | |
218 | | // We are sure we have seen this key's entire history if we are at the |
219 | | // last level and exhausted all internal keys of this user key. |
220 | | // NOTE: !iter->Valid() does not necessarily mean we hit the |
221 | | // beginning of a user key, as versions of a user key might be |
222 | | // split into multiple files (even files on the same level) |
223 | | // and some files might not be included in the compaction/merge. |
224 | | // |
225 | | // There are also cases where we have seen the root of history of this |
226 | | // key without being sure of it. Then, we simply miss the opportunity |
227 | | // to combine the keys. Since VersionSet::SetupOtherInputs() always makes |
228 | | // sure that all merge-operands on the same level get compacted together, |
229 | | // this will simply lead to these merge operands moving to the next level. |
230 | | // |
231 | | // So, we only perform the following logic (to merge all operands together |
232 | | // without a Put/Delete) if we are certain that we have seen the end of key. |
233 | 115k | bool surely_seen_the_beginning = hit_the_next_user_key && at_bottom; |
234 | 115k | if (surely_seen_the_beginning) { |
235 | | // do a final merge with nullptr as the existing value and say |
236 | | // bye to the merge type (it's now converted to a Put) |
237 | 23.8k | assert(kTypeMerge == orig_ikey.type); |
238 | 23.8k | assert(operands_.size() >= 1); |
239 | 23.8k | assert(operands_.size() == keys_.size()); |
240 | 23.8k | std::string merge_result; |
241 | 23.8k | s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_, |
242 | 23.8k | user_merge_operator_, stats_, env_, logger_, |
243 | 23.8k | &merge_result); |
244 | 23.8k | if (s.ok()) { |
245 | | // The original key encountered |
246 | | // We are certain that keys_ is not empty here (see assertions couple of |
247 | | // lines before). |
248 | 23.8k | original_key = std::move(keys_.back()); |
249 | 23.8k | orig_ikey.type = kTypeValue; |
250 | 23.8k | UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); |
251 | 23.8k | keys_.clear(); |
252 | 23.8k | operands_.clear(); |
253 | 23.8k | keys_.emplace_front(std::move(original_key)); |
254 | 23.8k | operands_.emplace_front(std::move(merge_result)); |
255 | 23.8k | } |
256 | 91.3k | } else { |
257 | | // We haven't seen the beginning of the key nor a Put/Delete. |
258 | | // Attempt to use the user's associative merge function to |
259 | | // merge the stacked merge operands into a single operand. |
260 | | // |
261 | | // TODO(noetzli) The docblock of MergeUntil suggests that a successful |
262 | | // partial merge returns Status::OK(). Should we change the status code |
263 | | // after a successful partial merge? |
264 | 91.3k | s = STATUS(MergeInProgress, ""); |
265 | 91.3k | if (operands_.size() >= 2 && |
266 | 2.97k | operands_.size() >= min_partial_merge_operands_) { |
267 | 2.97k | bool merge_success = false; |
268 | 2.97k | std::string merge_result; |
269 | 2.97k | { |
270 | 2.97k | StopWatchNano timer(env_, stats_ != nullptr); |
271 | 2.97k | PERF_TIMER_GUARD(merge_operator_time_nanos); |
272 | 2.97k | merge_success = user_merge_operator_->PartialMergeMulti( |
273 | 2.97k | orig_ikey.user_key, |
274 | 2.97k | std::deque<Slice>(operands_.begin(), operands_.end()), |
275 | 2.97k | &merge_result, logger_); |
276 | 2.97k | RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, |
277 | 2.97k | timer.ElapsedNanosSafe()); |
278 | 2.97k | } |
279 | 2.97k | if (merge_success) { |
280 | | // Merging of operands (associative merge) was successful. |
281 | | // Replace operands with the merge result |
282 | 1.15k | operands_.clear(); |
283 | 1.15k | operands_.emplace_front(std::move(merge_result)); |
284 | 1.15k | keys_.erase(keys_.begin(), keys_.end() - 1); |
285 | 1.15k | } |
286 | 2.97k | } |
287 | 91.3k | } |
288 | | |
289 | 115k | return s; |
290 | 115k | } |
291 | | |
292 | | MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper) |
293 | 38.6k | : merge_helper_(merge_helper) { |
294 | 38.6k | it_keys_ = merge_helper_->keys().rend(); |
295 | 38.6k | it_values_ = merge_helper_->values().rend(); |
296 | 38.6k | } |
297 | | |
298 | 131k | void MergeOutputIterator::SeekToFirst() { |
299 | 131k | const auto& keys = merge_helper_->keys(); |
300 | 131k | const auto& values = merge_helper_->values(); |
301 | 131k | assert(keys.size() == values.size()); |
302 | 131k | it_keys_ = keys.rbegin(); |
303 | 131k | it_values_ = values.rbegin(); |
304 | 131k | } |
305 | | |
306 | 290k | void MergeOutputIterator::Next() { |
307 | 290k | ++it_keys_; |
308 | 290k | ++it_values_; |
309 | 290k | } |
310 | | |
311 | 292k | bool MergeHelper::FilterMerge(const Slice& user_key, const Slice& value_slice) { |
312 | 292k | if (compaction_filter_ == nullptr) { |
313 | 131k | return false; |
314 | 131k | } |
315 | 160k | if (stats_ != nullptr) { |
316 | 160k | filter_timer_.Start(); |
317 | 160k | } |
318 | 160k | bool to_delete = |
319 | 160k | compaction_filter_->FilterMergeOperand(level_, user_key, value_slice); |
320 | 160k | return to_delete; |
321 | 160k | } |
322 | | |
323 | | } // namespace rocksdb |