YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/preparer.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/tablet/preparer.h"

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include <boost/range/iterator_range_core.hpp>
#include <gflags/gflags.h>

#include "yb/consensus/consensus.h"

#include "yb/gutil/macros.h"

#include "yb/tablet/operations/operation_driver.h"

#include "yb/util/flag_tags.h"
#include "yb/util/lockfree.h"
#include "yb/util/logging.h"
#include "yb/util/threadpool.h"

DEFINE_uint64(max_group_replicate_batch_size, 16,
              "Maximum number of operations to submit to consensus for replication in a batch.");

DEFINE_test_flag(int32, preparer_batch_inject_latency_ms, 0,
                 "Inject latency before replicating batch.");

using namespace std::literals;
using std::vector;

namespace yb {
class ThreadPool;
class ThreadPoolToken;

namespace tablet {

// ------------------------------------------------------------------------------------------------
// PreparerImpl

class PreparerImpl {
 public:
  explicit PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool);
  ~PreparerImpl();
  CHECKED_STATUS Start();
  void Stop();

  CHECKED_STATUS Submit(OperationDriver* operation_driver);

  ThreadPoolToken* PoolToken() {
    return tablet_prepare_pool_token_.get();
  }

 private:
  using OperationDrivers = std::vector<OperationDriver*>;

  consensus::Consensus* const consensus_;

  // We set this to true to tell the Run function to return. No new tasks will be accepted, but
  // existing tasks will still be processed.
  std::atomic<bool> stop_requested_{false};

  // If true, a task is running for this tablet already.
  // If false, no tasks are running for this tablet,
  // and we can submit a task to the thread pool token.
  std::atomic<bool> running_{false};

  // This is set to true immediately before the thread exits.
  std::atomic<bool> stopped_{false};

  // The number of active tasks is incremented before a task is added to the queue and decremented
  // after it is popped, so it is always greater than or equal to the number of entries in the
  // queue.
  std::atomic<int64_t> active_tasks_{0};

  // This flag is used in a sanity check to ensure that after this server becomes a follower,
  // the earlier leader-side operations that are still in the preparer's queue should fail to get
  // prepared due to old term. This sanity check will only be performed when an UpdateConsensus
  // with follower-side operations is received while earlier leader-side operations still have not
  // been processed, e.g. in an overloaded tablet server with lots of leader changes.
  std::atomic<bool> prepare_should_fail_{false};

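  // Lock-free multi-producer, single-consumer queue of leader-side operations: any number of
  // Submit() callers push into it, while the single prepare task popping from it runs on the
  // serial thread pool token below.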
  MPSCQueue<OperationDriver> queue_;

  // This mutex/condition combination is used in Stop() in case multiple threads are calling that
  // function concurrently. One of them will ask the prepare thread to stop and wait for it, and
  // then will notify other threads that have called Stop().
  std::mutex stop_mtx_;
  std::condition_variable stop_cond_;

  OperationDrivers leader_side_batch_;

  std::unique_ptr<ThreadPoolToken> tablet_prepare_pool_token_;

  // A temporary buffer of rounds to replicate, used to reduce reallocation.
  consensus::ConsensusRounds rounds_to_replicate_;

  void Run();
  void ProcessItem(OperationDriver* item);

  void ProcessAndClearLeaderSideBatch();

  // A wrapper around ProcessAndClearLeaderSideBatch that assumes we are currently holding the
  // mutex.

  void ReplicateSubBatch(OperationDrivers::iterator begin,
                         OperationDrivers::iterator end);
};

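// Note: a SERIAL thread pool token guarantees that at most one prepare task executes at a time,
// even if Submit() schedules a new Run() task while the previous one is still finishing.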
PreparerImpl::PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool)
    : consensus_(consensus),
      tablet_prepare_pool_token_(tablet_prepare_pool
                                     ->NewToken(ThreadPool::ExecutionMode::SERIAL)) {
}

PreparerImpl::~PreparerImpl() {
  Stop();
}

Status PreparerImpl::Start() {
  return Status::OK();
}

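// Blocks until the prepare task is fully idle. The wait below cannot miss a wakeup: Run() clears
// running_ while holding stop_mtx_, and notifies stop_cond_ once it observes stop_requested_.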
void PreparerImpl::Stop() {
  if (stopped_.load(std::memory_order_acquire)) {
    return;
  }
  stop_requested_ = true;
  {
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
    stop_cond_.wait(stop_lock, [this] {
      return !running_.load(std::memory_order_acquire) &&
             active_tasks_.load(std::memory_order_acquire) == 0;
    });
  }
  stopped_.store(true, std::memory_order_release);
}

Status PreparerImpl::Submit(OperationDriver* operation_driver) {
  if (stop_requested_.load(std::memory_order_acquire)) {
    return STATUS(IllegalState, "Tablet is shutting down");
  }

  const bool leader_side = operation_driver->is_leader_side();

  // When a leader becomes a follower, we expect the leader-side operations still in the preparer's
  // queue to fail to be prepared because their term will be too old as we try to add them to the
  // Raft queue.
  prepare_should_fail_.store(!leader_side, std::memory_order_release);

  if (leader_side) {
    // Prepare leader-side operations on the "preparer thread" so we only have to acquire the
    // ReplicaState lock once and can append multiple operations.
    active_tasks_.fetch_add(1, std::memory_order_release);
    queue_.Push(operation_driver);
  } else {
    // For follower-side operations, there would be no benefit in preparing them on the preparer
    // thread.
    operation_driver->PrepareAndStartTask();
  }

  auto expected = false;
  if (!running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
    // running_ was already true, so we are not creating a task to process operations.
    return Status::OK();
  }
  // We flipped running_ from false to true. The previously running thread could go back to doing
  // another iteration, but in that case, since we are submitting to a token of a thread pool, only
  // one such thread will be running; the other will be in the queue.
  return tablet_prepare_pool_token_->SubmitFunc(std::bind(&PreparerImpl::Run, this));
}

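// The prepare task's main loop. Together with Submit(), this implements a lock-free handoff:
// Submit() increments active_tasks_ before pushing to queue_, and Run() decrements it after
// popping, so running_ == false with active_tasks_ == 0 means the preparer is completely idle.
// For example, this interleaving is handled correctly:
//   T1 (Run):    queue_.Pop() returns null, exits the inner while loop
//   T2 (Submit): active_tasks_.fetch_add(1); queue_.Push(op); CAS of running_ fails (still true)
//   T1 (Run):    stores running_ = false, sees active_tasks_ > 0, CAS succeeds, stays in the
//                loop, and pops op on the next iteration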
void PreparerImpl::Run() {
  VLOG(2) << "Starting prepare task: " << this;
  for (;;) {
    while (OperationDriver *item = queue_.Pop()) {
      active_tasks_.fetch_sub(1, std::memory_order_release);
      ProcessItem(item);
    }
    ProcessAndClearLeaderSideBatch();
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
    running_.store(false, std::memory_order_release);
    // Check whether tasks were added while we were setting running_ to false.
    if (active_tasks_.load(std::memory_order_acquire)) {
      // Got more operations, try to stay in the loop.
      bool expected = false;
      if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
        continue;
      }
      // If someone else has flipped running_ to true, we can safely exit this function because
      // another task is already submitted to the same token.
    }
    if (stop_requested_.load(std::memory_order_acquire)) {
      VLOG(2) << "Prepare task's Run() function is returning because stop is requested.";
      stop_cond_.notify_all();
    }
    VLOG(2) << "Returning from prepare task after inactivity: " << this;
    return;
  }
}

namespace {

bool ShouldApplySeparately(OperationType operation_type) {
  switch (operation_type) {
    // For certain operation types we have to apply them in a batch of their own.
    // E.g. ChangeMetadataOperation::Prepare calls Tablet::CreatePreparedChangeMetadata, which
    // acquires the schema lock. Because of this, we must not attempt to process two
    // ChangeMetadataOperations in one batch, otherwise we'll deadlock.
    //
    // Also, for infrequently occurring operations, batching has little performance benefit in
    // general.
    case OperationType::kChangeMetadata: FALLTHROUGH_INTENDED;
    case OperationType::kSnapshot: FALLTHROUGH_INTENDED;
    case OperationType::kTruncate: FALLTHROUGH_INTENDED;
    case OperationType::kSplit: FALLTHROUGH_INTENDED;
    case OperationType::kEmpty: FALLTHROUGH_INTENDED;
    case OperationType::kHistoryCutoff:
      return true;

    case OperationType::kWrite: FALLTHROUGH_INTENDED;
    case OperationType::kUpdateTransaction:
      return false;
  }
  FATAL_INVALID_ENUM_VALUE(OperationType, operation_type);
}

}  // anonymous namespace

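// Adds a leader-side operation to leader_side_batch_, first flushing the batch if it is full or
// if the new operation's bound term differs from the last batched operation's, and flushing again
// right after adding an operation type that must be applied separately.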
void PreparerImpl::ProcessItem(OperationDriver* item) {
  CHECK_NOTNULL(item);

  LOG_IF(DFATAL, !item->is_leader_side()) << "Processing follower-side item";

  auto operation_type = item->operation_type();

  const bool apply_separately = ShouldApplySeparately(operation_type);
  const int64_t bound_term = apply_separately ? -1 : item->consensus_round()->bound_term();

  // Don't add more than the max number of operations to a batch, and also don't add
  // operations bound to different terms, so as not to fail unrelated operations
  // unnecessarily in case of a bound term mismatch.
  if (leader_side_batch_.size() >= FLAGS_max_group_replicate_batch_size ||
      (!leader_side_batch_.empty() &&
          bound_term != leader_side_batch_.back()->consensus_round()->bound_term())) {
    ProcessAndClearLeaderSideBatch();
  }
  leader_side_batch_.push_back(item);
  if (apply_separately) {
    ProcessAndClearLeaderSideBatch();
  }
}

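// Prepares every operation accumulated in leader_side_batch_ and replicates each maximal run of
// consecutive successfully prepared operations as a single consensus batch. Operations that fail
// to prepare are handled individually through HandleFailure().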
void PreparerImpl::ProcessAndClearLeaderSideBatch() {
  if (leader_side_batch_.empty()) {
    return;
  }

  VLOG(2) << "Preparing a batch of " << leader_side_batch_.size() << " leader-side operations";

  auto iter = leader_side_batch_.begin();
  auto replication_subbatch_begin = iter;
  auto replication_subbatch_end = iter;

  // PrepareAndStart does not call Consensus::Replicate anymore as of 07/07/2017, and it is our
  // responsibility to do so in case of success. We call Consensus::ReplicateBatch for batches
  // of consecutive successfully prepared operations.

  while (iter != leader_side_batch_.end()) {
    auto* operation_driver = *iter;

    Status s = operation_driver->PrepareAndStart();

    if (PREDICT_TRUE(s.ok())) {
      replication_subbatch_end = ++iter;
    } else {
      ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end);

      // Handle failure for this operation itself.
      operation_driver->HandleFailure(s);

      // Now we'll start accumulating a new batch.
      replication_subbatch_begin = replication_subbatch_end = ++iter;
    }
  }

  // Replicate the remaining batch. No-op for an empty batch.
  ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end);

  leader_side_batch_.clear();
}

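// Submits the prepared operations in [batch_begin, batch_end) to Raft as one
// Consensus::ReplicateBatch call, reusing rounds_to_replicate_ to avoid reallocation.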
void PreparerImpl::ReplicateSubBatch(
    OperationDrivers::iterator batch_begin,
    OperationDrivers::iterator batch_end) {
  DCHECK_GE(std::distance(batch_begin, batch_end), 0);
  if (batch_begin == batch_end) {
    return;
  }
  VLOG(2) << "Replicating a sub-batch of " << std::distance(batch_begin, batch_end)
          << " leader-side operations";
  if (VLOG_IS_ON(3)) {
    for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter) {
      VLOG(3) << "Leader-side operation to be replicated: " << (*batch_iter)->ToString();
    }
  }

  rounds_to_replicate_.clear();
  rounds_to_replicate_.reserve(std::distance(batch_begin, batch_end));
  for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter) {
    DCHECK_ONLY_NOTNULL(*batch_iter);
    DCHECK_ONLY_NOTNULL((*batch_iter)->consensus_round());
    rounds_to_replicate_.push_back((*batch_iter)->consensus_round());
  }

  AtomicFlagSleepMs(&FLAGS_TEST_preparer_batch_inject_latency_ms);
  // We have to save this value before calling ReplicateBatch, because the following scenario is
  // legal: an operation is successfully processed by ReplicateBatch, but before ReplicateBatch
  // returns, Submit is called for a follower-side operation from another thread.
  bool should_fail = prepare_should_fail_.load(std::memory_order_acquire);
  const Status s = consensus_->ReplicateBatch(rounds_to_replicate_);
  rounds_to_replicate_.clear();

  if (s.ok() && should_fail) {
    LOG(DFATAL) << "Operations should have failed, but were successfully prepared: "
                << AsString(boost::make_iterator_range(batch_begin, batch_end));
  }
}

// ------------------------------------------------------------------------------------------------
// Preparer

Preparer::Preparer(consensus::Consensus* consensus, ThreadPool* tablet_prepare_thread)
    : impl_(std::make_unique<PreparerImpl>(consensus, tablet_prepare_thread)) {
}

Preparer::~Preparer() = default;

Status Preparer::Start() {
  VLOG(1) << "Starting the preparer";
  return impl_->Start();
}

void Preparer::Stop() {
  VLOG(1) << "Stopping the preparer";
  impl_->Stop();
  VLOG(1) << "The preparer has stopped";
}

Status Preparer::Submit(OperationDriver* operation_driver) {
  return impl_->Submit(operation_driver);
}

ThreadPoolToken* Preparer::PoolToken() {
  return impl_->PoolToken();
}

}  // namespace tablet
}  // namespace yb