YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/common/consistent_read_point.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/common/consistent_read_point.h"
14
15
#include "yb/common/common.pb.h"
16
17
namespace yb {
18
19
// Builds a read point bound to `clock`. The constructor's initializer list only stores the
// clock handle; no other member is touched here.
ConsistentReadPoint::ConsistentReadPoint(const scoped_refptr<ClockBase>& clock) : clock_(clock) {}
22
23
void ConsistentReadPoint::SetReadTime(
24
770k
    const ReadHybridTime& read_time, HybridTimeMap&& local_limits) {
25
770k
  std::lock_guard<simple_spinlock> lock(mutex_);
26
770k
  read_time_ = read_time;
27
770k
  restart_read_ht_ = read_time_.read;
28
770k
  local_limits_ = std::move(local_limits);
29
770k
  restarts_.clear();
30
770k
}
31
32
9.67M
void ConsistentReadPoint::SetCurrentReadTime() {
33
9.67M
  std::lock_guard<simple_spinlock> lock(mutex_);
34
9.67M
  read_time_ = ReadHybridTime::FromHybridTimeRange(clock_->NowRange());
35
9.67M
  restart_read_ht_ = read_time_.read;
36
9.67M
  local_limits_.clear();
37
9.67M
  restarts_.clear();
38
9.67M
}
39
40
2.78M
// Returns a copy of the current read time specialized for `tablet`.
//
// If the read time is not set (evaluates to false), the copy is returned untouched. Otherwise,
// when a local limit has been recorded for `tablet`, that limit replaces the `local_limit`
// field of the returned copy. The stored read time itself is never modified.
ReadHybridTime ConsistentReadPoint::GetReadTime(const TabletId& tablet) const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  ReadHybridTime result = read_time_;
  if (!result) {
    return result;
  }
  const auto limit_it = local_limits_.find(tablet);
  if (limit_it != local_limits_.end()) {
    result.local_limit = limit_it->second;
  }
  return result;
}
52
53
void ConsistentReadPoint::RestartRequired(const TabletId& tablet,
54
1.87k
                                          const ReadHybridTime& restart_time) {
55
1.87k
  std::lock_guard<simple_spinlock> lock(mutex_);
56
1.87k
  RestartRequiredUnlocked(tablet, restart_time);
57
1.87k
}
58
59
void ConsistentReadPoint::RestartRequiredUnlocked(
60
1.87k
    const TabletId& tablet, const ReadHybridTime& restart_time) {
61
1.87k
  DCHECK
(read_time_) << "Unexpected restart without a read time set"1
;
62
1.87k
  restart_read_ht_.MakeAtLeast(restart_time.read);
63
  // We should inherit per-tablet restart time limits before restart, doing it lazily.
64
1.87k
  if (restarts_.empty()) {
65
1.79k
    restarts_ = local_limits_;
66
1.79k
  }
67
1.87k
  UpdateLimitsMapUnlocked(tablet, restart_time.local_limit, &restarts_);
68
1.87k
}
69
70
1.54M
void ConsistentReadPoint::UpdateLocalLimit(const TabletId& tablet, HybridTime local_limit) {
71
1.54M
  std::lock_guard<simple_spinlock> lock(mutex_);
72
1.54M
  UpdateLimitsMapUnlocked(tablet, local_limit, &local_limits_);
73
1.54M
}
74
75
void ConsistentReadPoint::UpdateLimitsMapUnlocked(
76
1.54M
    const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) {
77
1.54M
  auto emplace_result = map->emplace(tablet, local_limit);
78
1.54M
  bool inserted = emplace_result.second;
79
1.54M
  if (!inserted) {
80
715k
    auto& existing_local_limit = emplace_result.first->second;
81
715k
    existing_local_limit = std::min(existing_local_limit, local_limit);
82
715k
  }
83
1.54M
}
84
85
470k
bool ConsistentReadPoint::IsRestartRequired() const {
86
470k
  std::lock_guard<simple_spinlock> lock(mutex_);
87
470k
  return IsRestartRequiredUnlocked();
88
470k
}
89
90
484k
bool ConsistentReadPoint::IsRestartRequiredUnlocked() const {
91
484k
  return !restarts_.empty();
92
484k
}
93
94
687
void ConsistentReadPoint::Restart() {
95
687
  std::lock_guard<simple_spinlock> lock(mutex_);
96
687
  local_limits_.swap(restarts_);
97
687
  restarts_.clear();
98
687
  read_time_.read = restart_read_ht_;
99
687
}
100
101
80
void ConsistentReadPoint::Defer() {
102
80
  std::lock_guard<simple_spinlock> lock(mutex_);
103
80
  read_time_.read = read_time_.global_limit;
104
80
}
105
106
11.6M
// Forwards `propagated_hybrid_time` to the underlying clock's Update(). mutex_ is not taken
// here; only clock_ is accessed.
void ConsistentReadPoint::UpdateClock(HybridTime propagated_hybrid_time) {
  auto& clock = *clock_;
  clock.Update(propagated_hybrid_time);
}
109
110
12.3M
// Returns the current time as reported by the underlying clock. mutex_ is not taken here;
// only clock_ is accessed.
HybridTime ConsistentReadPoint::Now() const {
  HybridTime current_time = clock_->Now();
  return current_time;
}
113
114
66.6k
// Serializes the state a child transaction needs into `data`, under mutex_:
//   - the current read time, via ReadHybridTime::AddToPB;
//   - every per-tablet local limit, as (tablet, uint64 hybrid time) entries inserted into
//     data->local_limits.
void ConsistentReadPoint::PrepareChildTransactionData(ChildTransactionDataPB* data) const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  read_time_.AddToPB(data);

  auto& pb_limits = *data->mutable_local_limits();
  // Element type of the protobuf map, deduced from its iterator.
  using PbEntry = std::remove_reference<decltype(*pb_limits.begin())>::type;
  for (const auto& limit : local_limits_) {
    pb_limits.insert(PbEntry(limit.first, limit.second.ToUint64()));
  }
}
123
124
void ConsistentReadPoint::FinishChildTransactionResult(
125
13.7k
    HadReadTime had_read_time, ChildTransactionResultPB* result) const {
126
13.7k
  std::lock_guard<simple_spinlock> lock(mutex_);
127
13.7k
  if (IsRestartRequiredUnlocked()) {
128
0
    result->set_restart_read_ht(restart_read_ht_.ToUint64());
129
0
    auto& restarts = *result->mutable_read_restarts();
130
0
    for (const auto& restart : restarts_) {
131
0
      typedef std::remove_reference<decltype(*restarts.begin())>::type PairType;
132
0
      restarts.insert(PairType(restart.first, restart.second.ToUint64()));
133
0
    }
134
13.7k
  } else {
135
13.7k
    result->set_restart_read_ht(HybridTime::kInvalid.ToUint64());
136
13.7k
  }
137
138
13.7k
  if (!had_read_time && 
read_time_0
) {
139
0
    read_time_.ToPB(result->mutable_used_read_time());
140
0
  }
141
13.7k
}
142
143
13.4k
void ConsistentReadPoint::ApplyChildTransactionResult(const ChildTransactionResultPB& result) {
144
13.4k
  std::lock_guard<simple_spinlock> lock(mutex_);
145
13.4k
  if (result.has_used_read_time()) {
146
0
    LOG_IF(DFATAL, read_time_)
147
0
        << "Read time already picked (" << read_time_
148
0
        << ", but child result contains used read time: "
149
0
        << result.used_read_time().ShortDebugString();
150
0
    read_time_ = ReadHybridTime::FromPB(result.used_read_time());
151
0
    restart_read_ht_ = read_time_.read;
152
0
  }
153
154
13.4k
  HybridTime restart_read_ht(result.restart_read_ht());
155
13.4k
  if (restart_read_ht.is_valid()) {
156
0
    ReadHybridTime read_time;
157
0
    read_time.read = restart_read_ht;
158
0
    for (const auto& restart : result.read_restarts()) {
159
0
      read_time.local_limit = HybridTime(restart.second);
160
0
      RestartRequiredUnlocked(restart.first, read_time);
161
0
    }
162
0
  }
163
13.4k
}
164
165
994k
// Overwrites the in_txn_limit component of the current read time with `value`, under mutex_.
void ConsistentReadPoint::SetInTxnLimit(HybridTime value) {
  std::lock_guard<simple_spinlock> guard(mutex_);
  auto& in_txn_limit = read_time_.in_txn_limit;
  in_txn_limit = value;
}
169
170
2.13M
// Returns a copy of the current read time, taken under mutex_. Unlike the per-tablet overload,
// no local limit is substituted.
ReadHybridTime ConsistentReadPoint::GetReadTime() const {
  std::lock_guard<simple_spinlock> guard(mutex_);
  ReadHybridTime snapshot = read_time_;
  return snapshot;
}
174
175
// NO_THREAD_SAFETY_ANALYSIS is required here because the analysis does not understand std::lock.
176
13.4k
// Transfers the read-point state of `*rhs` into this object.
//
// read_time_ and restart_read_ht_ are copied; local_limits_ and restarts_ are moved out of
// `*rhs`. clock_ is not touched on either side. Both objects' mutexes are acquired together
// via std::lock and released on return.
//
// Moving an object into itself is a no-op: without the check, std::lock would be asked to
// lock the same spinlock twice and the maps would be self-move-assigned.
void ConsistentReadPoint::MoveFrom(ConsistentReadPoint* rhs) NO_THREAD_SAFETY_ANALYSIS {
  if (rhs == this) {
    return;
  }
  std::lock(mutex_, rhs->mutex_);
  // Adopt the already-held locks so they are released when this function returns.
  std::lock_guard<simple_spinlock> lock1(mutex_, std::adopt_lock);
  std::lock_guard<simple_spinlock> lock2(rhs->mutex_, std::adopt_lock);
  read_time_ = rhs->read_time_;
  restart_read_ht_ = rhs->restart_read_ht_;
  local_limits_ = std::move(rhs->local_limits_);
  restarts_ = std::move(rhs->restarts_);
}
185
186
} // namespace yb