/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef ROCKSDB_LITE

#include "yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h"

#include "yb/rocksdb/db/column_family.h"
#include "yb/rocksdb/db/merge_context.h"
#include "yb/rocksdb/db/merge_helper.h"
#include "yb/rocksdb/comparator.h"
#include "yb/rocksdb/db.h"
#include "yb/rocksdb/utilities/write_batch_with_index.h"
#include "yb/rocksdb/util/coding.h"
#include "yb/util/string_util.h"

namespace rocksdb {

class Env;
class Logger;
class Statistics;

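// Decodes the record that starts at data_offset within the serialized batch
// representation (rep_) and reports its record type, with the key, value, and
// blob slices pointing into the batch itself. A data_offset equal to
// GetDataSize() signals the end of the batch (NotFound); a larger offset is
// an InvalidArgument.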
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
                                                  WriteType* type, Slice* Key,
                                                  Slice* value,
                                                  Slice* blob) const {
  if (type == nullptr || Key == nullptr || value == nullptr ||
      blob == nullptr) {
    return STATUS(InvalidArgument, "Output parameters cannot be null");
  }

  if (data_offset == GetDataSize()) {
    // Reached the end of the batch.
    return STATUS(NotFound, "");
  }

  if (data_offset > GetDataSize()) {
    return STATUS(InvalidArgument, "data offset exceeds write batch size");
  }
  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  char tag;
  uint32_t column_family;
  Status s =
      ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
  if (!s.ok()) {
    // Do not classify the tag if the record could not be decoded.
    return s;
  }

  switch (tag) {
    case kTypeColumnFamilyValue:
    case kTypeValue:
      *type = kPutRecord;
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeDeletion:
      *type = kDeleteRecord;
      break;
    case kTypeColumnFamilySingleDeletion:
    case kTypeSingleDeletion:
      *type = kSingleDeleteRecord;
      break;
    case kTypeColumnFamilyMerge:
    case kTypeMerge:
      *type = kMergeRecord;
      break;
    case kTypeLogData:
      *type = kLogDataRecord;
      break;
    default:
      return STATUS(Corruption, "unknown WriteBatch tag");
  }
  return Status::OK();
}

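// Total order over index entries: first by column family id, then by user key
// (via CompareKey below), with ties on the key broken by offset so that
// entries for the same key sort in insertion order. An entry whose offset is
// the kFlagMin sentinel sorts before every real entry of its column family.
// Keys are read back from the batch via GetEntryFromDataOffset unless the
// entry carries an explicit search_key.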
int WriteBatchEntryComparator::operator()(
    const WriteBatchIndexEntry* entry1,
    const WriteBatchIndexEntry* entry2) const {
  if (entry1->column_family > entry2->column_family) {
    return 1;
  } else if (entry1->column_family < entry2->column_family) {
    return -1;
  }

  if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
    return -1;
  } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
    return 1;
  }

  Status s;
  Slice key1, key2;
  if (entry1->search_key == nullptr) {
    Slice value, blob;
    WriteType write_type;
    s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
                                             &value, &blob);
    if (!s.ok()) {
      return 1;
    }
  } else {
    key1 = *(entry1->search_key);
  }
  if (entry2->search_key == nullptr) {
    Slice value, blob;
    WriteType write_type;
    s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
                                             &value, &blob);
    if (!s.ok()) {
      return -1;
    }
  } else {
    key2 = *(entry2->search_key);
  }

  int cmp = CompareKey(entry1->column_family, key1, key2);
  if (cmp != 0) {
    return cmp;
  } else if (entry1->offset > entry2->offset) {
    return 1;
  } else if (entry1->offset < entry2->offset) {
    return -1;
  }
  return 0;
}

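// Compares two user keys with the comparator registered for the given column
// family in cf_comparator_map_, falling back to the default comparator when
// the column family has no custom entry.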
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
                                          const Slice& key1,
                                          const Slice& key2) const {
  auto comparator_for_cf = cf_comparator_map_.find(column_family);
  if (comparator_for_cf != cf_comparator_map_.end()) {
    return comparator_for_cf->second->Compare(key1, key2);
  } else {
    return default_comparator_->Compare(key1, key2);
  }
}

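// Looks up `key` in the indexed batch alone; the base DB is never consulted.
// Entries for the key are visited newest-first: a Put or Delete settles the
// result immediately, while Merge records accumulate operands in
// *merge_context until a base value (or the start of the batch) is reached.
// Returns kFound, kDeleted, kMergeInProgress, or kNotFound, and reports
// failures as kError with the details in *s.
//
// A minimal sketch of a call site (the surrounding setup is hypothetical;
// the real caller is WriteBatchWithIndex::GetFromBatch):
//
//   MergeContext merge_context;
//   std::string value;
//   Status s;
//   auto result = WriteBatchWithIndexInternal::GetFromBatch(
//       options, batch, column_family, key, &merge_context, cmp, &value,
//       /* overwrite_key = */ false, &s);
//   if (result == WriteBatchWithIndexInternal::Result::kFound) {
//     // `value` now holds the stored (or merged) value.
//   }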
WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
    const DBOptions& options, WriteBatchWithIndex* batch,
    ColumnFamilyHandle* column_family, const Slice& key,
    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
    std::string* value, bool overwrite_key, Status* s) {
  uint32_t cf_id = GetColumnFamilyID(column_family);
  *s = Status::OK();
  WriteBatchWithIndexInternal::Result result =
      WriteBatchWithIndexInternal::Result::kNotFound;

  std::unique_ptr<WBWIIterator> iter =
      std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));

  // We want to iterate in the reverse order that the writes were added to the
  // batch. Since we don't have a reverse iterator, we must seek past the end.
  // TODO(agiardullo): consider adding support for reverse iteration
  iter->Seek(key);
  while (iter->Valid()) {
    const WriteEntry& entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      break;
    }

    iter->Next();
  }

  if (!(*s).ok()) {
    return WriteBatchWithIndexInternal::Result::kError;
  }

  if (!iter->Valid()) {
    // Read past end of results. Reposition on last result.
    iter->SeekToLast();
  } else {
    iter->Prev();
  }

  Slice entry_value;
  while (iter->Valid()) {
    WriteEntry entry = iter->Entry();
    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
      // Unexpected error or we've reached a different next key
      break;
    }

    switch (entry.type) {
      case kPutRecord: {
        result = WriteBatchWithIndexInternal::Result::kFound;
        entry_value = entry.value;
        break;
      }
      case kMergeRecord: {
        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
        merge_context->PushOperand(entry.value);
        break;
      }
      case kDeleteRecord:
      case kSingleDeleteRecord: {
        result = WriteBatchWithIndexInternal::Result::kDeleted;
        break;
      }
      case kLogDataRecord: {
        // ignore
        break;
      }
      default: {
        result = WriteBatchWithIndexInternal::Result::kError;
        (*s) = STATUS(Corruption, "Unexpected entry in WriteBatchWithIndex:",
                      ToString(entry.type));
        break;
      }
    }
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted ||
        result == WriteBatchWithIndexInternal::Result::kError) {
      // We can stop iterating once we find a PUT or DELETE
      break;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        overwrite_key) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      break;
    }

    iter->Prev();
  }

  if ((*s).ok()) {
    if (result == WriteBatchWithIndexInternal::Result::kFound ||
        result == WriteBatchWithIndexInternal::Result::kDeleted) {
      // Found a Put or Delete. Merge if necessary.
      if (merge_context->GetNumOperands() > 0) {
        const MergeOperator* merge_operator;

        if (column_family != nullptr) {
          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
          merge_operator = cfh->cfd()->ioptions()->merge_operator;
        } else {
          *s = STATUS(InvalidArgument, "Must provide a column_family");
          result = WriteBatchWithIndexInternal::Result::kError;
          return result;
        }
        Statistics* statistics = options.statistics.get();
        Env* env = options.env;
        Logger* logger = options.info_log.get();

        *s = MergeHelper::TimedFullMerge(
            key, &entry_value, merge_context->GetOperands(), merge_operator,
            statistics, env, logger, value);
        if ((*s).ok()) {
          result = WriteBatchWithIndexInternal::Result::kFound;
        } else {
          result = WriteBatchWithIndexInternal::Result::kError;
        }
      } else {  // nothing to merge
        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
          value->assign(entry_value.cdata(), entry_value.size());
        }
      }
    }
  }

  return result;
}

}  // namespace rocksdb

#endif  // !ROCKSDB_LITE