/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/get_context.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/table/get_context.h" |
22 | | |
23 | | #include "yb/rocksdb/db/merge_context.h" |
24 | | #include "yb/rocksdb/env.h" |
25 | | #include "yb/rocksdb/merge_operator.h" |
26 | | #include "yb/rocksdb/statistics.h" |
27 | | #include "yb/rocksdb/util/perf_context_imp.h" |
28 | | #include "yb/rocksdb/util/statistics.h" |
29 | | #include "yb/rocksdb/util/stop_watch.h" |
30 | | |
31 | | #include "yb/util/stats/perf_step_timer.h" |
32 | | |
33 | | namespace rocksdb { |
34 | | |
35 | | namespace { |
36 | | |
37 | 6.51M | void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { |
38 | 6.51M | #ifndef ROCKSDB_LITE |
39 | 6.51M | if (replay_log) { |
40 | 42.7k | if (replay_log->empty()) { |
41 | | // Optimization: in the common case of only one operation in the |
42 | | // log, we allocate the exact amount of space needed. |
43 | 42.7k | replay_log->reserve(1 + VarintLength(value.size()) + value.size()); |
44 | 42.7k | } |
45 | 42.7k | replay_log->push_back(type); |
46 | 42.7k | PutLengthPrefixedSlice(replay_log, value); |
47 | 42.7k | } |
48 | 6.51M | #endif // ROCKSDB_LITE |
49 | 6.51M | } |
50 | | |
51 | | } // namespace |
52 | | |
53 | | GetContext::GetContext(const Comparator* ucmp, |
54 | | const MergeOperator* merge_operator, Logger* logger, |
55 | | Statistics* statistics, GetState init_state, |
56 | | const Slice& user_key, std::string* ret_value, |
57 | | bool* value_found, MergeContext* merge_context, Env* env, |
58 | | SequenceNumber* seq) |
59 | | : ucmp_(ucmp), |
60 | | merge_operator_(merge_operator), |
61 | | logger_(logger), |
62 | | statistics_(statistics), |
63 | | state_(init_state), |
64 | | user_key_(user_key), |
65 | | value_(ret_value), |
66 | | value_found_(value_found), |
67 | | merge_context_(merge_context), |
68 | | env_(env), |
69 | | seq_(seq), |
70 | 7.34M | replay_log_(nullptr) { |
71 | 7.34M | if (seq_) { |
72 | 21 | *seq_ = kMaxSequenceNumber; |
73 | 21 | } |
74 | 7.34M | } |
75 | | |
76 | | // Called from TableCache::Get and Table::Get when file/block in which |
77 | | // key may exist are not there in TableCache/BlockCache respectively. In this |
78 | | // case we can't guarantee that key does not exist and are not permitted to do |
79 | | // IO to be certain.Set the status=kFound and value_found=false to let the |
80 | | // caller know that key may exist but is not there in memory |
81 | 140 | void GetContext::MarkKeyMayExist() { |
82 | 140 | state_ = kFound; |
83 | 140 | if (value_found_ != nullptr) { |
84 | 23 | *value_found_ = false; |
85 | 23 | } |
86 | 140 | } |
87 | | |
88 | 0 | void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { |
89 | 0 | assert(state_ == kNotFound); |
90 | 0 | appendToReplayLog(replay_log_, kTypeValue, value); |
91 | |
|
92 | 0 | state_ = kFound; |
93 | 0 | if (value_ != nullptr) { |
94 | 0 | value_->assign(value.cdata(), value.size()); |
95 | 0 | } |
96 | 0 | } |
97 | | |
98 | | bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, |
99 | 7.35M | const Slice& value) { |
100 | 7.35M | assert((state_ != kMerge && parsed_key.type != kTypeMerge) || |
101 | 7.35M | merge_context_ != nullptr); |
102 | 7.35M | if (ucmp_->Equal(parsed_key.user_key, user_key_)) { |
103 | 6.51M | appendToReplayLog(replay_log_, parsed_key.type, value); |
104 | | |
105 | 6.51M | if (seq_ != nullptr) { |
106 | | // Set the sequence number if it is uninitialized |
107 | 18 | if (*seq_ == kMaxSequenceNumber) { |
108 | 18 | *seq_ = parsed_key.sequence; |
109 | 18 | } |
110 | 18 | } |
111 | | |
112 | | // Key matches. Process it |
113 | 6.51M | switch (parsed_key.type) { |
114 | 6.51M | case kTypeValue: |
115 | 6.51M | assert(state_ == kNotFound || state_ == kMerge); |
116 | 6.51M | if (kNotFound == state_) { |
117 | 6.51M | state_ = kFound; |
118 | 6.51M | if (value_ != nullptr) { |
119 | 6.51M | value_->assign(value.cdata(), value.size()); |
120 | 6.51M | } |
121 | 280 | } else if (kMerge == state_) { |
122 | 280 | assert(merge_operator_ != nullptr); |
123 | 280 | state_ = kFound; |
124 | 280 | if (value_ != nullptr) { |
125 | 280 | bool merge_success = false; |
126 | 280 | { |
127 | 280 | StopWatchNano timer(env_, statistics_ != nullptr); |
128 | 280 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
129 | 280 | merge_success = merge_operator_->FullMerge( |
130 | 280 | user_key_, &value, merge_context_->GetOperands(), value_, |
131 | 280 | logger_); |
132 | 280 | RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, |
133 | 280 | timer.ElapsedNanosSafe()); |
134 | 280 | } |
135 | 280 | if (!merge_success) { |
136 | 0 | RecordTick(statistics_, NUMBER_MERGE_FAILURES); |
137 | 0 | state_ = kCorrupt; |
138 | 0 | } |
139 | 280 | } |
140 | 280 | } |
141 | 6.51M | return false; |
142 | | |
143 | 4.89k | case kTypeDeletion: |
144 | 4.92k | case kTypeSingleDeletion: |
145 | | // TODO(noetzli): Verify correctness once merge of single-deletes |
146 | | // is supported |
147 | 4.92k | assert(state_ == kNotFound || state_ == kMerge); |
148 | 4.92k | if (kNotFound == state_) { |
149 | 4.92k | state_ = kDeleted; |
150 | 0 | } else if (kMerge == state_) { |
151 | 0 | state_ = kFound; |
152 | 0 | if (value_ != nullptr) { |
153 | 0 | bool merge_success = false; |
154 | 0 | { |
155 | 0 | StopWatchNano timer(env_, statistics_ != nullptr); |
156 | 0 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
157 | 0 | merge_success = merge_operator_->FullMerge( |
158 | 0 | user_key_, nullptr, merge_context_->GetOperands(), value_, |
159 | 0 | logger_); |
160 | 0 | RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, |
161 | 0 | timer.ElapsedNanosSafe()); |
162 | 0 | } |
163 | 0 | if (!merge_success) { |
164 | 0 | RecordTick(statistics_, NUMBER_MERGE_FAILURES); |
165 | 0 | state_ = kCorrupt; |
166 | 0 | } |
167 | 0 | } |
168 | 0 | } |
169 | 4.92k | return false; |
170 | | |
171 | 288 | case kTypeMerge: |
172 | 288 | assert(state_ == kNotFound || state_ == kMerge); |
173 | 288 | state_ = kMerge; |
174 | 288 | merge_context_->PushOperand(value); |
175 | 288 | return true; |
176 | | |
177 | 0 | default: |
178 | 0 | assert(false); |
179 | 0 | break; |
180 | 834k | } |
181 | 834k | } |
182 | | |
183 | | // state_ could be Corrupt, merge or notfound |
184 | 834k | return false; |
185 | 834k | } |
186 | | |
187 | | void replayGetContextLog(const Slice& replay_log, const Slice& user_key, |
188 | 6.61k | GetContext* get_context) { |
189 | 6.61k | #ifndef ROCKSDB_LITE |
190 | 6.61k | Slice s = replay_log; |
191 | 13.2k | while (s.size()) { |
192 | 6.61k | auto type = static_cast<ValueType>(*s.data()); |
193 | 6.61k | s.remove_prefix(1); |
194 | 6.61k | Slice value; |
195 | 6.61k | bool ret = GetLengthPrefixedSlice(&s, &value); |
196 | 6.61k | assert(ret); |
197 | 6.61k | (void)ret; |
198 | | |
199 | | // Since SequenceNumber is not stored and unknown, we will use |
200 | | // kMaxSequenceNumber. |
201 | 6.61k | get_context->SaveValue( |
202 | 6.61k | ParsedInternalKey(user_key, kMaxSequenceNumber, type), value); |
203 | 6.61k | } |
204 | | #else // ROCKSDB_LITE |
205 | | assert(false); |
206 | | #endif // ROCKSDB_LITE |
207 | 6.61k | } |
208 | | |
209 | | } // namespace rocksdb |