YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/preparer.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/tablet/preparer.h"
15
16
#include <atomic>
17
#include <condition_variable>
18
#include <memory>
19
#include <mutex>
20
#include <thread>
21
#include <vector>
22
23
#include <boost/range/iterator_range_core.hpp>
24
#include <gflags/gflags.h>
25
26
#include "yb/consensus/consensus.h"
27
28
#include "yb/gutil/macros.h"
29
30
#include "yb/tablet/operations/operation_driver.h"
31
32
#include "yb/util/flag_tags.h"
33
#include "yb/util/lockfree.h"
34
#include "yb/util/logging.h"
35
#include "yb/util/threadpool.h"
36
37
DEFINE_uint64(max_group_replicate_batch_size, 16,
38
              "Maximum number of operations to submit to consensus for replication in a batch.");
39
40
DEFINE_test_flag(int32, preparer_batch_inject_latency_ms, 0,
41
                 "Inject latency before replicating batch.");
42
43
using namespace std::literals;
44
using std::vector;
45
46
namespace yb {
47
class ThreadPool;
48
class ThreadPoolToken;
49
50
namespace tablet {
51
52
// ------------------------------------------------------------------------------------------------
53
// PreparerImpl
54
class PreparerImpl {
 public:
  explicit PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool);
  ~PreparerImpl();
  CHECKED_STATUS Start();
  void Stop();

  CHECKED_STATUS Submit(OperationDriver* operation_driver);

  ThreadPoolToken* PoolToken() {
    return tablet_prepare_pool_token_.get();
  }

 private:
  using OperationDrivers = std::vector<OperationDriver*>;

  consensus::Consensus* const consensus_;

  // We set this to true to tell the Run function to return. No new tasks will be accepted, but
  // existing tasks will still be processed.
  std::atomic<bool> stop_requested_{false};

  // If true, a task is already running for this tablet.
  // If false, no tasks are running for this tablet, and we can submit a task to the thread pool
  // token.
  std::atomic<bool> running_{false};

  // This is set to true immediately before the thread exits.
  std::atomic<bool> stopped_{false};

  // The number of active tasks is incremented before a task is added to the queue and decremented
  // after the task is popped, so it is always greater than or equal to the number of entries in
  // the queue.
  std::atomic<int64_t> active_tasks_{0};

  // This flag is used in a sanity check to ensure that after this server becomes a follower, the
  // earlier leader-side operations that are still in the preparer's queue fail to get prepared
  // due to their old term. This sanity check is only performed when an UpdateConsensus with
  // follower-side operations is received while earlier leader-side operations have not yet been
  // processed, e.g. on an overloaded tablet server with lots of leader changes.
  std::atomic<bool> prepare_should_fail_{false};

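  // Drivers of leader-side operations waiting to be prepared. Multiple producers (Submit),
  // single consumer (the serialized Run task).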
  MPSCQueue<OperationDriver> queue_;

  // This mutex/condition combination is used in Stop() in case multiple threads are calling that
  // function concurrently. One of them will ask the prepare thread to stop and wait for it, and
  // then will notify other threads that have called Stop().
  std::mutex stop_mtx_;
  std::condition_variable stop_cond_;

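  // Accumulates consecutive leader-side operations so they can be prepared and replicated as a
  // group; accessed only from the Run() task.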
  OperationDrivers leader_side_batch_;

  std::unique_ptr<ThreadPoolToken> tablet_prepare_pool_token_;

  // A temporary buffer of rounds to replicate, reused across batches to reduce reallocation.
  consensus::ConsensusRounds rounds_to_replicate_;

  void Run();
  void ProcessItem(OperationDriver* item);

  void ProcessAndClearLeaderSideBatch();

  // Replicates a consecutive range of successfully prepared leader-side operations.
  void ReplicateSubBatch(OperationDrivers::iterator begin,
                         OperationDrivers::iterator end);
};

PreparerImpl::PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool)
    : consensus_(consensus),
      tablet_prepare_pool_token_(tablet_prepare_pool
                                     ->NewToken(ThreadPool::ExecutionMode::SERIAL)) {
}

PreparerImpl::~PreparerImpl() {
  Stop();
}

Status PreparerImpl::Start() {
  return Status::OK();
}

void PreparerImpl::Stop() {
  if (stopped_.load(std::memory_order_acquire)) {
    return;
  }
  stop_requested_ = true;
  {
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
    stop_cond_.wait(stop_lock, [this] {
      return !running_.load(std::memory_order_acquire) &&
             active_tasks_.load(std::memory_order_acquire) == 0;
    });
  }
  stopped_.store(true, std::memory_order_release);
}

Status PreparerImpl::Submit(OperationDriver* operation_driver) {
  if (stop_requested_.load(std::memory_order_acquire)) {
    return STATUS(IllegalState, "Tablet is shutting down");
  }

  const bool leader_side = operation_driver->is_leader_side();

  // When a leader becomes a follower, we expect the leader-side operations still in the
  // preparer's queue to fail to be prepared, because their term will be too old by the time we
  // try to add them to the Raft queue.
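  // The flag is updated on every Submit: a follower-side operation arms the sanity check in
  // ReplicateSubBatch, and a subsequent leader-side operation disarms it.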
  prepare_should_fail_.store(!leader_side, std::memory_order_release);

  if (leader_side) {
    // Prepare leader-side operations on the "preparer thread" so that we acquire the ReplicaState
    // lock only once and can append multiple operations.
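    // active_tasks_ is incremented before the driver is pushed, so it never under-counts the
    // queue contents as observed by Run() and Stop().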
    active_tasks_.fetch_add(1, std::memory_order_release);
    queue_.Push(operation_driver);
  } else {
    // For follower-side operations, there would be no benefit in preparing them on the preparer
    // thread.
    operation_driver->PrepareAndStartTask();
  }

  auto expected = false;
  if (!running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
    // running_ was already true, so we are not creating a task to process operations.
    return Status::OK();
  }
  // We flipped running_ from false to true. The previously running task could go back to doing
  // another iteration, but since we are submitting to a serial thread pool token, only one such
  // task will actually run; the other will wait in the token's queue.
  return tablet_prepare_pool_token_->SubmitFunc(std::bind(&PreparerImpl::Run, this));
}

void PreparerImpl::Run() {
  VLOG(2) << "Starting prepare task: " << this;
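  // The serial thread pool token guarantees that at most one Run() task is active at a time, so
  // this function is the only consumer of queue_ and the only code that touches
  // leader_side_batch_; neither needs additional locking.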
  for (;;) {
    while (OperationDriver *item = queue_.Pop()) {
      active_tasks_.fetch_sub(1, std::memory_order_release);
      ProcessItem(item);
    }
    ProcessAndClearLeaderSideBatch();
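    // running_ is cleared while holding stop_mtx_, so a concurrent Stop(), which re-checks its
    // wait predicate under the same mutex, cannot miss this transition.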
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
    running_.store(false, std::memory_order_release);
    // Check whether tasks were added while we were setting running_ to false.
    if (active_tasks_.load(std::memory_order_acquire)) {
      // Got more operations; try to stay in the loop.
      bool expected = false;
      if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
        continue;
      }
      // If someone else has flipped running_ to true, we can safely exit this function because
      // another task has already been submitted to the same token.
    }
    if (stop_requested_.load(std::memory_order_acquire)) {
      VLOG(2) << "Prepare task's Run() function is returning because stop is requested.";
      stop_cond_.notify_all();
    }
    VLOG(2) << "Returning from prepare task after inactivity: " << this;
    return;
  }
}

namespace {

bool ShouldApplySeparately(OperationType operation_type) {
  switch (operation_type) {
    // Certain operation types have to be applied in a batch of their own. For example,
    // ChangeMetadataOperation::Prepare calls Tablet::CreatePreparedChangeMetadata, which
    // acquires the schema lock. Because of this, we must not attempt to process two
    // ChangeMetadataOperations in one batch, otherwise we'll deadlock.
    //
    // Also, for infrequently occurring operations, batching has little performance benefit in
    // general.
    case OperationType::kChangeMetadata: FALLTHROUGH_INTENDED;
    case OperationType::kSnapshot: FALLTHROUGH_INTENDED;
    case OperationType::kTruncate: FALLTHROUGH_INTENDED;
    case OperationType::kSplit: FALLTHROUGH_INTENDED;
    case OperationType::kEmpty: FALLTHROUGH_INTENDED;
    case OperationType::kHistoryCutoff:
      return true;

    case OperationType::kWrite: FALLTHROUGH_INTENDED;
    case OperationType::kUpdateTransaction:
      return false;
  }
  FATAL_INVALID_ENUM_VALUE(OperationType, operation_type);
}

}  // anonymous namespace

void PreparerImpl::ProcessItem(OperationDriver* item) {
  CHECK_NOTNULL(item);

  LOG_IF(DFATAL, !item->is_leader_side()) << "Processing follower-side item";

  auto operation_type = item->operation_type();

  const bool apply_separately = ShouldApplySeparately(operation_type);
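  // For apply-separately operations, -1 serves as a sentinel term that can never match a real
  // bound term, forcing any accumulated batch to be flushed before this operation is added.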
  const int64_t bound_term = apply_separately ? -1 : item->consensus_round()->bound_term();

  // Don't add more than the max number of operations to a batch, and also don't add
  // operations bound to different terms, so as not to fail unrelated operations
  // unnecessarily in case of a bound term mismatch.
  if (leader_side_batch_.size() >= FLAGS_max_group_replicate_batch_size ||
      (!leader_side_batch_.empty() &&
          bound_term != leader_side_batch_.back()->consensus_round()->bound_term())) {
    ProcessAndClearLeaderSideBatch();
  }
  leader_side_batch_.push_back(item);
  if (apply_separately) {
    ProcessAndClearLeaderSideBatch();
  }
}

void PreparerImpl::ProcessAndClearLeaderSideBatch() {
  if (leader_side_batch_.empty()) {
    return;
  }

  VLOG(2) << "Preparing a batch of " << leader_side_batch_.size() << " leader-side operations";

  auto iter = leader_side_batch_.begin();
  auto replication_subbatch_begin = iter;
  auto replication_subbatch_end = iter;

  // As of 07/07/2017, PrepareAndStart no longer calls Consensus::Replicate, so on success it is
  // our responsibility to do so. We call Consensus::ReplicateBatch for batches of consecutive
  // successfully prepared operations.

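  // If a prepare fails mid-batch, we replicate the successfully prepared prefix immediately,
  // fail the offending operation on its own, and restart accumulation after it.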
  while (iter != leader_side_batch_.end()) {
    auto* operation_driver = *iter;

    Status s = operation_driver->PrepareAndStart();

    if (PREDICT_TRUE(s.ok())) {
      replication_subbatch_end = ++iter;
    } else {
      ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end);

      // Handle failure for this operation itself.
      operation_driver->HandleFailure(s);

      // Now we'll start accumulating a new batch.
      replication_subbatch_begin = replication_subbatch_end = ++iter;
    }
  }

  // Replicate the remaining batch. No-op for an empty batch.
  ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end);

  leader_side_batch_.clear();
}

void PreparerImpl::ReplicateSubBatch(
    OperationDrivers::iterator batch_begin,
    OperationDrivers::iterator batch_end) {
  DCHECK_GE(std::distance(batch_begin, batch_end), 0);
  if (batch_begin == batch_end) {
    return;
  }
  VLOG(2) << "Replicating a sub-batch of " << std::distance(batch_begin, batch_end)
          << " leader-side operations";
  if (VLOG_IS_ON(3)) {
    for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter) {
      VLOG(3) << "Leader-side operation to be replicated: " << (*batch_iter)->ToString();
    }
  }

  rounds_to_replicate_.clear();
  rounds_to_replicate_.reserve(std::distance(batch_begin, batch_end));
  for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter) {
    DCHECK_ONLY_NOTNULL(*batch_iter);
    DCHECK_ONLY_NOTNULL((*batch_iter)->consensus_round());
    rounds_to_replicate_.push_back((*batch_iter)->consensus_round());
  }

  AtomicFlagSleepMs(&FLAGS_TEST_preparer_batch_inject_latency_ms);
  // We have to save this value before calling ReplicateBatch, because the following scenario is
  // legal: an operation is successfully processed by ReplicateBatch, but before ReplicateBatch
  // returns, Submit is called with a follower-side operation from another thread, setting
  // prepare_should_fail_ to true.
  bool should_fail = prepare_should_fail_.load(std::memory_order_acquire);
  const Status s = consensus_->ReplicateBatch(rounds_to_replicate_);
  rounds_to_replicate_.clear();

  if (s.ok() && should_fail) {
    LOG(DFATAL) << "Operations should fail, but were successfully prepared: "
                << AsString(boost::make_iterator_range(batch_begin, batch_end));
  }
}

// ------------------------------------------------------------------------------------------------
// Preparer

Preparer::Preparer(consensus::Consensus* consensus, ThreadPool* tablet_prepare_thread)
    : impl_(std::make_unique<PreparerImpl>(consensus, tablet_prepare_thread)) {
}

Preparer::~Preparer() = default;

Status Preparer::Start() {
  VLOG(1) << "Starting the preparer";
  return impl_->Start();
}

void Preparer::Stop() {
  VLOG(1) << "Stopping the preparer";
  impl_->Stop();
  VLOG(1) << "The preparer has stopped";
}

Status Preparer::Submit(OperationDriver* operation_driver) {
  return impl_->Submit(operation_driver);
}

ThreadPoolToken* Preparer::PoolToken() {
  return impl_->PoolToken();
}

}  // namespace tablet
}  // namespace yb