YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/mvcc.h"
34
35
#include <boost/circular_buffer.hpp>
36
#include <boost/variant.hpp>
37
38
#include "yb/gutil/macros.h"
39
40
#include "yb/util/atomic.h"
41
#include "yb/util/compare_util.h"
42
#include "yb/util/enums.h"
43
#include "yb/util/flag_tags.h"
44
#include "yb/util/format.h"
45
#include "yb/util/logging.h"
46
47
using namespace std::literals;
48
49
DEFINE_test_flag(int64, mvcc_op_trace_num_items, 32,
50
                 "Number of items to keep in an MvccManager operation trace. Set to 0 to disable "
51
                 "MVCC operation tracing.");
52
53
DEFINE_test_flag(int32, inject_mvcc_delay_add_leader_pending_ms, 0,
54
                 "Inject delay after MvccManager::AddLeaderPending read clock.");
55
56
namespace yb {
57
namespace tablet {
58
59
namespace {
60
61
struct SetLeaderOnlyModeTraceItem {
62
  bool leader_only;
63
64
0
  std::string ToString() const {
65
0
    return Format("SetLeaderOnlyMode $0", YB_STRUCT_TO_STRING(leader_only));
66
0
  }
67
};
68
69
struct SetLastReplicatedTraceItem {
70
  HybridTime ht;
71
72
0
  std::string ToString() const {
73
0
    return Format("SetLastReplicated $0", YB_STRUCT_TO_STRING(ht));
74
0
  }
75
};
76
77
struct SetPropagatedSafeTimeOnFollowerTraceItem {
78
  HybridTime ht;
79
80
0
  std::string ToString() const {
81
0
    return Format("SetPropagatedSafeTimeOnFollower $0", YB_STRUCT_TO_STRING(ht));
82
0
  }
83
};
84
85
struct UpdatePropagatedSafeTimeOnLeaderTraceItem {
86
  FixedHybridTimeLease ht_lease;
87
  HybridTime safe_time;
88
89
0
  std::string ToString() const {
90
0
    return Format("UpdatePropagatedSafeTimeOnLeader $0",
91
0
                  YB_STRUCT_TO_STRING(ht_lease, safe_time));
92
0
  }
93
};
94
95
struct AddLeaderPendingTraceItem {
96
  HybridTime ht;
97
  OpId op_id;
98
99
1
  std::string ToString() const {
100
1
    return Format("AddLeaderPending $0", YB_STRUCT_TO_STRING(ht, op_id));
101
1
  }
102
};
103
104
struct AddFollowerPendingTraceItem {
105
  HybridTime ht;
106
  OpId op_id;
107
108
1
  std::string ToString() const {
109
1
    return Format("AddFollowerPending $0", YB_STRUCT_TO_STRING(ht, op_id));
110
1
  }
111
};
112
113
struct ReplicatedTraceItem {
114
  HybridTime ht;
115
  OpId op_id;
116
117
2
  std::string ToString() const {
118
2
    return Format("Replicated $0", YB_STRUCT_TO_STRING(ht, op_id));
119
2
  }
120
};
121
122
struct AbortedTraceItem {
123
  HybridTime ht;
124
  OpId op_id;
125
126
0
  std::string ToString() const {
127
0
    return Format("Aborted $0", YB_STRUCT_TO_STRING(ht, op_id));
128
0
  }
129
};
130
131
struct SafeTimeTraceItem {
132
  HybridTime min_allowed;
133
  CoarseTimePoint deadline;
134
  FixedHybridTimeLease ht_lease;
135
  HybridTime safe_time;
136
137
5
  std::string ToString() const {
138
5
    return Format("SafeTime $0", YB_STRUCT_TO_STRING(min_allowed, deadline, ht_lease, safe_time));
139
5
  }
140
};
141
142
struct SafeTimeForFollowerTraceItem {
143
  HybridTime min_allowed;
144
  CoarseTimePoint deadline;
145
  SafeTimeWithSource safe_time_with_source;
146
147
0
  std::string ToString() const {
148
0
    return Format("SafeTimeForFollower $0",
149
0
                  YB_STRUCT_TO_STRING(min_allowed, deadline, safe_time_with_source));
150
0
  }
151
};
152
153
struct LastReplicatedHybridTimeTraceItem {
154
  HybridTime last_replicated;
155
156
0
  std::string ToString() const {
157
0
    return Format("LastReplicatedHybridTime $0", YB_STRUCT_TO_STRING(last_replicated));
158
0
  }
159
};
160
161
typedef boost::variant<
162
    SetLeaderOnlyModeTraceItem,
163
    SetLastReplicatedTraceItem,
164
    SetPropagatedSafeTimeOnFollowerTraceItem,
165
    UpdatePropagatedSafeTimeOnLeaderTraceItem,
166
    AddLeaderPendingTraceItem,
167
    AddFollowerPendingTraceItem,
168
    ReplicatedTraceItem,
169
    AbortedTraceItem,
170
    SafeTimeTraceItem,
171
    SafeTimeForFollowerTraceItem,
172
    LastReplicatedHybridTimeTraceItem
173
    > TraceItemVariant;
174
175
class ItemPrintingVisitor : public boost::static_visitor<>{
176
 public:
177
  explicit ItemPrintingVisitor(std::ostream* out, size_t index)
178
      : out_(*out),
179
9
        index_(index) {
180
9
  }
181
182
9
  template<typename T> void operator()(const T& t) const {
183
9
    out_ << index_ << ". " << t.ToString() << std::endl;
184
9
  }
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetLeaderOnlyModeTraceItem>(yb::tablet::(anonymous namespace)::SetLeaderOnlyModeTraceItem const&) const
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetLastReplicatedTraceItem>(yb::tablet::(anonymous namespace)::SetLastReplicatedTraceItem const&) const
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SetPropagatedSafeTimeOnFollowerTraceItem>(yb::tablet::(anonymous namespace)::SetPropagatedSafeTimeOnFollowerTraceItem const&) const
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::UpdatePropagatedSafeTimeOnLeaderTraceItem>(yb::tablet::(anonymous namespace)::UpdatePropagatedSafeTimeOnLeaderTraceItem const&) const
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AddLeaderPendingTraceItem>(yb::tablet::(anonymous namespace)::AddLeaderPendingTraceItem const&) const
Line
Count
Source
182
1
  template<typename T> void operator()(const T& t) const {
183
1
    out_ << index_ << ". " << t.ToString() << std::endl;
184
1
  }
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AddFollowerPendingTraceItem>(yb::tablet::(anonymous namespace)::AddFollowerPendingTraceItem const&) const
Line
Count
Source
182
1
  template<typename T> void operator()(const T& t) const {
183
1
    out_ << index_ << ". " << t.ToString() << std::endl;
184
1
  }
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::ReplicatedTraceItem>(yb::tablet::(anonymous namespace)::ReplicatedTraceItem const&) const
Line
Count
Source
182
2
  template<typename T> void operator()(const T& t) const {
183
2
    out_ << index_ << ". " << t.ToString() << std::endl;
184
2
  }
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::AbortedTraceItem>(yb::tablet::(anonymous namespace)::AbortedTraceItem const&) const
mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SafeTimeTraceItem>(yb::tablet::(anonymous namespace)::SafeTimeTraceItem const&) const
Line
Count
Source
182
5
  template<typename T> void operator()(const T& t) const {
183
5
    out_ << index_ << ". " << t.ToString() << std::endl;
184
5
  }
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::SafeTimeForFollowerTraceItem>(yb::tablet::(anonymous namespace)::SafeTimeForFollowerTraceItem const&) const
Unexecuted instantiation: mvcc.cc:void yb::tablet::(anonymous namespace)::ItemPrintingVisitor::operator()<yb::tablet::(anonymous namespace)::LastReplicatedHybridTimeTraceItem>(yb::tablet::(anonymous namespace)::LastReplicatedHybridTimeTraceItem const&) const
185
186
 private:
187
  std::ostream& out_;
188
  size_t index_;
189
};
190
191
}  // namespace
192
193
5
std::string FixedHybridTimeLease::ToString() const {
194
5
  return YB_STRUCT_TO_STRING(time, lease);
195
5
}
196
197
class MvccManager::MvccOpTrace {
198
 public:
199
150k
  explicit MvccOpTrace(size_t capacity) : items_(capacity) {}
200
75.9k
  ~MvccOpTrace() = default;
201
202
155M
  void Add(TraceItemVariant v) {
203
155M
    items_.push_back(std::move(v));
204
155M
  }
205
206
2
  void DumpTrace(ostream* out) const {
207
2
    if (items_.empty()) {
208
1
      *out << "No MVCC operations" << std::endl;
209
1
      return;
210
1
    }
211
1
    *out << "Recent " << items_.size() << " MVCC operations:" << std::endl;
212
1
    size_t i = 1;
213
9
    for (const auto& item : items_) {
214
9
      boost::apply_visitor(ItemPrintingVisitor(out, i), item);
215
9
      ++i;
216
9
    }
217
1
  }
218
219
 private:
220
  boost::circular_buffer_space_optimized<TraceItemVariant, std::allocator<TraceItemVariant>> items_;
221
};
222
223
struct MvccManager::InvariantViolationLoggingHelper {
224
  const std::string& log_prefix;
225
  MvccOpTrace* mvcc_op_trace;
226
};
227
228
std::ostream& operator<< (
229
    std::ostream& out,
230
0
    const MvccManager::InvariantViolationLoggingHelper& log_helper) {
231
0
  out << log_helper.log_prefix;
232
0
  log_helper.mvcc_op_trace->DumpTrace(&out);
233
0
  return out;
234
0
}
235
236
// ------------------------------------------------------------------------------------------------
237
// SafeTimeWithSource
238
// ------------------------------------------------------------------------------------------------
239
240
0
std::string SafeTimeWithSource::ToString() const {
241
0
  return Format("{ safe_time: $0 source: $1 }", safe_time, source);
242
0
}
243
244
// ------------------------------------------------------------------------------------------------
245
// MvccManager
246
// ------------------------------------------------------------------------------------------------
247
248
MvccManager::MvccManager(std::string prefix, server::ClockPtr clock)
249
    : prefix_(std::move(prefix)),
250
150k
      clock_(std::move(clock)) {
251
150k
  auto op_trace_num_items = GetAtomicFlag(&FLAGS_TEST_mvcc_op_trace_num_items);
252
150k
  if (op_trace_num_items > 0) {
253
150k
    op_trace_ = std::make_unique<MvccManager::MvccOpTrace>(op_trace_num_items);
254
150k
  }
255
150k
}
256
257
76.0k
MvccManager::~MvccManager() {
258
76.0k
}
259
260
13.4M
void MvccManager::Replicated(HybridTime ht, const OpId& op_id) {
261
13.4M
  
VLOG_WITH_PREFIX991
(1) << __func__ << "(" << ht << ", " << op_id << ")"991
;
262
13.4M
  CHECK(!op_id.empty());
263
264
13.4M
  {
265
13.4M
    std::lock_guard<std::mutex> lock(mutex_);
266
13.4M
    if (op_trace_) {
267
13.4M
      op_trace_->Add(ReplicatedTraceItem { .ht = ht, .op_id = op_id });
268
13.4M
    }
269
18.4E
    CHECK(!queue_.empty()) << InvariantViolationLogPrefix();
270
13.4M
    CHECK_EQ(queue_.front(),
271
0
             (QueueItem{ .hybrid_time = ht, .op_id = op_id })) << InvariantViolationLogPrefix();
272
13.4M
    queue_.pop_front();
273
13.4M
    last_replicated_ = ht;
274
13.4M
  }
275
13.4M
  cond_.notify_all();
276
13.4M
}
277
278
20.4k
void MvccManager::Aborted(HybridTime ht, const OpId& op_id) {
279
20.4k
  
VLOG_WITH_PREFIX0
(1) << __func__ << "(" << ht << ", " << op_id << ")"0
;
280
281
20.4k
  {
282
20.4k
    std::lock_guard<std::mutex> lock(mutex_);
283
20.4k
    if (op_trace_) {
284
20.4k
      op_trace_->Add(AbortedTraceItem { .ht = ht, .op_id = op_id });
285
20.4k
    }
286
20.4k
    CHECK
(!queue_.empty()) << InvariantViolationLogPrefix()0
;
287
20.4k
    CHECK_EQ(queue_.back(),
288
0
             (QueueItem{ .hybrid_time = ht, .op_id = op_id }))
289
0
        << InvariantViolationLogPrefix() << "It is allowed to abort only last operation";
290
20.4k
    queue_.pop_back();
291
20.4k
  }
292
20.4k
  cond_.notify_all();
293
20.4k
}
294
295
2.02M
bool BadNextOpId(const OpId& prev, const OpId& next) {
296
2.02M
  if (prev.index >= next.index) {
297
0
    return true;
298
0
  }
299
2.02M
  if (prev.term > next.term) {
300
0
    return true;
301
0
  }
302
2.02M
  return false;
303
2.02M
}
304
305
4.64M
HybridTime MvccManager::AddLeaderPending(const OpId& op_id) {
306
4.64M
  std::lock_guard<std::mutex> lock(mutex_);
307
4.64M
  auto ht = clock_->Now();
308
4.64M
  AtomicFlagSleepMs(&FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms);
309
18.4E
  VLOG_WITH_PREFIX(1) << __func__ << "(" << op_id << "), time: " << ht;
310
4.64M
  AddPending(ht, op_id, /* is_follower_side= */ false);
311
312
4.64M
  if (op_trace_) {
313
4.64M
    op_trace_->Add(AddLeaderPendingTraceItem {
314
4.64M
      .ht = ht,
315
4.64M
      .op_id = op_id,
316
4.64M
    });
317
4.64M
  }
318
319
4.64M
  return ht;
320
4.64M
}
321
322
8.80M
void MvccManager::AddFollowerPending(HybridTime ht, const OpId& op_id) {
323
8.80M
  std::lock_guard<std::mutex> lock(mutex_);
324
8.80M
  
VLOG_WITH_PREFIX2.62k
(1) << __func__ << "(" << ht << ", " << op_id << ")"2.62k
;
325
326
8.80M
  AddPending(ht, op_id, /* is_follower_side= */ true);
327
328
8.80M
  if (op_trace_) {
329
8.79M
    op_trace_->Add(AddFollowerPendingTraceItem {
330
8.79M
      .ht = ht,
331
8.79M
      .op_id = op_id,
332
8.79M
    });
333
8.79M
  }
334
8.80M
}
335
336
13.4M
void MvccManager::AddPending(HybridTime ht, const OpId& op_id, bool is_follower_side) {
337
13.4M
  CHECK(!op_id.empty());
338
339
13.4M
  HybridTime last_ht_in_queue = queue_.empty() ? 
HybridTime::kMin11.4M
:
queue_.back().hybrid_time2.03M
;
340
341
13.4M
  HybridTime sanity_check_lower_bound =
342
13.4M
      std::max({
343
13.4M
          max_safe_time_returned_with_lease_.safe_time,
344
13.4M
          max_safe_time_returned_without_lease_.safe_time,
345
13.4M
          max_safe_time_returned_for_follower_.safe_time,
346
13.4M
          propagated_safe_time_,
347
13.4M
          last_replicated_,
348
13.4M
          last_ht_in_queue});
349
350
13.4M
  if (ht <= sanity_check_lower_bound) {
351
0
    auto get_details_msg = [&](bool drain_aborted) {
352
0
      std::ostringstream ss;
353
0
#define LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(full, safe_time) \
354
0
             "\n  " << EXPR_VALUE_FOR_LOG(full) \
355
0
          << "\n  " << (ht <= safe_time ? "!!! " : "") << EXPR_VALUE_FOR_LOG(ht <= safe_time) \
356
0
          << "\n  " << EXPR_VALUE_FOR_LOG( \
357
0
                           static_cast<int64_t>(ht.ToUint64() - safe_time.ToUint64())) \
358
0
          << "\n  " << EXPR_VALUE_FOR_LOG(ht.PhysicalDiff(safe_time)) \
359
0
          << "\n  "
360
361
0
#define LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t.safe_time)
362
0
#define LOG_INFO_FOR_HT_LOWER_BOUND(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t)
363
364
0
      ss << "New operation's hybrid time too low: " << ht << ", op id: " << op_id
365
0
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_with_lease_)
366
0
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_without_lease_)
367
0
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_for_follower_)
368
0
         << LOG_INFO_FOR_HT_LOWER_BOUND(last_replicated_)
369
0
         << LOG_INFO_FOR_HT_LOWER_BOUND(last_ht_in_queue)
370
0
         << LOG_INFO_FOR_HT_LOWER_BOUND(propagated_safe_time_)
371
0
         << "\n  " << EXPR_VALUE_FOR_LOG(queue_.size())
372
0
         << "\n  " << EXPR_VALUE_FOR_LOG(queue_);
373
0
      return ss.str();
374
0
#undef LOG_INFO_FOR_HT_LOWER_BOUND
375
0
    };
376
377
#ifdef NDEBUG
378
    // In release mode, let's try to avoid crashing if possible if we ever hit this situation.
379
    // On the leader side, we can assign a timestamp that is high enough.
380
    if (!is_follower_side &&
381
        sanity_check_lower_bound &&
382
        sanity_check_lower_bound != HybridTime::kMax) {
383
      HybridTime incremented_hybrid_time = sanity_check_lower_bound.Incremented();
384
      YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix()
385
          << "Assigning an artificially incremented hybrid time: " << incremented_hybrid_time
386
          << ". This needs to be investigated. " << get_details_msg(/* drain_aborted */ false);
387
      ht = incremented_hybrid_time;
388
    }
389
#endif
390
391
0
    if (ht <= sanity_check_lower_bound) {
392
0
      LOG_WITH_PREFIX(FATAL) << InvariantViolationLogPrefix()
393
0
                             << get_details_msg(/* drain_aborted */ true);
394
0
    }
395
0
  }
