/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/block_hash_index.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
#include "yb/rocksdb/table/block_hash_index.h"

#include <algorithm>
#include <memory>

#include "yb/rocksdb/comparator.h"
#include "yb/rocksdb/iterator.h"
#include "yb/rocksdb/slice_transform.h"
#include "yb/rocksdb/table/internal_iterator.h"
#include "yb/rocksdb/util/coding.h"
30 | | |
31 | | namespace rocksdb { |
32 | | |
33 | | Status CreateBlockHashIndex(const SliceTransform* hash_key_extractor, |
34 | | const Slice& prefixes, const Slice& prefix_meta, |
35 | 0 | BlockHashIndex** hash_index) { |
36 | 0 | uint64_t pos = 0; |
37 | 0 | auto meta_pos = prefix_meta; |
38 | 0 | Status s; |
39 | 0 | *hash_index = new BlockHashIndex( |
40 | 0 | hash_key_extractor, |
41 | 0 | false /* external module manages memory space for prefixes */); |
42 | |
|
43 | 0 | while (!meta_pos.empty()) { |
44 | 0 | uint32_t prefix_size = 0; |
45 | 0 | uint32_t entry_index = 0; |
46 | 0 | uint32_t num_blocks = 0; |
47 | 0 | if (!GetVarint32(&meta_pos, &prefix_size) || |
48 | 0 | !GetVarint32(&meta_pos, &entry_index) || |
49 | 0 | !GetVarint32(&meta_pos, &num_blocks)) { |
50 | 0 | s = STATUS(Corruption, |
51 | 0 | "Corrupted prefix meta block: unable to read from it."); |
52 | 0 | break; |
53 | 0 | } |
54 | 0 | Slice prefix(prefixes.data() + pos, prefix_size); |
55 | 0 | (*hash_index)->Add(prefix, entry_index, num_blocks); |
56 | |
|
57 | 0 | pos += prefix_size; |
58 | 0 | } |
59 | |
|
60 | 0 | if (s.ok() && pos != prefixes.size()) { |
61 | 0 | s = STATUS(Corruption, "Corrupted prefix meta block"); |
62 | 0 | } |
63 | |
|
64 | 0 | if (!s.ok()) { |
65 | 0 | delete *hash_index; |
66 | 0 | } |
67 | |
|
68 | 0 | return s; |
69 | 0 | } |
70 | | |
// Builds a BlockHashIndex by scanning every key of a block in memory.
//
// `index_iter` yields one entry per restart interval (its key is the last
// key of that interval); `data_iter` yields every key of the block.  The two
// iterators are advanced in lock step: for each restart interval we consume
// all data keys <= that interval's last key, extract each key's prefix, and
// record which contiguous run of restart intervals every distinct prefix
// spans.
//
// Returns a heap-allocated index owned by the caller, or nullptr when Add()
// rejects a duplicate prefix — i.e. the same prefix appears in two separate
// runs, so the keys are not clustered by prefix and no hash index can
// represent them.
BlockHashIndex* CreateBlockHashIndexOnTheFly(
    InternalIterator* index_iter, InternalIterator* data_iter,
    const uint32_t num_restarts, const Comparator* comparator,
    const SliceTransform* hash_key_extractor) {
  assert(hash_key_extractor);
  auto hash_index = new BlockHashIndex(
      hash_key_extractor,
      true /* hash_index will copy prefix when Add() is called */);
  uint32_t current_restart_index = 0;

  // Prefix currently being accumulated but not yet inserted into the index.
  std::string pending_entry_prefix;
  // pending_block_num == 0 also implies there is no entry inserted at all.
  uint32_t pending_block_num = 0;
  // Restart interval at which the pending prefix was first seen.
  uint32_t pending_entry_index = 0;

  // scan all the entries and create a hash index based on their prefixes.
  data_iter->SeekToFirst();
  for (index_iter->SeekToFirst();
       index_iter->Valid() && current_restart_index < num_restarts;
       index_iter->Next()) {
    Slice last_key_in_block = index_iter->key();
    assert(data_iter->Valid() && data_iter->status().ok());

    // scan through all entries within a data block.
    while (data_iter->Valid() &&
           comparator->Compare(data_iter->key(), last_key_in_block) <= 0) {
      auto key_prefix = hash_key_extractor->Transform(data_iter->key());
      bool is_first_entry = pending_block_num == 0;

      // Keys may share the prefix
      if (is_first_entry || pending_entry_prefix != key_prefix) {
        // The prefix changed: flush the previous pending entry, if any.
        if (!is_first_entry) {
          bool succeeded = hash_index->Add(
              pending_entry_prefix, pending_entry_index, pending_block_num);
          if (!succeeded) {
            // Duplicate prefix: keys are not clustered by prefix, so a hash
            // index cannot be built for this block.
            delete hash_index;
            return nullptr;
          }
        }

        // update the status.
        // needs a hard copy otherwise the underlying data changes all the time.
        pending_entry_prefix = key_prefix.ToString();
        pending_block_num = 1;
        pending_entry_index = current_restart_index;
      } else {
        // entry number increments when keys share the prefix reside in
        // different data blocks.  Only count each restart interval once.
        auto last_restart_index = pending_entry_index + pending_block_num - 1;
        assert(last_restart_index <= current_restart_index);
        if (last_restart_index != current_restart_index) {
          ++pending_block_num;
        }
      }
      data_iter->Next();
    }

    ++current_restart_index;
  }

  // make sure all entries have been scanned.
  assert(!index_iter->Valid());
  assert(!data_iter->Valid());

  // Flush the final pending entry (skipped only if the block was empty).
  if (pending_block_num > 0) {
    auto succeeded = hash_index->Add(pending_entry_prefix, pending_entry_index,
                                     pending_block_num);
    if (!succeeded) {
      delete hash_index;
      return nullptr;
    }
  }

  return hash_index;
}
146 | | |
147 | | bool BlockHashIndex::Add(const Slice& prefix, uint32_t restart_index, |
148 | 200k | uint32_t num_blocks) { |
149 | 200k | auto prefix_to_insert = prefix; |
150 | 200k | if (kOwnPrefixes) { |
151 | 200k | auto prefix_ptr = arena_.Allocate(prefix.size()); |
152 | | // MSVC reports C4996 Function call with parameters that may be |
153 | | // unsafe when using std::copy with a output iterator - pointer |
154 | 200k | memcpy(prefix_ptr, prefix.data(), prefix.size()); |
155 | 200k | prefix_to_insert = Slice(prefix_ptr, prefix.size()); |
156 | 200k | } |
157 | 200k | auto result = restart_indices_.insert( |
158 | 200k | {prefix_to_insert, RestartIndex(restart_index, num_blocks)}); |
159 | 200k | return result.second; |
160 | 200k | } |
161 | | |
162 | | const BlockHashIndex::RestartIndex* BlockHashIndex::GetRestartIndex( |
163 | 800k | const Slice& key) { |
164 | 800k | auto key_prefix = hash_key_extractor_->Transform(key); |
165 | | |
166 | 800k | auto pos = restart_indices_.find(key_prefix); |
167 | 800k | if (pos == restart_indices_.end()) { |
168 | 199k | return nullptr; |
169 | 199k | } |
170 | | |
171 | 600k | return &pos->second; |
172 | 600k | } |
173 | | |
174 | | } // namespace rocksdb |