YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/get_context.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/table/get_context.h"
22
23
#include "yb/rocksdb/db/merge_context.h"
24
#include "yb/rocksdb/env.h"
25
#include "yb/rocksdb/merge_operator.h"
26
#include "yb/rocksdb/statistics.h"
27
#include "yb/rocksdb/util/perf_context_imp.h"
28
#include "yb/rocksdb/util/statistics.h"
29
#include "yb/rocksdb/util/stop_watch.h"
30
31
#include "yb/util/stats/perf_step_timer.h"
32
33
namespace rocksdb {
34
35
namespace {
36
37
6.51M
void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
38
6.51M
#ifndef ROCKSDB_LITE
39
6.51M
  if (replay_log) {
40
42.7k
    if (replay_log->empty()) {
41
      // Optimization: in the common case of only one operation in the
42
      // log, we allocate the exact amount of space needed.
43
42.7k
      replay_log->reserve(1 + VarintLength(value.size()) + value.size());
44
42.7k
    }
45
42.7k
    replay_log->push_back(type);
46
42.7k
    PutLengthPrefixedSlice(replay_log, value);
47
42.7k
  }
48
6.51M
#endif  // ROCKSDB_LITE
49
6.51M
}
50
51
}  // namespace
52
53
GetContext::GetContext(const Comparator* ucmp,
54
                       const MergeOperator* merge_operator, Logger* logger,
55
                       Statistics* statistics, GetState init_state,
56
                       const Slice& user_key, std::string* ret_value,
57
                       bool* value_found, MergeContext* merge_context, Env* env,
58
                       SequenceNumber* seq)
59
    : ucmp_(ucmp),
60
      merge_operator_(merge_operator),
61
      logger_(logger),
62
      statistics_(statistics),
63
      state_(init_state),
64
      user_key_(user_key),
65
      value_(ret_value),
66
      value_found_(value_found),
67
      merge_context_(merge_context),
68
      env_(env),
69
      seq_(seq),
70
7.34M
      replay_log_(nullptr) {
71
7.34M
  if (seq_) {
72
21
    *seq_ = kMaxSequenceNumber;
73
21
  }
74
7.34M
}
75
76
// Called from TableCache::Get and Table::Get when file/block in which
77
// key may exist are not there in TableCache/BlockCache respectively. In this
78
// case we can't guarantee that key does not exist and are not permitted to do
79
// IO to be certain.Set the status=kFound and value_found=false to let the
80
// caller know that key may exist but is not there in memory
81
140
void GetContext::MarkKeyMayExist() {
82
140
  state_ = kFound;
83
140
  if (value_found_ != nullptr) {
84
23
    *value_found_ = false;
85
23
  }
86
140
}
87
88
0
void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
89
0
  assert(state_ == kNotFound);
90
0
  appendToReplayLog(replay_log_, kTypeValue, value);
91
92
0
  state_ = kFound;
93
0
  if (value_ != nullptr) {
94
0
    value_->assign(value.cdata(), value.size());
95
0
  }
96
0
}
97
98
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
99
7.35M
                           const Slice& value) {
100
7.35M
  assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
101
7.35M
         merge_context_ != nullptr);
102
7.35M
  if (ucmp_->Equal(parsed_key.user_key, user_key_)) {
103
6.51M
    appendToReplayLog(replay_log_, parsed_key.type, value);
104
105
6.51M
    if (seq_ != nullptr) {
106
      // Set the sequence number if it is uninitialized
107
18
      if (*seq_ == kMaxSequenceNumber) {
108
18
        *seq_ = parsed_key.sequence;
109
18
      }
110
18
    }
111
112
    // Key matches. Process it
113
6.51M
    switch (parsed_key.type) {
114
6.51M
      case kTypeValue:
115
6.51M
        assert(state_ == kNotFound || state_ == kMerge);
116
6.51M
        if (kNotFound == state_) {
117
6.51M
          state_ = kFound;
118
6.51M
          if (value_ != nullptr) {
119
6.51M
            value_->assign(value.cdata(), value.size());
120
6.51M
          }
121
280
        } else if (kMerge == state_) {
122
280
          assert(merge_operator_ != nullptr);
123
280
          state_ = kFound;
124
280
          if (value_ != nullptr) {
125
280
            bool merge_success = false;
126
280
            {
127
280
              StopWatchNano timer(env_, statistics_ != nullptr);
128
280
              PERF_TIMER_GUARD(merge_operator_time_nanos);
129
280
              merge_success = merge_operator_->FullMerge(
130
280
                  user_key_, &value, merge_context_->GetOperands(), value_,
131
280
                  logger_);
132
280
              RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
133
280
                         timer.ElapsedNanosSafe());
134
280
            }
135
280
            if (!merge_success) {
136
0
              RecordTick(statistics_, NUMBER_MERGE_FAILURES);
137
0
              state_ = kCorrupt;
138
0
            }
139
280
          }
140
280
        }
141
6.51M
        return false;
142
143
4.89k
      case kTypeDeletion:
144
4.92k
      case kTypeSingleDeletion:
145
        // TODO(noetzli): Verify correctness once merge of single-deletes
146
        // is supported
147
4.92k
        assert(state_ == kNotFound || state_ == kMerge);
148
4.92k
        if (kNotFound == state_) {
149
4.92k
          state_ = kDeleted;
150
0
        } else if (kMerge == state_) {
151
0
          state_ = kFound;
152
0
          if (value_ != nullptr) {
153
0
            bool merge_success = false;
154
0
            {
155
0
              StopWatchNano timer(env_, statistics_ != nullptr);
156
0
              PERF_TIMER_GUARD(merge_operator_time_nanos);
157
0
              merge_success = merge_operator_->FullMerge(
158
0
                  user_key_, nullptr, merge_context_->GetOperands(), value_,
159
0
                  logger_);
160
0
              RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME,
161
0
                         timer.ElapsedNanosSafe());
162
0
            }
163
0
            if (!merge_success) {
164
0
              RecordTick(statistics_, NUMBER_MERGE_FAILURES);
165
0
              state_ = kCorrupt;
166
0
            }
167
0
          }
168
0
        }
169
4.92k
        return false;
170
171
288
      case kTypeMerge:
172
288
        assert(state_ == kNotFound || state_ == kMerge);
173
288
        state_ = kMerge;
174
288
        merge_context_->PushOperand(value);
175
288
        return true;
176
177
0
      default:
178
0
        assert(false);
179
0
        break;
180
834k
    }
181
834k
  }
182
183
  // state_ could be Corrupt, merge or notfound
184
834k
  return false;
185
834k
}
186
187
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
188
6.61k
                         GetContext* get_context) {
189
6.61k
#ifndef ROCKSDB_LITE
190
6.61k
  Slice s = replay_log;
191
13.2k
  while (s.size()) {
192
6.61k
    auto type = static_cast<ValueType>(*s.data());
193
6.61k
    s.remove_prefix(1);
194
6.61k
    Slice value;
195
6.61k
    bool ret = GetLengthPrefixedSlice(&s, &value);
196
6.61k
    assert(ret);
197
6.61k
    (void)ret;
198
199
    // Since SequenceNumber is not stored and unknown, we will use
200
    // kMaxSequenceNumber.
201
6.61k
    get_context->SaveValue(
202
6.61k
        ParsedInternalKey(user_key, kMaxSequenceNumber, type), value);
203
6.61k
  }
204
#else   // ROCKSDB_LITE
205
  assert(false);
206
#endif  // ROCKSDB_LITE
207
6.61k
}
208
209
}  // namespace rocksdb