YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
Line |   Count | Source
   1 |         | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |         | //  This source code is licensed under the BSD-style license found in the
   3 |         | //  LICENSE file in the root directory of this source tree. An additional grant
   4 |         | //  of patent rights can be found in the PATENTS file in the same directory.
   5 |         | //
   6 |         | // The following only applies to changes made to this file as part of YugaByte development.
   7 |         | //
   8 |         | // Portions Copyright (c) YugaByte, Inc.
   9 |         | //
  10 |         | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  11 |         | // in compliance with the License.  You may obtain a copy of the License at
  12 |         | //
  13 |         | // http://www.apache.org/licenses/LICENSE-2.0
  14 |         | //
  15 |         | // Unless required by applicable law or agreed to in writing, software distributed under the License
  16 |         | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  17 |         | // or implied.  See the License for the specific language governing permissions and limitations
  18 |         | // under the License.
  19 |         | //
  20 |         |
  21 |         | #ifndef ROCKSDB_LITE
  22 |         |
  23 |         | #include "yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h"
  24 |         |
  25 |         | #include "yb/rocksdb/db/column_family.h"
  26 |         | #include "yb/rocksdb/db/merge_context.h"
  27 |         | #include "yb/rocksdb/db/merge_helper.h"
  28 |         | #include "yb/rocksdb/comparator.h"
  29 |         | #include "yb/rocksdb/db.h"
  30 |         | #include "yb/rocksdb/utilities/write_batch_with_index.h"
  31 |         | #include "yb/rocksdb/util/coding.h"
  32 |         | #include "yb/util/string_util.h"
  33 |         |
  34 |         | namespace rocksdb {
  35 |         |
  36 |         | class Env;
  37 |         | class Logger;
  38 |         | class Statistics;
  39 |         |
  40 |         | Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
  41 |         |                                                   WriteType* type, Slice* Key,
  42 |         |                                                   Slice* value,
  43 |   37.8M |                                                   Slice* blob) const {
  44 |   37.8M |   if (type == nullptr || Key == nullptr || value == nullptr ||
  45 |   37.8M |       blob == nullptr) {
  46 |       0 |     return STATUS(InvalidArgument, "Output parameters cannot be null");
  47 |       0 |   }
  48 |         |
  49 |   37.8M |   if (data_offset == GetDataSize()) {
  50 |         |     // reached end of batch.
  51 |       0 |     return STATUS(NotFound, "");
  52 |       0 |   }
  53 |         |
  54 |   37.8M |   if (data_offset > GetDataSize()) {
  55 |       0 |     return STATUS(InvalidArgument, "data offset exceeds write batch size");
  56 |       0 |   }
  57 |   37.8M |   Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
  58 |   37.8M |   char tag;
  59 |   37.8M |   uint32_t column_family;
  60 |   37.8M |   Status s =
  61 |   37.8M |       ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
  62 |         |
  63 |   37.8M |   switch (tag) {
  64 |   59.5k |     case kTypeColumnFamilyValue:
  65 |   18.0M |     case kTypeValue:
  66 |   18.0M |       *type = kPutRecord;
  67 |   18.0M |       break;
  68 |   79.1k |     case kTypeColumnFamilyDeletion:
  69 |   19.8M |     case kTypeDeletion:
  70 |   19.8M |       *type = kDeleteRecord;
  71 |   19.8M |       break;
  72 |      66 |     case kTypeColumnFamilySingleDeletion:
  73 |     405 |     case kTypeSingleDeletion:
  74 |     405 |       *type = kSingleDeleteRecord;
  75 |     405 |       break;
  76 |      91 |     case kTypeColumnFamilyMerge:
  77 |   2.23k |     case kTypeMerge:
  78 |   2.23k |       *type = kMergeRecord;
  79 |   2.23k |       break;
  80 |       0 |     case kTypeLogData:
  81 |       0 |       *type = kLogDataRecord;
  82 |       0 |       break;
  83 |       0 |     default:
  84 |       0 |       return STATUS(Corruption, "unknown WriteBatch tag");
  85 |   37.8M |   }
  86 |   37.8M |   return Status::OK();
  87 |   37.8M | }
  88 |         |
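GetEntryFromDataOffset decodes the record stored at a given byte offset of the batch's internal buffer and reports its kind (put, delete, single-delete, merge, or log data). A minimal sketch of how a caller might use it, assuming a valid offset taken from a WriteBatchIndexEntry; IsPutAt is a hypothetical helper, not part of this file:

#include "yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace rocksdb {

// Hypothetical helper (illustration only): report whether the record stored
// at `offset` in `batch` is a Put. Offsets normally come from
// WriteBatchIndexEntry::offset.
inline bool IsPutAt(const ReadableWriteBatch& batch, size_t offset) {
  WriteType type;
  Slice key, value, blob;
  Status s = batch.GetEntryFromDataOffset(offset, &type, &key, &value, &blob);
  // key/value/blob are slices into the batch's buffer, so they remain valid
  // only while the batch itself is alive and unmodified.
  return s.ok() && type == kPutRecord;
}

}  // namespace rocksdb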
  89 |         | int WriteBatchEntryComparator::operator()(
  90 |         |     const WriteBatchIndexEntry* entry1,
  91 |   30.5M |     const WriteBatchIndexEntry* entry2) const {
  92 |   30.5M |   if (entry1->column_family > entry2->column_family) {
  93 |   1.14k |     return 1;
  94 |   30.5M |   } else if (entry1->column_family < entry2->column_family) {
  95 |   6.65M |     return -1;
  96 |   6.65M |   }
  97 |         |
  98 |   23.9M |   if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
  99 |       0 |     return -1;
 100 |   23.9M |   } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
 101 |   3.10k |     return 1;
 102 |   3.10k |   }
 103 |         |
 104 |   23.9M |   Status s;
 105 |   23.9M |   Slice key1, key2;
 106 |   23.9M |   if (entry1->search_key == nullptr) {
 107 |   23.9M |     Slice value, blob;
 108 |   23.9M |     WriteType write_type;
 109 |   23.9M |     s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
 110 |   23.9M |                                              &value, &blob);
 111 |   23.9M |     if (!s.ok()) {
 112 |       0 |       return 1;
 113 |       0 |     }
 114 |   18.4E |   } else {
 115 |   18.4E |     key1 = *(entry1->search_key);
 116 |   18.4E |   }
 117 |   23.9M |   if (entry2->search_key == nullptr) {
 118 |   12.2M |     Slice value, blob;
 119 |   12.2M |     WriteType write_type;
 120 |   12.2M |     s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
 121 |   12.2M |                                              &value, &blob);
 122 |   12.2M |     if (!s.ok()) {
 123 |       0 |       return -1;
 124 |       0 |     }
 125 |   11.6M |   } else {
 126 |   11.6M |     key2 = *(entry2->search_key);
 127 |   11.6M |   }
 128 |         |
 129 |   23.9M |   int cmp = CompareKey(entry1->column_family, key1, key2);
 130 |   23.9M |   if (cmp != 0) {
 131 |   22.8M |     return cmp;
 132 |   1.05M |   } else if (entry1->offset > entry2->offset) {
 133 |    751k |     return 1;
 134 |    304k |   } else if (entry1->offset < entry2->offset) {
 135 |     797 |     return -1;
 136 |     797 |   }
 137 |    304k |   return 0;
 138 |    304k | }
 139 |         |
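This operator defines the ordering of the batch's index: entries sort first by column family id, then by user key (via CompareKey), and finally by offset within the batch, so duplicate keys appear in insertion order and a reverse scan over equal keys visits the newest write first. Entries flagged with kFlagMin sort before all real entries in their column family. (The 18.4E counts on lines 114-116 are almost certainly a coverage-counter wrap-around artifact, the value being roughly 2^64, rather than real execution counts.) Below is a minimal standalone sketch of the same three-level comparison; FakeEntry and CompareFakeEntries are illustrative names, not part of this file:

#include <cstddef>
#include <cstdint>
#include <string>

// Illustrative only: mirrors the ordering used by WriteBatchEntryComparator
// on a simplified entry type.
struct FakeEntry {
  uint32_t column_family;
  std::string key;
  size_t offset;  // position of the record inside the batch
};

int CompareFakeEntries(const FakeEntry& a, const FakeEntry& b) {
  if (a.column_family != b.column_family) {                       // 1) column family id
    return a.column_family < b.column_family ? -1 : 1;
  }
  int cmp = a.key.compare(b.key);                                 // 2) user key
  if (cmp != 0) {
    return cmp;
  }
  if (a.offset != b.offset) {                                     // 3) insertion order
    return a.offset < b.offset ? -1 : 1;
  }
  return 0;
}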
 140 |         | int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
 141 |         |                                           const Slice& key1,
 142 |   24.4M |                                           const Slice& key2) const {
 143 |   24.4M |   auto comparator_for_cf = cf_comparator_map_.find(column_family);
 144 |   24.4M |   if (comparator_for_cf != cf_comparator_map_.end()) {
 145 |   65.3k |     return comparator_for_cf->second->Compare(key1, key2);
 146 |   24.3M |   } else {
 147 |   24.3M |     return default_comparator_->Compare(key1, key2);
 148 |   24.3M |   }
 149 |   24.4M | }
 150 |         |
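CompareKey uses the comparator registered for the entry's column family when one exists, and otherwise falls back to the batch's default comparator (typically BytewiseComparator(), unless another comparator was passed to the WriteBatchWithIndex constructor). A minimal sketch of the same lookup-with-fallback pattern on a plain std::unordered_map; ComparatorForCF is an illustrative name, not part of this file:

#include <cstdint>
#include <unordered_map>

#include "yb/rocksdb/comparator.h"

// Illustrative only: "per-CF comparator if registered, else the default".
inline const rocksdb::Comparator* ComparatorForCF(
    const std::unordered_map<uint32_t, const rocksdb::Comparator*>& per_cf,
    uint32_t cf_id,
    const rocksdb::Comparator* default_comparator) {
  auto it = per_cf.find(cf_id);
  return it != per_cf.end() ? it->second : default_comparator;
}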
 151 |         | WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
 152 |         |     const DBOptions& options, WriteBatchWithIndex* batch,
 153 |         |     ColumnFamilyHandle* column_family, const Slice& key,
 154 |         |     MergeContext* merge_context, WriteBatchEntryComparator* cmp,
 155 |     236 |     std::string* value, bool overwrite_key, Status* s) {
 156 |     236 |   uint32_t cf_id = GetColumnFamilyID(column_family);
 157 |     236 |   *s = Status::OK();
 158 |     236 |   WriteBatchWithIndexInternal::Result result =
 159 |     236 |       WriteBatchWithIndexInternal::Result::kNotFound;
 160 |         |
 161 |     236 |   std::unique_ptr<WBWIIterator> iter =
 162 |     236 |       std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));
 163 |         |
 164 |         |   // We want to iterate in the reverse order that the writes were added to the
 165 |         |   // batch.  Since we don't have a reverse iterator, we must seek past the end.
 166 |         |   // TODO(agiardullo): consider adding support for reverse iteration
 167 |     236 |   iter->Seek(key);
 168 |     399 |   while (iter->Valid()) {
 169 |     217 |     const WriteEntry& entry = iter->Entry();
 170 |     217 |     if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
 171 |      54 |       break;
 172 |      54 |     }
 173 |         |
 174 |     163 |     iter->Next();
 175 |     163 |   }
 176 |         |
 177 |     236 |   if (!(*s).ok()) {
 178 |       0 |     return WriteBatchWithIndexInternal::Result::kError;
 179 |       0 |   }
 180 |         |
 181 |     236 |   if (!iter->Valid()) {
 182 |         |     // Read past end of results.  Reposition on last result.
 183 |     182 |     iter->SeekToLast();
 184 |      54 |   } else {
 185 |      54 |     iter->Prev();
 186 |      54 |   }
 187 |         |
 188 |     236 |   Slice entry_value;
 189 |     280 |   while (iter->Valid()) {
 190 |     185 |     WriteEntry entry = iter->Entry();
 191 |     185 |     if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
 192 |         |       // Unexpected error or we've reached a different next key
 193 |      52 |       break;
 194 |      52 |     }
 195 |         |
 196 |     133 |     switch (entry.type) {
 197 |      58 |       case kPutRecord: {
 198 |      58 |         result = WriteBatchWithIndexInternal::Result::kFound;
 199 |      58 |         entry_value = entry.value;
 200 |      58 |         break;
 201 |       0 |       }
 202 |      53 |       case kMergeRecord: {
 203 |      53 |         result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
 204 |      53 |         merge_context->PushOperand(entry.value);
 205 |      53 |         break;
 206 |       0 |       }
 207 |      13 |       case kDeleteRecord:
 208 |      22 |       case kSingleDeleteRecord: {
 209 |      22 |         result = WriteBatchWithIndexInternal::Result::kDeleted;
 210 |      22 |         break;
 211 |      13 |       }
 212 |       0 |       case kLogDataRecord: {
 213 |         |         // ignore
 214 |       0 |         break;
 215 |      13 |       }
 216 |       0 |       default: {
 217 |       0 |         result = WriteBatchWithIndexInternal::Result::kError;
 218 |       0 |         (*s) = STATUS(Corruption, "Unexpected entry in WriteBatchWithIndex:",
 219 |       0 |                                   ToString(entry.type));
 220 |       0 |         break;
 221 |     133 |       }
 222 |     133 |     }
 223 |     133 |     if (result == WriteBatchWithIndexInternal::Result::kFound ||
 224 |      75 |         result == WriteBatchWithIndexInternal::Result::kDeleted ||
 225 |      80 |         result == WriteBatchWithIndexInternal::Result::kError) {
 226 |         |       // We can stop iterating once we find a PUT or DELETE
 227 |      80 |       break;
 228 |      80 |     }
 229 |      53 |     if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
 230 |      53 |         overwrite_key == true) {
 231 |         |       // Since we've overwritten keys, we do not know what other operations are
 232 |         |       // in this batch for this key, so we cannot do a Merge to compute the
 233 |         |       // result.  Instead, we will simply return MergeInProgress.
 234 |       9 |       break;
 235 |       9 |     }
 236 |         |
 237 |      44 |     iter->Prev();
 238 |      44 |   }
 239 |         |
 240 |     236 |   if ((*s).ok()) {
 241 |     236 |     if (result == WriteBatchWithIndexInternal::Result::kFound ||
 242 |     178 |         result == WriteBatchWithIndexInternal::Result::kDeleted) {
 243 |         |       // Found a Put or Delete.  Merge if necessary.
 244 |      80 |       if (merge_context->GetNumOperands() > 0) {
 245 |       5 |         const MergeOperator* merge_operator;
 246 |         |
 247 |       5 |         if (column_family != nullptr) {
 248 |       5 |           auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
 249 |       5 |           merge_operator = cfh->cfd()->ioptions()->merge_operator;
 250 |       0 |         } else {
 251 |       0 |           *s = STATUS(InvalidArgument, "Must provide a column_family");
 252 |       0 |           result = WriteBatchWithIndexInternal::Result::kError;
 253 |       0 |           return result;
 254 |       0 |         }
 255 |       5 |         Statistics* statistics = options.statistics.get();
 256 |       5 |         Env* env = options.env;
 257 |       5 |         Logger* logger = options.info_log.get();
 258 |         |
 259 |       5 |         *s = MergeHelper::TimedFullMerge(
 260 |       5 |             key, &entry_value, merge_context->GetOperands(), merge_operator,
 261 |       5 |             statistics, env, logger, value);
 262 |       5 |         if ((*s).ok()) {
 263 |       5 |           result = WriteBatchWithIndexInternal::Result::kFound;
 264 |       0 |         } else {
 265 |       0 |           result = WriteBatchWithIndexInternal::Result::kError;
 266 |       0 |         }
 267 |      75 |       } else {  // nothing to merge
 268 |      75 |         if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
 269 |      53 |           value->assign(entry_value.cdata(), entry_value.size());
 270 |      53 |         }
 271 |      75 |       }
 272 |      80 |     }
 273 |     236 |   }
 274 |         |
 275 |     236 |   return result;
 276 |     236 | }
 277 |         |
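GetFromBatch implements the read-your-own-writes lookup behind the public WriteBatchWithIndex::GetFromBatch: it walks the indexed entries for the key newest-first, stops at the first Put or Delete, and replays any Merge operands collected along the way. A minimal usage sketch against the public API, assuming this fork keeps upstream RocksDB's WriteBatchWithIndex interface; ExampleGetFromBatch is an illustrative name:

#include <string>

#include "yb/rocksdb/utilities/write_batch_with_index.h"

void ExampleGetFromBatch(const rocksdb::DBOptions& options) {
  rocksdb::WriteBatchWithIndex batch;  // index ordered by BytewiseComparator by default

  batch.Put("k", "v1");
  std::string value;
  rocksdb::Status s = batch.GetFromBatch(options, "k", &value);
  // Expected: s.ok() and value == "v1" -- the newest entry for "k" is the Put.

  batch.Delete("k");
  s = batch.GetFromBatch(options, "k", &value);
  // Expected: s.IsNotFound() -- the newest entry for "k" is now the Delete;
  // Result::kDeleted above is surfaced as a NotFound status by the public wrapper.
}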
 278 |         | }  // namespace rocksdb
 279 |         |
 280 |         | #endif  // !ROCKSDB_LITE