YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/mvcc.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/mvcc.h"
34
35
#include <boost/circular_buffer.hpp>
36
#include <boost/variant.hpp>
37
38
#include "yb/gutil/macros.h"
39
40
#include "yb/util/atomic.h"
41
#include "yb/util/compare_util.h"
42
#include "yb/util/enums.h"
43
#include "yb/util/flag_tags.h"
44
#include "yb/util/format.h"
45
#include "yb/util/logging.h"
46
47
using namespace std::literals;
48
49
DEFINE_test_flag(int64, mvcc_op_trace_num_items, 32,
50
                 "Number of items to keep in an MvccManager operation trace. Set to 0 to disable "
51
                 "MVCC operation tracing.");
52
53
DEFINE_test_flag(int32, inject_mvcc_delay_add_leader_pending_ms, 0,
54
                 "Inject delay after MvccManager::AddLeaderPending read clock.");
55
56
namespace yb {
57
namespace tablet {
58
59
namespace {
60
61
struct SetLeaderOnlyModeTraceItem {
62
  bool leader_only;
63
64
0
  std::string ToString() const {
65
0
    return Format("SetLeaderOnlyMode $0", YB_STRUCT_TO_STRING(leader_only));
66
0
  }
67
};
68
69
struct SetLastReplicatedTraceItem {
70
  HybridTime ht;
71
72
0
  std::string ToString() const {
73
0
    return Format("SetLastReplicated $0", YB_STRUCT_TO_STRING(ht));
74
0
  }
75
};
76
77
struct SetPropagatedSafeTimeOnFollowerTraceItem {
78
  HybridTime ht;
79
80
0
  std::string ToString() const {
81
0
    return Format("SetPropagatedSafeTimeOnFollower $0", YB_STRUCT_TO_STRING(ht));
82
0
  }
83
};
84
85
struct UpdatePropagatedSafeTimeOnLeaderTraceItem {
86
  FixedHybridTimeLease ht_lease;
87
  HybridTime safe_time;
88
89
0
  std::string ToString() const {
90
0
    return Format("UpdatePropagatedSafeTimeOnLeader $0",
91
0
                  YB_STRUCT_TO_STRING(ht_lease, safe_time));
92
0
  }
93
};
94
95
struct AddLeaderPendingTraceItem {
96
  HybridTime ht;
97
  OpId op_id;
98
99
1
  std::string ToString() const {
100
1
    return Format("AddLeaderPending $0", YB_STRUCT_TO_STRING(ht, op_id));
101
1
  }
102
};
103
104
struct AddFollowerPendingTraceItem {
105
  HybridTime ht;
106
  OpId op_id;
107
108
1
  std::string ToString() const {
109
1
    return Format("AddFollowerPending $0", YB_STRUCT_TO_STRING(ht, op_id));
110
1
  }
111
};
112
113
struct ReplicatedTraceItem {
114
  HybridTime ht;
115
  OpId op_id;
116
117
2
  std::string ToString() const {
118
2
    return Format("Replicated $0", YB_STRUCT_TO_STRING(ht, op_id));
119
2
  }
120
};
121
122
struct AbortedTraceItem {
123
  HybridTime ht;
124
  OpId op_id;
125
126
0
  std::string ToString() const {
127
0
    return Format("Aborted $0", YB_STRUCT_TO_STRING(ht, op_id));
128
0
  }
129
};
130
131
struct SafeTimeTraceItem {
132
  HybridTime min_allowed;
133
  CoarseTimePoint deadline;
134
  FixedHybridTimeLease ht_lease;
135
  HybridTime safe_time;
136
137
5
  std::string ToString() const {
138
5
    return Format("SafeTime $0", YB_STRUCT_TO_STRING(min_allowed, deadline, ht_lease, safe_time));
139
5
  }
140
};
141
142
struct SafeTimeForFollowerTraceItem {
143
  HybridTime min_allowed;
144
  CoarseTimePoint deadline;
145
  SafeTimeWithSource safe_time_with_source;
146
147
0
  std::string ToString() const {
148
0
    return Format("SafeTimeForFollower $0",
149
0
                  YB_STRUCT_TO_STRING(min_allowed, deadline, safe_time_with_source));
150
0
  }
151
};
152
153
struct LastReplicatedHybridTimeTraceItem {
154
  HybridTime last_replicated;
155
156
0
  std::string ToString() const {
157
0
    return Format("LastReplicatedHybridTime $0", YB_STRUCT_TO_STRING(last_replicated));
158
0
  }
159
};
160
161
// Closed set of record types that can be stored in an MVCC operation trace.
using TraceItemVariant = boost::variant<
    SetLeaderOnlyModeTraceItem,
    SetLastReplicatedTraceItem,
    SetPropagatedSafeTimeOnFollowerTraceItem,
    UpdatePropagatedSafeTimeOnLeaderTraceItem,
    AddLeaderPendingTraceItem,
    AddFollowerPendingTraceItem,
    ReplicatedTraceItem,
    AbortedTraceItem,
    SafeTimeTraceItem,
    SafeTimeForFollowerTraceItem,
    LastReplicatedHybridTimeTraceItem>;
174
175
// boost::variant visitor that writes one trace item as "<index>. <item.ToString()>" on its own
// line of the given stream.
class ItemPrintingVisitor : public boost::static_visitor<> {
 public:
  // out must be non-null; it is dereferenced immediately and kept as a reference.
  explicit ItemPrintingVisitor(std::ostream* out, size_t index)
      : out_(*out), index_(index) {}

