YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/common/consistent_read_point.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/common/consistent_read_point.h"
14
15
#include "yb/common/common.pb.h"
16
17
namespace yb {
18
19
// Creates a read point bound to the given clock. Only clock_ is initialized explicitly here.
ConsistentReadPoint::ConsistentReadPoint(const scoped_refptr<ClockBase>& clock) : clock_(clock) {}
22
23
void ConsistentReadPoint::SetReadTime(
24
280k
    const ReadHybridTime& read_time, HybridTimeMap&& local_limits) {
25
280k
  std::lock_guard<simple_spinlock> lock(mutex_);
26
280k
  read_time_ = read_time;
27
280k
  restart_read_ht_ = read_time_.read;
28
280k
  local_limits_ = std::move(local_limits);
29
280k
  restarts_.clear();
30
280k
}
31
32
5.08M
void ConsistentReadPoint::SetCurrentReadTime() {
33
5.08M
  std::lock_guard<simple_spinlock> lock(mutex_);
34
5.08M
  read_time_ = ReadHybridTime::FromHybridTimeRange(clock_->NowRange());
35
5.08M
  restart_read_ht_ = read_time_.read;
36
5.08M
  local_limits_.clear();
37
5.08M
  restarts_.clear();
38
5.08M
}
39
40
1.20M
// Returns a copy of the current read time specialized for the given tablet.
//
// If no read time is set, the unset value is returned unchanged. Otherwise, when a local limit
// has been recorded for `tablet`, it replaces local_limit in the returned copy. Runs under mutex_.
ReadHybridTime ConsistentReadPoint::GetReadTime(const TabletId& tablet) const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  ReadHybridTime result = read_time_;
  if (!result) {
    return result;
  }
  // Apply the limit stored for this tablet, if there is one.
  const auto limit_it = local_limits_.find(tablet);
  if (limit_it != local_limits_.end()) {
    result.local_limit = limit_it->second;
  }
  return result;
}
52
53
void ConsistentReadPoint::RestartRequired(const TabletId& tablet,
54
42
                                          const ReadHybridTime& restart_time) {
55
42
  std::lock_guard<simple_spinlock> lock(mutex_);
56
42
  RestartRequiredUnlocked(tablet, restart_time);
57
42
}
58
59
void ConsistentReadPoint::RestartRequiredUnlocked(
60
42
    const TabletId& tablet, const ReadHybridTime& restart_time) {
61
0
  DCHECK(read_time_) << "Unexpected restart without a read time set";
62
42
  restart_read_ht_.MakeAtLeast(restart_time.read);
63
  // We should inherit per-tablet restart time limits before restart, doing it lazily.
64
42
  if (restarts_.empty()) {
65
34
    restarts_ = local_limits_;
66
34
  }
67
42
  UpdateLimitsMapUnlocked(tablet, restart_time.local_limit, &restarts_);
68
42
}
69
70
574k
void ConsistentReadPoint::UpdateLocalLimit(const TabletId& tablet, HybridTime local_limit) {
71
574k
  std::lock_guard<simple_spinlock> lock(mutex_);
72
574k
  UpdateLimitsMapUnlocked(tablet, local_limit, &local_limits_);
73
574k
}
74
75
void ConsistentReadPoint::UpdateLimitsMapUnlocked(
76
574k
    const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) {
77
574k
  auto emplace_result = map->emplace(tablet, local_limit);
78
574k
  bool inserted = emplace_result.second;
79
574k
  if (!inserted) {
80
304k
    auto& existing_local_limit = emplace_result.first->second;
81
304k
    existing_local_limit = std::min(existing_local_limit, local_limit);
82
304k
  }
83
574k
}
84
85
401k
bool ConsistentReadPoint::IsRestartRequired() const {
86
401k
  std::lock_guard<simple_spinlock> lock(mutex_);
87
401k
  return IsRestartRequiredUnlocked();
88
401k
}
89
90
416k
bool ConsistentReadPoint::IsRestartRequiredUnlocked() const {
91
416k
  return !restarts_.empty();
92
416k
}
93
94
34
void ConsistentReadPoint::Restart() {
95
34
  std::lock_guard<simple_spinlock> lock(mutex_);
96
34
  local_limits_.swap(restarts_);
97
34
  restarts_.clear();
98
34
  read_time_.read = restart_read_ht_;
99
34
}
100
101
0
void ConsistentReadPoint::Defer() {
102
0
  std::lock_guard<simple_spinlock> lock(mutex_);
103
0
  read_time_.read = read_time_.global_limit;
104
0
}
105
106
5.73M
// Passes a hybrid time propagated from elsewhere to the clock's Update(). Does not touch mutex_.
void ConsistentReadPoint::UpdateClock(HybridTime propagated_hybrid_time) {
  clock_->Update(propagated_hybrid_time);
}
109
110
6.25M
// Returns the current hybrid time as reported by the clock. Does not touch mutex_.
HybridTime ConsistentReadPoint::Now() const {
  return clock_->Now();
}
113
114
82.3k
// Serializes this read point into `data` for a child transaction: the read time is added via
// ReadHybridTime::AddToPB, and every per-tablet local limit is inserted into the protobuf's
// local_limits map as a uint64. Runs under mutex_.
void ConsistentReadPoint::PrepareChildTransactionData(ChildTransactionDataPB* data) const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  read_time_.AddToPB(data);

  auto* pb_limits = data->mutable_local_limits();
  // Element type of the protobuf map, deduced from its iterator.
  using PbEntry = std::remove_reference<decltype(*pb_limits->begin())>::type;
  for (const auto& tablet_and_limit : local_limits_) {
    pb_limits->insert(PbEntry(tablet_and_limit.first, tablet_and_limit.second.ToUint64()));
  }
}
123
124
void ConsistentReadPoint::FinishChildTransactionResult(
125
14.5k
    HadReadTime had_read_time, ChildTransactionResultPB* result) const {
126
14.5k
  std::lock_guard<simple_spinlock> lock(mutex_);
127
14.5k
  if (IsRestartRequiredUnlocked()) {
128
0
    result->set_restart_read_ht(restart_read_ht_.ToUint64());
129
0
    auto& restarts = *result->mutable_read_restarts();
130
0
    for (const auto& restart : restarts_) {
131
0
      typedef std::remove_reference<decltype(*restarts.begin())>::type PairType;
132
0
      restarts.insert(PairType(restart.first, restart.second.ToUint64()));
133
0
    }
134
14.5k
  } else {
135
14.5k
    result->set_restart_read_ht(HybridTime::kInvalid.ToUint64());
136
14.5k
  }
137
138
14.5k
  if (!had_read_time && read_time_) {
139
0
    read_time_.ToPB(result->mutable_used_read_time());
140
0
  }
141
14.5k
}
142
143
14.1k
void ConsistentReadPoint::ApplyChildTransactionResult(const ChildTransactionResultPB& result) {
144
14.1k
  std::lock_guard<simple_spinlock> lock(mutex_);
145
14.1k
  if (result.has_used_read_time()) {
146
0
    LOG_IF(DFATAL, read_time_)
147
0
        << "Read time already picked (" << read_time_
148
0
        << ", but child result contains used read time: "
149
0
        << result.used_read_time().ShortDebugString();
150
0
    read_time_ = ReadHybridTime::FromPB(result.used_read_time());
151
0
    restart_read_ht_ = read_time_.read;
152
0
  }
153
154
14.1k
  HybridTime restart_read_ht(result.restart_read_ht());
155
14.1k
  if (restart_read_ht.is_valid()) {
156
0
    ReadHybridTime read_time;
157
0
    read_time.read = restart_read_ht;
158
0
    for (const auto& restart : result.read_restarts()) {
159
0
      read_time.local_limit = HybridTime(restart.second);
160
0
      RestartRequiredUnlocked(restart.first, read_time);
161
0
    }
162
0
  }
163
14.1k
}
164
165
531k
// Stores `value` as the in_txn_limit component of the current read time. Runs under mutex_.
void ConsistentReadPoint::SetInTxnLimit(HybridTime value) {
  std::lock_guard<simple_spinlock> guard(mutex_);
  read_time_.in_txn_limit = value;
}
169
170
1.20M
// Returns a copy of the current read time as stored, without any per-tablet local limit applied.
// Runs under mutex_.
ReadHybridTime ConsistentReadPoint::GetReadTime() const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  return read_time_;
}
174
175
// NO_THREAD_SAFETY_ANALYSIS is required here because anylysis does not understand std::lock.
176
628
void ConsistentReadPoint::MoveFrom(ConsistentReadPoint* rhs) NO_THREAD_SAFETY_ANALYSIS {
177
628
  std::lock(mutex_, rhs->mutex_);
178
628
  std::lock_guard<simple_spinlock> lock1(mutex_, std::adopt_lock);
179
628
  std::lock_guard<simple_spinlock> lock2(rhs->mutex_, std::adopt_lock);
180
628
  read_time_ = rhs->read_time_;
181
628
  restart_read_ht_ = rhs->restart_read_ht_;
182
628
  local_limits_ = std::move(rhs->local_limits_);
183
628
  restarts_ = std::move(rhs->restarts_);
184
628
}
185
186
} // namespace yb