YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/managed_iterator.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include "yb/rocksdb/db/managed_iterator.h"
24
25
#include <limits>
26
#include <string>
27
#include <utility>
28
29
#include "yb/rocksdb/db/column_family.h"
30
#include "yb/rocksdb/db/db_impl.h"
31
#include "yb/rocksdb/db/dbformat.h"
32
#include "yb/rocksdb/env.h"
33
#include "yb/rocksdb/slice_transform.h"
34
#include "yb/rocksdb/util/xfunc.h"
35
36
namespace rocksdb {
37
38
namespace {
39
// Helper class that locks a mutex on construction and unlocks the mutex when
40
// the destructor of the MutexLock object is invoked.
41
//
42
// Typical usage:
43
//
44
//   void MyClass::MyMethod() {
45
//     MILock l(&mu_);       // mu_ is an instance variable
46
//     ... some complex code, possibly with multiple return paths ...
47
//   }
48
49
class MILock {
50
 public:
51
23.1k
  explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
52
23.1k
    this->mu_->lock();
53
23.1k
  }
54
23.1k
  ~MILock() {
55
23.1k
    this->mu_->unlock();
56
23.1k
    XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
57
23.1k
               xf_manage_release, mi_);
58
23.1k
  }
59
0
  ManagedIterator* GetManagedIterator() { return mi_; }
60
61
 private:
62
  std::mutex* const mu_;
63
  ManagedIterator* mi_;
64
  // No copying allowed
65
  MILock(const MILock&) = delete;
66
  void operator=(const MILock&) = delete;
67
};
68
}  // anonymous namespace
69
70
//
71
// Synchronization between modifiers, releasers, creators
72
// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
73
//  if modifying mutable_iter, atomically exchange in_use:
74
//  return if in_use set / otherwise set in use,
75
//  atomically replace new iter with old , reset in use
76
//  The releaser is the new operation and it holds a lock for a very short time
77
//  The existing non-const iterator operations are supposed to be single
78
//  threaded and hold the lock for the duration of the operation
79
//  The existing const iterator operations use the cached key/values
80
//  and don't do any locking.
81
// Constructs a managed iterator over column family `cfd` of `db`.
//
// A private copy of `read_options` is kept with `managed` cleared before it is
// handed to DBImpl::NewIterator (presumably so the inner iterator is a plain,
// non-managed one -- confirm against DBImpl::NewIterator). When the caller
// neither asked for a tailing iterator nor supplied a snapshot, a snapshot is
// taken here and released again by the destructor.
ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      svnum_(cfd->GetSuperVersionNumber()),
      mutable_iter_(nullptr),
      valid_(false),
      snapshot_created_(false),
      release_supported_(true) {
  read_options_.managed = false;
  if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
    // The snapshot must be taken outside of assert(): with NDEBUG defined the
    // assert expression is not evaluated at all, which would skip the
    // GetSnapshot() call while still recording the snapshot as created.
    read_options_.snapshot = db_->GetSnapshot();
    assert(read_options_.snapshot != nullptr);
    // Only claim ownership of a snapshot that actually exists, so the
    // destructor never releases a null snapshot.
    snapshot_created_ = (read_options_.snapshot != nullptr);
  }
  cfh_.SetCFD(cfd);
  mutable_iter_.reset(db->NewIterator(read_options_, &cfh_));
  XFUNC_TEST("managed_xftest_dropold", "managed_create", xf_managed_create1,
             xf_manage_create, this);
}
101
102
76
// Gives back the snapshot taken by the constructor (if there is one) while
// holding in_use_ through Lock()/UnLock().
ManagedIterator::~ManagedIterator() {
  Lock();
  const bool owns_snapshot = snapshot_created_;
  if (owns_snapshot) {
    db_->ReleaseSnapshot(read_options_.snapshot);
    // Clear the bookkeeping so the released snapshot is not referenced again.
    snapshot_created_ = false;
    read_options_.snapshot = nullptr;
  }
  UnLock();
}
111
112
23.1k
// Returns the validity flag cached by the last positioning call. Takes no
// lock.
bool ManagedIterator::Valid() const {
  return valid_;
}
113
114
0
void ManagedIterator::SeekToLast() {
115
0
  MILock l(&in_use_, this);
116
0
  if (NeedToRebuild()) {
117
0
    RebuildIterator();
118
0
  }
119
0
  assert(mutable_iter_ != nullptr);
120
0
  mutable_iter_->SeekToLast();
121
0
  if (mutable_iter_->status().ok()) {
122
0
    UpdateCurrent();
123
0
  }
124
0
}
125
126
74
void ManagedIterator::SeekToFirst() {
127
74
  MILock l(&in_use_, this);
128
74
  SeekInternal(Slice(), true);
129
74
}
130
131
13.0k
void ManagedIterator::Seek(const Slice& user_key) {
132
13.0k
  MILock l(&in_use_, this);
133
13.0k
  SeekInternal(user_key, false);
134
13.0k
}
135
136
13.0k
void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
137
13.0k
  if (NeedToRebuild()) {
138
0
    RebuildIterator();
139
0
  }
140
13.0k
  assert(mutable_iter_ != nullptr);