  // Works for every alternative of the variant, since each provides ToString().
  template <typename Item>
  void operator()(const Item& item) const {
    const std::string text = item.ToString();
    out_ << index_ << ". " << text << std::endl;
  }

 private:
  std::ostream& out_;
  size_t index_;
};
190
191
}  // namespace
192
193
5
std::string FixedHybridTimeLease::ToString() const {
194
5
  return YB_STRUCT_TO_STRING(time, lease);
195
5
}
196
197
// Bounded log of recent MvccManager operations, dumped when an invariant violation is reported.
// Items are kept in a Boost circular buffer constructed with the requested capacity.
// The class does no locking of its own.
class MvccManager::MvccOpTrace {
 public:
  // capacity: maximum number of trace items retained.
  explicit MvccOpTrace(size_t capacity) : items_(capacity) {}
  ~MvccOpTrace() = default;

  // Appends one trace item to the buffer.
  void Add(TraceItemVariant v) {
    items_.push_back(std::move(v));
  }

  // Writes the retained items to *out, one per line, numbered from 1 in buffer order.
  // Spelled std::ostream explicitly (as MvccManager::TEST_DumpTrace does) so this does not
  // depend on a using-declaration leaking from an included header.
  void DumpTrace(std::ostream* out) const {
    if (items_.empty()) {
      *out << "No MVCC operations" << std::endl;
      return;
    }
    *out << "Recent " << items_.size() << " MVCC operations:" << std::endl;
    size_t i = 1;
    for (const auto& item : items_) {
      boost::apply_visitor(ItemPrintingVisitor(out, i), item);
      ++i;
    }
  }