396
397
13.4M
  
LOG_IF_WITH_PREFIX3.38k
(DFATAL,
398
3.38k
                     !queue_.empty() && BadNextOpId(queue_.back().op_id, op_id))
399
3.38k
      << "Op sequence failure: " << AsString(queue_.back().op_id) << " followed by "
400
3.38k
      << AsString(op_id) << " " << InvariantViolationLogPrefix();
401
402
13.4M
  queue_.push_back(QueueItem {
403
13.4M
    .hybrid_time = ht,
404
13.4M
    .op_id = op_id,
405
13.4M
  });
406
13.4M
}
407
408
2.79k
void MvccManager::SetLastReplicated(HybridTime ht) {
409
2.79k
  
VLOG_WITH_PREFIX0
(1) << __func__ << "(" << ht << ")"0
;
410
411
2.79k
  {
412
2.79k
    std::lock_guard<std::mutex> lock(mutex_);
413
2.79k
    if (op_trace_) {
414
2.79k
      op_trace_->Add(SetLastReplicatedTraceItem { .ht = ht });
415
2.79k
    }
416
2.79k
    last_replicated_ = ht;
417
2.79k
  }
418
2.79k
  cond_.notify_all();
419
2.79k
}
420
421
25.2M
void MvccManager::SetPropagatedSafeTimeOnFollower(HybridTime ht) {
422
25.2M
  
VLOG_WITH_PREFIX7.42k
(1) << __func__ << "(" << ht << ")"7.42k
;
423
424
25.2M
  {
425
25.2M
    std::lock_guard<std::mutex> lock(mutex_);
426
25.2M
    if (op_trace_) {
427
25.2M
      op_trace_->Add(SetPropagatedSafeTimeOnFollowerTraceItem { .ht = ht });
428
25.2M
    }
429
25.2M
    if (ht >= propagated_safe_time_) {
430
25.2M
      propagated_safe_time_ = ht;
431
25.2M
    } else {
432
30.3k
      LOG_WITH_PREFIX(WARNING)
433
30.3k
          << "Received propagated safe time " << ht << " less than the old value: "
434
30.3k
          << propagated_safe_time_ << ". This could happen on followers when a new leader "
435
30.3k
          << "is elected.";
436
30.3k
    }
437
25.2M
  }
438
25.2M
  cond_.notify_all();
439
25.2M
}
440
441
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
442
void MvccManager::UpdatePropagatedSafeTimeOnLeader(const FixedHybridTimeLease& ht_lease)
443
34.6M
    NO_THREAD_SAFETY_ANALYSIS {
444
34.6M
  
VLOG_WITH_PREFIX7.66k
(1) << __func__ << "(" << ht_lease << ")"7.66k
;
445
446
34.6M
  {
447
34.6M
    std::unique_lock<std::mutex> lock(mutex_);
448
34.6M
    auto safe_time = DoGetSafeTime(HybridTime::kMin,       // min_allowed
449
34.6M
                                   CoarseTimePoint::max(), // deadline
450
34.6M
                                   ht_lease,
451
34.6M
                                   &lock);
452
34.6M
#ifndef NDEBUG
453
    // This should only be called from RaftConsensus::UpdateMajorityReplicated, and ht_lease passed
454
    // in here should keep increasing, so we should not see propagated_safe_time_ going backwards.
455
34.6M
    CHECK_GE(safe_time, propagated_safe_time_)
456
0
        << InvariantViolationLogPrefix()
457
0
        << "ht_lease: " << ht_lease;
458
34.6M
    propagated_safe_time_ = safe_time;
459
#else
460
    // Do not crash in production.
461
    if (safe_time < propagated_safe_time_) {
462
      YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix()
463
          << "Previously saw " << EXPR_VALUE_FOR_LOG(propagated_safe_time_)
464
          << ", but now safe time is " << safe_time;
465
    } else {
466
      propagated_safe_time_ = safe_time;
467
    }
468
#endif
469
470
34.6M
    if (op_trace_) {
471
34.6M
      op_trace_->Add(UpdatePropagatedSafeTimeOnLeaderTraceItem {
472
34.6M
        .ht_lease = ht_lease,
473
34.6M
        .safe_time = safe_time
474
34.6M
      });
475
34.6M
    }
476
34.6M
  }
477
34.6M
  cond_.notify_all();
478
34.6M
}
479
480
169k
void MvccManager::SetLeaderOnlyMode(bool leader_only) {
481
169k
  std::lock_guard<std::mutex> lock(mutex_);
482
169k
  if (op_trace_) {
483
169k
    op_trace_->Add(SetLeaderOnlyModeTraceItem {
484
169k
      .leader_only = leader_only
485
169k
    });
486
169k
  }
487
169k
  leader_only_mode_ = leader_only;
488
169k
}
489
490
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
491
HybridTime MvccManager::SafeTimeForFollower(
492
904k
    HybridTime min_allowed, CoarseTimePoint deadline) const NO_THREAD_SAFETY_ANALYSIS {
493
904k
  std::unique_lock<std::mutex> lock(mutex_);
494
495
904k
  if (leader_only_mode_) {
496
    // If there are no followers (RF == 1), use SafeTime() because propagated_safe_time_ might not
497
    // have a valid value.
498
81.1k
    return DoGetSafeTime(min_allowed, deadline, FixedHybridTimeLease(), &lock);
499
81.1k
  }
500
501
823k
  SafeTimeWithSource result;
502
827k
  auto predicate = [this, &result, min_allowed] {
503
    // last_replicated_ is updated earlier than propagated_safe_time_, so because of concurrency it
504
    // could be greater than propagated_safe_time_.
505
827k
    if (propagated_safe_time_ > last_replicated_) {
506
636k
      if (queue_.empty() || 
propagated_safe_time_ < queue_.front().hybrid_time215k
) {
507
617k
        result.safe_time = propagated_safe_time_;
508
617k
        result.source = SafeTimeSource::kPropagated;
509
617k
      } else {
510
18.8k
        result.safe_time = queue_.front().hybrid_time.Decremented();
511
18.8k
        result.source = SafeTimeSource::kNextInQueue;
512
18.8k
      }
513
636k
    } else {
514
190k
      result.safe_time = last_replicated_;
515
190k
      result.source = SafeTimeSource::kLastReplicated;
516
190k
    }
517
827k
    return result.safe_time >= min_allowed;
518
827k
  };
519
823k
  if (deadline == CoarseTimePoint::max()) {
520
322k
    cond_.wait(lock, predicate);
521
500k
  } else if (!cond_.wait_until(lock, deadline, predicate)) {
522
0
    return HybridTime::kInvalid;
523
0
  }
524
18.4E
  VLOG_WITH_PREFIX(1) << "SafeTimeForFollower(" << min_allowed
525
18.4E
                      << "), result = " << result.ToString();
526
823k
  CHECK_GE(result.safe_time, max_safe_time_returned_for_follower_.safe_time)
527
0
      << InvariantViolationLogPrefix()
528
0
      << "result: " << result.ToString()
529
0
      << ", max_safe_time_returned_for_follower_: "
530
0
      << max_safe_time_returned_for_follower_.ToString();
531
823k
  max_safe_time_returned_for_follower_ = result;
532
823k
  if (
op_trace_823k
) {
533
823k
    op_trace_->Add(SafeTimeForFollowerTraceItem {
534
823k
      .min_allowed = min_allowed,
535
823k
      .deadline = deadline,
536
823k
      .safe_time_with_source = result
537
823k
    });
538
823k
  }
539
823k
  return result.safe_time;
540
823k
}
541
542
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
543
HybridTime MvccManager::SafeTime(
544
    HybridTime min_allowed,
545
    CoarseTimePoint deadline,
546
38.7M
    const FixedHybridTimeLease& ht_lease) const NO_THREAD_SAFETY_ANALYSIS {
547
38.7M
  std::unique_lock<std::mutex> lock(mutex_);
548
38.7M
  auto safe_time = DoGetSafeTime(min_allowed, deadline, ht_lease, &lock);
549
38.7M
  if (
op_trace_38.7M
) {
550
38.7M
    op_trace_->Add(SafeTimeTraceItem {
551
38.7M
      .min_allowed = min_allowed,
552
38.7M
      .deadline = deadline,
553
38.7M
      .ht_lease = ht_lease,
554
38.7M
      .safe_time = safe_time
555
38.7M
    });
556
38.7M
  }
557
38.7M
  return safe_time;
558
38.7M
}
559
560
HybridTime MvccManager::DoGetSafeTime(const HybridTime min_allowed,
561
                                      const CoarseTimePoint deadline,
562
                                      const FixedHybridTimeLease& ht_lease,
563
73.5M
                                      std::unique_lock<std::mutex>* lock) const {
564
73.5M
  DCHECK_ONLY_NOTNULL(lock);
565
73.5M
  CHECK
(ht_lease.lease.is_valid()) << InvariantViolationLogPrefix()36.9k
;
566
73.5M
  CHECK_LE
(min_allowed, ht_lease.lease) << InvariantViolationLogPrefix()0
;
567
568
73.5M
  const bool has_lease = !ht_lease.empty();
569
  // Because different calls that have current hybrid time leader lease as an argument can come to
570
  // us out of order, we might see an older value of hybrid time leader lease expiration after a
571
  // newer value. We mitigate this by always using the highest value we've seen.
572
73.5M
  if (has_lease) {
573
72.3M
    
LOG_IF_WITH_PREFIX3.30k
(DFATAL, !ht_lease.time.is_valid()) << "Bad ht lease: " << ht_lease3.30k
;
574
72.3M
  }
575
576
73.5M
  HybridTime result;
577
73.5M
  SafeTimeSource source = SafeTimeSource::kUnknown;
578
73.5M
  auto predicate = [this, &result, &source, min_allowed, ht_lease, has_lease] {
579
73.5M
    if (queue_.empty()) {
580
55.8M
      result = ht_lease.time.is_valid()
581
55.8M
          ? 
std::max(max_safe_time_returned_with_lease_.safe_time, ht_lease.time)54.7M
582
55.8M
          : 
clock_->Now()1.17M
;
583
55.8M
      source = SafeTimeSource::kNow;
584
55.8M
      
VLOG_WITH_PREFIX12.6k
(2) << "DoGetSafeTime, Now: " << result12.6k
;
585
55.8M
    } else {
586
17.7M
      result = queue_.front().hybrid_time.Decremented();
587
17.7M
      source = SafeTimeSource::kNextInQueue;
588
17.7M
      
VLOG_WITH_PREFIX48.3k
(2) << "DoGetSafeTime, Queue front (decremented): " << result48.3k
;
589
17.7M
    }
590
591
73.5M
    if (has_lease) {
592
72.3M
      auto used_lease = std::max({ht_lease.lease, max_safe_time_returned_with_lease_.safe_time});
593
72.3M
      if (result > used_lease) {
594
3.38M
        result = used_lease;
595
3.38M
        source = SafeTimeSource::kHybridTimeLease;
596
3.38M
      }
597
72.3M
    }
598
599
    // This function could be invoked at a follower, so it has a very old ht_lease. In this case it
600
    // is safe to read at least at last_replicated_.
601
73.5M
    result = std::max(result, last_replicated_);
602
603
73.5M
    return result >= min_allowed;
604
73.5M
  };
605
606
  // In the case of an empty queue, the safe hybrid time to read at is only limited by hybrid time
607
  // ht_lease, which is by definition higher than min_allowed, so we would not get blocked.
608
73.5M
  if (deadline == CoarseTimePoint::max()) {
609
71.9M
    cond_.wait(*lock, predicate);
610
71.9M
  } else 
if (1.58M
!cond_.wait_until(*lock, deadline, predicate)1.58M
) {
611
1
    return HybridTime::kInvalid;
612
1
  }
613
73.5M
  
VLOG_WITH_PREFIX_AND_FUNC13.2k
(1)
614
13.2k
      << "(" << min_allowed << ", " << ht_lease << "),  result = " << result;
615
616
73.5M
  auto enforced_min_time = has_lease ? 
max_safe_time_returned_with_lease_.safe_time72.3M
617
73.5M
                                     : 
max_safe_time_returned_without_lease_.safe_time1.21M
;
618
73.5M
  CHECK_GE(result, enforced_min_time)
619
0
      << InvariantViolationLogPrefix()
620
0
      << ": " << EXPR_VALUE_FOR_LOG(has_lease)
621
0
      << ", " << EXPR_VALUE_FOR_LOG(enforced_min_time.ToUint64() - result.ToUint64())
622
0
      << ", " << EXPR_VALUE_FOR_LOG(ht_lease)
623
0
      << ", " << EXPR_VALUE_FOR_LOG(last_replicated_)
624
0
      << ", " << EXPR_VALUE_FOR_LOG(clock_->Now())
625
0
      << ", " << EXPR_VALUE_FOR_LOG(ToString(deadline))
626
0
      << ", " << EXPR_VALUE_FOR_LOG(queue_.size())
627
0
      << ", " << EXPR_VALUE_FOR_LOG(queue_);
628
629
73.5M
  if (has_lease) {
630
72.3M
    max_safe_time_returned_with_lease_ = { result, source };
631
72.3M
  } else {
632
1.22M
    max_safe_time_returned_without_lease_ = { result, source };
633
1.22M
  }
634
73.5M
  return result;
635
73.5M
}
636
637
29.2M
HybridTime MvccManager::LastReplicatedHybridTime() const {
638
29.2M
  std::lock_guard<std::mutex> lock(mutex_);
639
18.4E
  VLOG_WITH_PREFIX(1) << __func__ << "(), result = " << last_replicated_;
640
29.2M
  if (
op_trace_29.2M
) {
641
29.2M
    op_trace_->Add(LastReplicatedHybridTimeTraceItem {
642
29.2M
      .last_replicated = last_replicated_
643
29.2M
    });
644
29.2M
  }
645
29.2M
  return last_replicated_;
646
29.2M
}
647
648
// Using NO_THREAD_SAFETY_ANALYSIS here because we're only reading op_trace_ here and it is set
649
// in the constructor.
650
0
MvccManager::InvariantViolationLoggingHelper MvccManager::InvariantViolationLogPrefix() const {
651
0
  return { prefix_, op_trace_.get() };
652
0
}
653
654
// Ditto regarding NO_THREAD_SAFETY_ANALYSIS.
655
2
void MvccManager::TEST_DumpTrace(std::ostream* out) NO_THREAD_SAFETY_ANALYSIS {
656
2
  if (op_trace_)
657
2
    op_trace_->DumpTrace(out);
658
2
}
659
660
0
std::string MvccManager::QueueItem::ToString() const {
661
0
  return YB_STRUCT_TO_STRING(hybrid_time, op_id);
662
0
}
663
664
13.4M
bool MvccManager::QueueItem::Eq(const MvccManager::QueueItem& rhs) const {
665
13.4M
  const auto& lhs = *this;
666
13.4M
  return YB_STRUCT_EQUALS(hybrid_time, op_id);
667
13.4M
}
668
669
}  // namespace tablet
670
}  // namespace yb