141
13.0k
  if (seek_to_first) {
142
74
    mutable_iter_->SeekToFirst();
143
13.0k
  } else {
144
13.0k
    mutable_iter_->Seek(user_key);
145
13.0k
  }
146
13.0k
  UpdateCurrent();
147
13.0k
}
148
149
0
void ManagedIterator::Prev() {
150
0
  if (!valid_) {
151
0
    status_ = STATUS(InvalidArgument, "Iterator value invalid");
152
0
    return;
153
0
  }
154
0
  MILock l(&in_use_, this);
155
0
  if (NeedToRebuild()) {
156
0
    std::string current_key = key().ToString();
157
0
    Slice old_key(current_key);
158
0
    RebuildIterator();
159
0
    SeekInternal(old_key, false);
160
0
    UpdateCurrent();
161
0
    if (!valid_) {
162
0
      return;
163
0
    }
164
0
    if (key().compare(old_key) != 0) {
165
0
      valid_ = false;
166
0
      status_ = STATUS(Incomplete, "Cannot do Prev now");
167
0
      return;
168
0
    }
169
0
  }
170
0
  mutable_iter_->Prev();
171
0
  if (mutable_iter_->status().ok()) {
172
0
    UpdateCurrent();
173
0
    status_ = Status::OK();
174
0
  } else {
175
0
    status_ = mutable_iter_->status();
176
0
  }
177
0
}
178
179
10.0k
void ManagedIterator::Next() {
180
10.0k
  if (!valid_) {
181
0
    status_ = STATUS(InvalidArgument, "Iterator value invalid");
182
0
    return;
183
0
  }
184
10.0k
  MILock l(&in_use_, this);
185
10.0k
  if (NeedToRebuild()) {
186
0
    std::string current_key = key().ToString();
187
0
    Slice old_key(current_key.data(), cached_key_.Size());
188
0
    RebuildIterator();
189
0
    SeekInternal(old_key, false);
190
0
    UpdateCurrent();
191
0
    if (!valid_) {
192
0
      return;
193
0
    }
194
0
    if (key().compare(old_key) != 0) {
195
0
      valid_ = false;
196
0
      status_ = STATUS(Incomplete, "Cannot do Next now");
197
0
      return;
198
0
    }
199
0
  }
200
10.0k
  mutable_iter_->Next();
201
10.0k
  UpdateCurrent();
202
10.0k
}
203
204
13.0k
// Returns the key cached by the most recent UpdateCurrent(). Asserts that the
// iterator is valid; takes no lock.
Slice ManagedIterator::key() const {
  assert(valid_);
  Slice cached = cached_key_.GetKey();
  return cached;
}
208
209
0
// Returns the value cached by the most recent UpdateCurrent(). Asserts that
// the iterator is valid; takes no lock.
Slice ManagedIterator::value() const {
  assert(valid_);
  // The value cache exposes its contents through GetKey() as well.
  Slice cached = cached_value_.GetKey();
  return cached;
}
213
214
70
// Returns a copy of the status recorded by the last operation. Takes no lock.
Status ManagedIterator::status() const {
  return status_;
}
215
216
0
void ManagedIterator::RebuildIterator() {
217
0
  svnum_ = cfd_->GetSuperVersionNumber();
218
0
  mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
219
0
}
220
221
23.1k
void ManagedIterator::UpdateCurrent() {
222
23.1k
  assert(mutable_iter_ != nullptr);
223
224
23.1k
  if (!(valid_ = mutable_iter_->Valid())) {
225
75
    status_ = mutable_iter_->status();
226
75
    return;
227
75
  }
228
229
23.0k
  status_ = Status::OK();
230
23.0k
  cached_key_.SetKey(mutable_iter_->key());
231
23.0k
  cached_value_.SetKey(mutable_iter_->value());
232
23.0k
}
233
234
0
void ManagedIterator::ReleaseIter(bool only_old) {
235
0
  if ((mutable_iter_ == nullptr) || (!release_supported_)) {
236
0
    return;
237
0
  }
238
0
  if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
239
0
    if (!TryLock()) {  // Don't release iter if in use
240
0
      return;
241
0
    }
242
0
    mutable_iter_ = nullptr;  // in_use for a very short time
243
0
    UnLock();
244
0
  }
245
0
}
246
247
23.1k
bool ManagedIterator::NeedToRebuild() {
248
23.1k
  if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
249
23.1k
      (!only_drop_old_ && 
(svnum_ != cfd_->GetSuperVersionNumber())0
)) {
250
0
    return true;
251
0
  }
252
23.1k
  return false;
253
23.1k
}
254
255
76
void ManagedIterator::Lock() {
256
76
  in_use_.lock();
257
76
  return;
258
76
}
259
260
0
// Attempts to acquire in_use_ without blocking; returns true on success.
bool ManagedIterator::TryLock() {
  const bool acquired = in_use_.try_lock();
  return acquired;
}
261
262
76
void ManagedIterator::UnLock() {
263
76
  in_use_.unlock();
264
76
  XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
265
76
             xf_manage_release, this);
266
76
}
267
268
}  // namespace rocksdb
269
270
#endif  // ROCKSDB_LITE