 private:
  boost::circular_buffer_space_optimized<TraceItemVariant, std::allocator<TraceItemVariant>> items_;
};
222
223
// Streamable bundle produced by MvccManager::InvariantViolationLogPrefix(): streaming it writes
// the log prefix followed by a dump of the operation trace.
struct MvccManager::InvariantViolationLoggingHelper {
224
  // Held by reference; the referenced string must outlive this helper.
  const std::string& log_prefix;
225
  // Not owned. Filled from op_trace_.get(), which is null when the manager was constructed with
  // FLAGS_TEST_mvcc_op_trace_num_items <= 0.
  MvccOpTrace* mvcc_op_trace;
226
};
227
228
std::ostream& operator<< (
229
    std::ostream& out,
230
0
    const MvccManager::InvariantViolationLoggingHelper& log_helper) {
231
0
  out << log_helper.log_prefix;
232
0
  log_helper.mvcc_op_trace->DumpTrace(&out);
233
0
  return out;
234
0
}
235
236
// ------------------------------------------------------------------------------------------------
237
// SafeTimeWithSource
238
// ------------------------------------------------------------------------------------------------
239
240
0
std::string SafeTimeWithSource::ToString() const {
241
0
  return Format("{ safe_time: $0 source: $1 }", safe_time, source);
242
0
}
243
244
// ------------------------------------------------------------------------------------------------
245
// MvccManager
246
// ------------------------------------------------------------------------------------------------
247
248
// Stores the log prefix and clock, and allocates the operation trace unless tracing is disabled
// (FLAGS_TEST_mvcc_op_trace_num_items <= 0), in which case op_trace_ stays null.
MvccManager::MvccManager(std::string prefix, server::ClockPtr clock)
    : prefix_(std::move(prefix)), clock_(std::move(clock)) {
  const auto trace_capacity = GetAtomicFlag(&FLAGS_TEST_mvcc_op_trace_num_items);
  if (trace_capacity <= 0) {
    return;
  }
  op_trace_ = std::make_unique<MvccManager::MvccOpTrace>(trace_capacity);
}
256
257
48.1k
// Defined out of line, in the file that also defines MvccOpTrace (held via op_trace_).
MvccManager::~MvccManager() = default;
259
260
7.48M
// Marks the operation at the front of the pending queue as replicated.
//
// The queue must be non-empty and its front must equal {ht, op_id}; otherwise the process is
// aborted with the operation trace. On success the front is removed, last_replicated_ becomes ht,
// and waiters on cond_ are woken after the mutex is released.
void MvccManager::Replicated(HybridTime ht, const OpId& op_id) {
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")";
  CHECK(!op_id.empty());

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (op_trace_ != nullptr) {
      op_trace_->Add(ReplicatedTraceItem { .ht = ht, .op_id = op_id });
    }

    CHECK(!queue_.empty()) << InvariantViolationLogPrefix();
    CHECK_EQ(queue_.front(),
             (QueueItem{ .hybrid_time = ht, .op_id = op_id })) << InvariantViolationLogPrefix();

    queue_.pop_front();
    last_replicated_ = ht;
  }

  cond_.notify_all();
}
277
278
20.6k
// Removes an aborted operation from the pending queue.
//
// Only the most recently added operation may be aborted: the queue must be non-empty and its back
// must equal {ht, op_id}, otherwise the process is aborted with the operation trace. Waiters on
// cond_ are woken after the mutex is released.
void MvccManager::Aborted(HybridTime ht, const OpId& op_id) {
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")";

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (op_trace_ != nullptr) {
      op_trace_->Add(AbortedTraceItem { .ht = ht, .op_id = op_id });
    }

    CHECK(!queue_.empty()) << InvariantViolationLogPrefix();
    CHECK_EQ(queue_.back(),
             (QueueItem{ .hybrid_time = ht, .op_id = op_id }))
        << InvariantViolationLogPrefix() << "It is allowed to abort only last operation";

    queue_.pop_back();
  }

