YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/mt-log-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
34
#include <mutex>
35
#include <vector>
36
37
#include "yb/consensus/log-test-base.h"
38
#include "yb/consensus/log_index.h"
39
40
#include "yb/gutil/algorithm.h"
41
#include "yb/gutil/ref_counted.h"
42
#include "yb/gutil/strings/substitute.h"
43
44
#include "yb/util/locks.h"
45
#include "yb/util/metrics.h"
46
#include "yb/util/random.h"
47
#include "yb/util/status_log.h"
48
#include "yb/util/stopwatch.h"
49
#include "yb/util/thread.h"
50
51
// TODO: Semantics of the Log and Appender thread interactions changed and now multi-threaded
52
// writing is no longer allowed, or to be more precise, does no longer guarantee the ordering of
53
// events being written, across threads.
54
DEFINE_int32(num_writer_threads, 1, "Number of threads writing to the log");
55
DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread");
56
DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch");
57
58
namespace yb {
59
namespace log {
60
61
using std::vector;
62
using consensus::ReplicateMsgPtr;
63
64
namespace {
65
66
class CustomLatchCallback : public RefCountedThreadSafe<CustomLatchCallback> {
67
 public:
68
  CustomLatchCallback(CountDownLatch* latch, vector<Status>* errors)
69
      : latch_(latch),
70
2.00k
        errors_(errors) {
71
2.00k
  }
72
73
2.00k
  void StatusCB(const Status& s) {
74
2.00k
    if (!s.ok()) {
75
0
      errors_->push_back(s);
76
0
    }
77
2.00k
    latch_->CountDown();
78
2.00k
  }
79
80
2.00k
  StatusCallback AsStatusCallback() {
81
2.00k
    return Bind(&CustomLatchCallback::StatusCB, this);
82
2.00k
  }
83
84
 private:
85
  CountDownLatch* latch_;
86
  vector<Status>* errors_;
87
};
88
89
} // anonymous namespace
90
91
extern const char *kTestTablet;
92
93
class MultiThreadedLogTest : public LogTestBase {
94
 public:
95
  MultiThreadedLogTest()
96
1
      : random_(SeedRandom()) {
97
1
  }
98
99
1
  void SetUp() override {
100
1
    LogTestBase::SetUp();
101
1
  }
102
103
1
  void LogWriterThread(int thread_id) {
104
1
    CountDownLatch latch(FLAGS_num_batches_per_thread);
105
1
    vector<Status> errors;
106
2.00k
    for (int i = 0; i < FLAGS_num_batches_per_thread; i++) {
107
2.00k
      LogEntryBatch* entry_batch;
108
2.00k
      ReplicateMsgs batch_replicates;
109
2.00k
      int num_ops = static_cast<int>(random_.Normal(
110
2.00k
          static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0));
111
0
      DVLOG(1) << num_ops << " ops in this batch";
112
2.00k
      num_ops =  std::max(num_ops, 1);
113
2.00k
      {
114
2.00k
        std::lock_guard<simple_spinlock> lock_guard(lock_);
115
10.9k
        for (int j = 0; j < num_ops; j++) {
116
8.93k
          auto replicate = std::make_shared<ReplicateMsg>();
117
8.93k
          auto index = current_index_++;
118
8.93k
          OpIdPB* op_id = replicate->mutable_id();
119
8.93k
          op_id->set_term(0);
120
8.93k
          op_id->set_index(index);
121
122
8.93k
          replicate->set_op_type(WRITE_OP);
123
8.93k
          replicate->set_hybrid_time(clock_->Now().ToUint64());
124
125
8.93k
          tserver::WriteRequestPB request;
126
8.93k
          AddTestRowInsert(narrow_cast<int32_t>(index), 0, "this is a test insert", &request);
127
8.93k
          request.set_tablet_id(kTestTablet);
128
8.93k
          batch_replicates.push_back(replicate);
129
8.93k
        }
130
131
2.00k
        auto entry_batch_pb = CreateBatchFromAllocatedOperations(batch_replicates);
132
133
2.00k
        log_->Reserve(REPLICATE, &entry_batch_pb, &entry_batch);
134
2.00k
      } // lock_guard scope
135
2.00k
      auto cb = new CustomLatchCallback(&latch, &errors);
136
2.00k
      ASSERT_OK(log_->TEST_AsyncAppendWithReplicates(
137
2.00k
          entry_batch, batch_replicates, cb->AsStatusCallback()));
138
2.00k
    }
139
1
    LOG_TIMING(INFO, strings::Substitute("thread $0 waiting to append and sync $1 batches",
140
1
                                         thread_id, FLAGS_num_batches_per_thread)) {
141
1
      latch.Wait();
142
1
    }
143
0
    for (const Status& status : errors) {
144
0
      WARN_NOT_OK(status, "Unexpected failure during AsyncAppend");
145
0
    }
146
1
    ASSERT_EQ(0, errors.size());
147
1
  }
148
149
1
  void Run() {
150
2
    for (int i = 0; i < FLAGS_num_writer_threads; i++) {
151
1
      scoped_refptr<yb::Thread> new_thread;
152
1
      CHECK_OK(yb::Thread::Create("test", "inserter",
153
1
          &MultiThreadedLogTest::LogWriterThread, this, i, &new_thread));
154
1
      threads_.push_back(new_thread);
155
1
    }
156
1
    for (scoped_refptr<yb::Thread>& thread : threads_) {
157
1
      ASSERT_OK(ThreadJoiner(thread.get()).Join());
158
1
    }
159
1
  }
160
 private:
161
  ThreadSafeRandom random_;
162
  simple_spinlock lock_;
163
  vector<scoped_refptr<yb::Thread> > threads_;
164
};
165
166
1
TEST_F(MultiThreadedLogTest, TestAppends) {
167
1
  BuildLog();
168
1
  auto start_current_id = current_index_;
169
1
  LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)",
170
1
                                      FLAGS_num_writer_threads * FLAGS_num_batches_per_thread,
171
1
                                      FLAGS_num_batches_per_thread, FLAGS_num_writer_threads)) {
172
1
    ASSERT_NO_FATALS(Run());
173
1
  }
174
1
  ASSERT_OK(log_->Close());
175
176
1
  std::unique_ptr<LogReader> reader;
177
1
  ASSERT_OK(LogReader::Open(fs_manager_->env(), nullptr, "Log reader: ",
178
1
                            fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
179
1
                            nullptr, nullptr, &reader));
180
1
  SegmentSequence segments;
181
1
  ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
182
183
1
  std::vector<int64_t> ids;
184
1
  for (const SegmentSequence::value_type& entry : segments) {
185
1
    auto read_entries = entry->ReadEntries();
186
1
    ASSERT_OK(read_entries.status);
187
8.93k
    for (const auto& entry : read_entries.entries) {
188
8.93k
      if (entry->type() == REPLICATE) {
189
8.93k
        ids.push_back(entry->replicate().id().index());
190
8.93k
      }
191
8.93k
    }
192
1
  }
193
0
  DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops";
194
1
  ASSERT_EQ(current_index_ - start_current_id, ids.size());
195
1
  ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
196
1
}
197
198
} // namespace log
199
} // namespace yb