  cond_.notify_all();
}
294
295
1.16M
bool BadNextOpId(const OpId& prev, const OpId& next) {
296
1.16M
  if (prev.index >= next.index) {
297
0
    return true;
298
0
  }
299
1.16M
  if (prev.term > next.term) {
300
0
    return true;
301
0
  }
302
1.16M
  return false;
303
1.16M
}
304
305
2.53M
// Leader-side registration of a new pending operation.
//
// Reads the current hybrid time from the clock under mutex_, enqueues {ht, op_id} via AddPending,
// records the operation in the trace (if enabled) and returns the hybrid time assigned to the
// operation.
HybridTime MvccManager::AddLeaderPending(const OpId& op_id) {
  std::lock_guard<std::mutex> lock(mutex_);
  auto ht = clock_->Now();
  // Test-only hook: optionally sleep between reading the clock and enqueueing.
  AtomicFlagSleepMs(&FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms);
  VLOG_WITH_PREFIX(1) << __func__ << "(" << op_id << "), time: " << ht;
  AddPending(ht, op_id, /* is_follower_side= */ false);

  // AddPending receives `ht` by value and, in release builds, may enqueue an artificially
  // incremented hybrid time instead of the one we passed. Read back what was actually queued
  // (AddPending always pushes exactly one item or aborts) so that the value we trace and return
  // matches the queue; otherwise a later Replicated(ht, op_id) would fail its queue-front check.
  ht = queue_.back().hybrid_time;

  if (op_trace_) {
    op_trace_->Add(AddLeaderPendingTraceItem {
      .ht = ht,
      .op_id = op_id,
    });
  }

  return ht;
}
321
322
4.97M
// Follower-side registration of a pending operation whose hybrid time was assigned elsewhere.
// Enqueues {ht, op_id} via AddPending under mutex_ and records it in the trace (if enabled).
void MvccManager::AddFollowerPending(HybridTime ht, const OpId& op_id) {
  std::lock_guard<std::mutex> lock(mutex_);
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ", " << op_id << ")";

  AddPending(ht, op_id, /* is_follower_side= */ true);

  if (op_trace_ == nullptr) {
    return;
  }
  op_trace_->Add(AddFollowerPendingTraceItem { .ht = ht, .op_id = op_id });
}
335
336
7.51M
// Validates {ht, op_id} and appends it to the pending-operation queue.
//
// The hybrid time must be strictly greater than every safe time already returned, the propagated
// safe time, the last replicated hybrid time and the hybrid time at the back of the queue.
// A violation is fatal, except that release (NDEBUG) builds first try to recover on the leader
// side by substituting that lower bound's Incremented() value.
//
// An op id that does not correctly follow the one at the back of the queue is reported with
// DFATAL but is still enqueued.
//
// The callers visible in this file (AddLeaderPending / AddFollowerPending) hold mutex_.
//
// NOTE: `ht` is passed by value, so a substituted hybrid time is visible to callers only through
// the queued item (queue_.back()).
void MvccManager::AddPending(HybridTime ht, const OpId& op_id, bool is_follower_side) {
  CHECK(!op_id.empty());

  HybridTime last_ht_in_queue = queue_.empty() ? HybridTime::kMin : queue_.back().hybrid_time;

  HybridTime sanity_check_lower_bound =
      std::max({
          max_safe_time_returned_with_lease_.safe_time,
          max_safe_time_returned_without_lease_.safe_time,
          max_safe_time_returned_for_follower_.safe_time,
          propagated_safe_time_,
          last_replicated_,
          last_ht_in_queue});

  if (ht <= sanity_check_lower_bound) {
    // Builds the diagnostic text lazily; captures by reference so it reflects the current `ht`.
    auto get_details_msg = [&] {
      std::ostringstream ss;
#define LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(full, safe_time) \
             "\n  " << EXPR_VALUE_FOR_LOG(full) \
          << "\n  " << (ht <= safe_time ? "!!! " : "") << EXPR_VALUE_FOR_LOG(ht <= safe_time) \
          << "\n  " << EXPR_VALUE_FOR_LOG( \
                           static_cast<int64_t>(ht.ToUint64() - safe_time.ToUint64())) \
          << "\n  " << EXPR_VALUE_FOR_LOG(ht.PhysicalDiff(safe_time)) \
          << "\n  "

#define LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t.safe_time)
#define LOG_INFO_FOR_HT_LOWER_BOUND(t) LOG_INFO_FOR_HT_LOWER_BOUND_IMPL(t, t)

      ss << "New operation's hybrid time too low: " << ht << ", op id: " << op_id
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_with_lease_)
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_without_lease_)
         << LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE(max_safe_time_returned_for_follower_)
         << LOG_INFO_FOR_HT_LOWER_BOUND(last_replicated_)
         << LOG_INFO_FOR_HT_LOWER_BOUND(last_ht_in_queue)
         << LOG_INFO_FOR_HT_LOWER_BOUND(propagated_safe_time_)
         << "\n  " << EXPR_VALUE_FOR_LOG(queue_.size())
         << "\n  " << EXPR_VALUE_FOR_LOG(queue_);
      return ss.str();
    };
    // Remove all three helper macros so none of them leaks into the rest of the file.
#undef LOG_INFO_FOR_HT_LOWER_BOUND
#undef LOG_INFO_FOR_HT_LOWER_BOUND_WITH_SOURCE
#undef LOG_INFO_FOR_HT_LOWER_BOUND_IMPL

#ifdef NDEBUG
    // In release mode, let's try to avoid crashing if possible if we ever hit this situation.
    // On the leader side, we can assign a timestamp that is high enough.
    if (!is_follower_side &&
        sanity_check_lower_bound &&
        sanity_check_lower_bound != HybridTime::kMax) {
      HybridTime incremented_hybrid_time = sanity_check_lower_bound.Incremented();
      YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix()
          << "Assigning an artificially incremented hybrid time: " << incremented_hybrid_time
          << ". This needs to be investigated. " << get_details_msg();
      ht = incremented_hybrid_time;
    }
#endif

    // Still too low (debug build, follower side, or recovery not possible): give up.
    if (ht <= sanity_check_lower_bound) {
      LOG_WITH_PREFIX(FATAL) << InvariantViolationLogPrefix()
                             << get_details_msg();
    }
  }

  LOG_IF_WITH_PREFIX(DFATAL,
                     !queue_.empty() && BadNextOpId(queue_.back().op_id, op_id))
      << "Op sequence failure: " << AsString(queue_.back().op_id) << " followed by "
      << AsString(op_id) << " " << InvariantViolationLogPrefix();

  queue_.push_back(QueueItem {
    .hybrid_time = ht,
    .op_id = op_id,
  });
}
407
408
1.72k
// Overwrites last_replicated_ with ht under mutex_, records the call in the trace (if enabled),
// and wakes waiters on cond_ after the mutex is released.
void MvccManager::SetLastReplicated(HybridTime ht) {
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ")";

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (op_trace_ != nullptr) {
      op_trace_->Add(SetLastReplicatedTraceItem { .ht = ht });
    }
    last_replicated_ = ht;
  }

  cond_.notify_all();
}
420
421
10.1M
// Follower-side update of the propagated safe time.
//
// The stored value only moves forward: a value lower than the current one is logged as a warning
// and ignored. Waiters on cond_ are woken in either case, after the mutex is released.
void MvccManager::SetPropagatedSafeTimeOnFollower(HybridTime ht) {
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht << ")";

  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (op_trace_ != nullptr) {
      op_trace_->Add(SetPropagatedSafeTimeOnFollowerTraceItem { .ht = ht });
    }

    const bool not_going_backwards = ht >= propagated_safe_time_;
    if (!not_going_backwards) {
      LOG_WITH_PREFIX(WARNING)
          << "Received propagated safe time " << ht << " less than the old value: "
          << propagated_safe_time_ << ". This could happen on followers when a new leader "
          << "is elected.";
    } else {
      propagated_safe_time_ = ht;
    }
  }

  cond_.notify_all();
}
440
441
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
442
void MvccManager::UpdatePropagatedSafeTimeOnLeader(const FixedHybridTimeLease& ht_lease)
443
15.1M
    NO_THREAD_SAFETY_ANALYSIS {
444
1.03k
  VLOG_WITH_PREFIX(1) << __func__ << "(" << ht_lease << ")";
445
446
15.1M
  {
447
15.1M
    std::unique_lock<std::mutex> lock(mutex_);
448
15.1M
    auto safe_time = DoGetSafeTime(HybridTime::kMin,       // min_allowed
449
15.1M
                                   CoarseTimePoint::max(), // deadline
450
15.1M
                                   ht_lease,
451
15.1M
                                   &lock);
452
15.1M
#ifndef NDEBUG
453
    // This should only be called from RaftConsensus::UpdateMajorityReplicated, and ht_lease passed
454
    // in here should keep increasing, so we should not see propagated_safe_time_ going backwards.
455
0
    CHECK_GE(safe_time, propagated_safe_time_)
456
0
        << InvariantViolationLogPrefix()
457
0
        << "ht_lease: " << ht_lease;
458
15.1M
    propagated_safe_time_ = safe_time;
459
#else
460
    // Do not crash in production.
461
    if (safe_time < propagated_safe_time_) {
462
      YB_LOG_EVERY_N_SECS(ERROR, 5) << LogPrefix()
463
          << "Previously saw " << EXPR_VALUE_FOR_LOG(propagated_safe_time_)
464
          << ", but now safe time is " << safe_time;
465
    } else {
466
      propagated_safe_time_ = safe_time;
467
    }
468
#endif
469
470
15.1M
    if (op_trace_) {
471
15.1M
      op_trace_->Add(UpdatePropagatedSafeTimeOnLeaderTraceItem {
472
15.1M
        .ht_lease = ht_lease,
473
15.1M
        .safe_time = safe_time
474
15.1M
      });
475
15.1M
    }
476
15.1M
  }
477
15.1M
  cond_.notify_all();
478
15.1M
}
479
480
98.8k
void MvccManager::SetLeaderOnlyMode(bool leader_only) {
481
98.8k
  std::lock_guard<std::mutex> lock(mutex_);
482
98.8k
  if (op_trace_) {
483
98.8k
    op_trace_->Add(SetLeaderOnlyModeTraceItem {
484
98.8k
      .leader_only = leader_only
485
98.8k
    });
486
98.8k
  }
487
98.8k
  leader_only_mode_ = leader_only;
488
98.8k
}
489
490
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
491
HybridTime MvccManager::SafeTimeForFollower(
492
453k
    HybridTime min_allowed, CoarseTimePoint deadline) const NO_THREAD_SAFETY_ANALYSIS {
493
453k
  std::unique_lock<std::mutex> lock(mutex_);
494
495
453k
  if (leader_only_mode_) {
496
    // If there are no followers (RF == 1), use SafeTime() because propagated_safe_time_ might not
497
    // have a valid value.
498
5.21k
    return DoGetSafeTime(min_allowed, deadline, FixedHybridTimeLease(), &lock);
499
5.21k
  }
500
501
448k
  SafeTimeWithSource result;
502
450k
  auto predicate = [this, &result, min_allowed] {
503
    // last_replicated_ is updated earlier than propagated_safe_time_, so because of concurrency it
504
    // could be greater than propagated_safe_time_.
505
450k
    if (propagated_safe_time_ > last_replicated_) {
506
318k
      if (queue_.empty() || propagated_safe_time_ < queue_.front().hybrid_time) {
507
309k
        result.safe_time = propagated_safe_time_;
508
309k
        result.source = SafeTimeSource::kPropagated;
509
9.43k
      } else {
510
9.43k
        result.safe_time = queue_.front().hybrid_time.Decremented();
511
9.43k
        result.source = SafeTimeSource::kNextInQueue;
512
9.43k
      }
513
131k
    } else {
514
131k
      result.safe_time = last_replicated_;
515
131k
      result.source = SafeTimeSource::kLastReplicated;
516
131k
    }
517
450k
    return result.safe_time >= min_allowed;
518
450k
  };
519
448k
  if (deadline == CoarseTimePoint::max()) {
520
112k
    cond_.wait(lock, predicate);
521
335k
  } else if (!cond_.wait_until(lock, deadline, predicate)) {
522
0
    return HybridTime::kInvalid;
523
0
  }
524
18.4E
  VLOG_WITH_PREFIX(1) << "SafeTimeForFollower(" << min_allowed
525
18.4E
                      << "), result = " << result.ToString();
526
0
  CHECK_GE(result.safe_time, max_safe_time_returned_for_follower_.safe_time)
527
0
      << InvariantViolationLogPrefix()
528
0
      << "result: " << result.ToString()
529
0
      << ", max_safe_time_returned_for_follower_: "
530
0
      << max_safe_time_returned_for_follower_.ToString();
531
448k
  max_safe_time_returned_for_follower_ = result;
532
448k
  if (op_trace_) {
533
448k
    op_trace_->Add(SafeTimeForFollowerTraceItem {
534
448k
      .min_allowed = min_allowed,
535
448k
      .deadline = deadline,
536
448k
      .safe_time_with_source = result
537
448k
    });
538
448k
  }
539
448k
  return result.safe_time;
540
448k
}
541
542
// NO_THREAD_SAFETY_ANALYSIS because this analysis does not work with unique_lock.
543
HybridTime MvccManager::SafeTime(
544
    HybridTime min_allowed,
545
    CoarseTimePoint deadline,
546
18.2M
    const FixedHybridTimeLease& ht_lease) const NO_THREAD_SAFETY_ANALYSIS {
547
18.2M
  std::unique_lock<std::mutex> lock(mutex_);
548
18.2M
  auto safe_time = DoGetSafeTime(min_allowed, deadline, ht_lease, &lock);
549
18.2M
  if (op_trace_) {
550
18.2M
    op_trace_->Add(SafeTimeTraceItem {
551
18.2M
      .min_allowed = min_allowed,
552
18.2M
      .deadline = deadline,
553
18.2M
      .ht_lease = ht_lease,
554
18.2M
      .safe_time = safe_time
555
18.2M
    });
556
18.2M
  }
557
18.2M
  return safe_time;
558
18.2M
}
559
560
// Core safe-time computation; *lock must already hold mutex_ (it is handed to cond_ for waiting).
//
// Waits until the computed safe time is at least min_allowed, or returns HybridTime::kInvalid if
// the deadline expires first (a deadline of CoarseTimePoint::max() waits without a timeout).
// The result is remembered separately for the with-lease and without-lease cases, and a CHECK
// enforces that it never goes below the value previously returned for the same case.
HybridTime MvccManager::DoGetSafeTime(const HybridTime min_allowed,
                                      const CoarseTimePoint deadline,
                                      const FixedHybridTimeLease& ht_lease,
                                      std::unique_lock<std::mutex>* lock) const {
  DCHECK_ONLY_NOTNULL(lock);
  CHECK(ht_lease.lease.is_valid()) << InvariantViolationLogPrefix();
  CHECK_LE(min_allowed, ht_lease.lease) << InvariantViolationLogPrefix();

  const bool has_lease = !ht_lease.empty();
  // Because different calls that have current hybrid time leader lease as an argument can come to
  // us out of order, we might see an older value of hybrid time leader lease expiration after a
  // newer value. We mitigate this by always using the highest value we've seen.
  if (has_lease) {
    LOG_IF_WITH_PREFIX(DFATAL, !ht_lease.time.is_valid()) << "Bad ht lease: " << ht_lease;
  }

  HybridTime result;
  SafeTimeSource result_source = SafeTimeSource::kUnknown;
  // Recomputes `result` / `result_source` from the current state and reports whether the result
  // satisfies min_allowed. Re-evaluated on every wake-up of cond_.
  auto safe_time_reached = [this, &result, &result_source, min_allowed, ht_lease, has_lease] {
    if (!queue_.empty()) {
      // Nothing at or after the oldest pending operation is safe to read yet.
      result = queue_.front().hybrid_time.Decremented();
      result_source = SafeTimeSource::kNextInQueue;
      VLOG_WITH_PREFIX(2) << "DoGetSafeTime, Queue front (decremented): " << result;
    } else {
      if (ht_lease.time.is_valid()) {
        result = std::max(max_safe_time_returned_with_lease_.safe_time, ht_lease.time);
      } else {
        result = clock_->Now();
      }
      result_source = SafeTimeSource::kNow;
      VLOG_WITH_PREFIX(2) << "DoGetSafeTime, Now: " << result;
    }

    if (has_lease) {
      // Cap by the lease, but never below what was already returned with a lease.
      auto lease_bound = std::max(ht_lease.lease, max_safe_time_returned_with_lease_.safe_time);
      if (result > lease_bound) {
        result = lease_bound;
        result_source = SafeTimeSource::kHybridTimeLease;
      }
    }

    // This function could be invoked at a follower, so it has a very old ht_lease. In this case it
    // is safe to read at least at last_replicated_.
    result = std::max(result, last_replicated_);

    return result >= min_allowed;
  };

  // In the case of an empty queue, the safe hybrid time to read at is only limited by hybrid time
  // ht_lease, which is by definition higher than min_allowed, so we would not get blocked.
  const bool wait_without_timeout = deadline == CoarseTimePoint::max();
  if (wait_without_timeout) {
    cond_.wait(*lock, safe_time_reached);
  } else if (!cond_.wait_until(*lock, deadline, safe_time_reached)) {
    return HybridTime::kInvalid;
  }
  VLOG_WITH_PREFIX_AND_FUNC(1)
      << "(" << min_allowed << ", " << ht_lease << "),  result = " << result;

  auto enforced_min_time = max_safe_time_returned_without_lease_.safe_time;
  if (has_lease) {
    enforced_min_time = max_safe_time_returned_with_lease_.safe_time;
  }
  CHECK_GE(result, enforced_min_time)
      << InvariantViolationLogPrefix()
      << ": " << EXPR_VALUE_FOR_LOG(has_lease)
      << ", " << EXPR_VALUE_FOR_LOG(enforced_min_time.ToUint64() - result.ToUint64())
      << ", " << EXPR_VALUE_FOR_LOG(ht_lease)
      << ", " << EXPR_VALUE_FOR_LOG(last_replicated_)
      << ", " << EXPR_VALUE_FOR_LOG(clock_->Now())
      << ", " << EXPR_VALUE_FOR_LOG(ToString(deadline))
      << ", " << EXPR_VALUE_FOR_LOG(queue_.size())
      << ", " << EXPR_VALUE_FOR_LOG(queue_);

  if (has_lease) {
    max_safe_time_returned_with_lease_ = { result, result_source };
  } else {
    max_safe_time_returned_without_lease_ = { result, result_source };
  }
  return result;
}
636
637
12.3M
// Returns last_replicated_ under mutex_ and records the read in the trace (if enabled).
HybridTime MvccManager::LastReplicatedHybridTime() const {
  std::lock_guard<std::mutex> lock(mutex_);
  VLOG_WITH_PREFIX(1) << __func__ << "(), result = " << last_replicated_;

  if (op_trace_ != nullptr) {
    op_trace_->Add(LastReplicatedHybridTimeTraceItem { .last_replicated = last_replicated_ });
  }

  return last_replicated_;
}
647
648
// Using NO_THREAD_SAFETY_ANALYSIS here because we're only reading op_trace_ here and it is set
649
// in the constructor.
650
0
MvccManager::InvariantViolationLoggingHelper MvccManager::InvariantViolationLogPrefix() const {
651
0
  return { prefix_, op_trace_.get() };
652
0
}
653
654
// Ditto regarding NO_THREAD_SAFETY_ANALYSIS.
655
2
void MvccManager::TEST_DumpTrace(std::ostream* out) NO_THREAD_SAFETY_ANALYSIS {
656
2
  if (op_trace_)
657
2
    op_trace_->DumpTrace(out);
658
2
}
659
660
0
std::string MvccManager::QueueItem::ToString() const {
661
0
  return YB_STRUCT_TO_STRING(hybrid_time, op_id);
662
0
}
663
664
7.51M
// Field-wise equality over hybrid_time and op_id.
bool MvccManager::QueueItem::Eq(const MvccManager::QueueItem& rhs) const {
  // YB_STRUCT_EQUALS refers to the operands by the names `lhs` and `rhs`.
  const MvccManager::QueueItem& lhs = *this;
  const bool all_fields_equal = YB_STRUCT_EQUALS(hybrid_time, op_id);
  return all_fields_equal;
}
668
669
}  // namespace tablet
670
}  // namespace yb