YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log-test-base.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_CONSENSUS_LOG_TEST_BASE_H
33
#define YB_CONSENSUS_LOG_TEST_BASE_H
34
35
#include <utility>
36
#include <vector>
37
38
#include <glog/logging.h>
39
#include <gtest/gtest.h>
40
41
#include "yb/common/hybrid_time.h"
42
#include "yb/common/schema.h"
43
#include "yb/common/transaction.h"
44
#include "yb/common/wire_protocol-test-util.h"
45
#include "yb/consensus/log.h"
46
#include "yb/consensus/log_anchor_registry.h"
47
#include "yb/consensus/log_reader.h"
48
#include "yb/consensus/opid_util.h"
49
#include "yb/fs/fs_manager.h"
50
51
#include "yb/gutil/bind.h"
52
#include "yb/gutil/stl_util.h"
53
#include "yb/gutil/stringprintf.h"
54
#include "yb/gutil/strings/substitute.h"
55
#include "yb/gutil/strings/util.h"
56
57
#include "yb/server/clock.h"
58
#include "yb/server/hybrid_clock.h"
59
60
#include "yb/tserver/tserver.pb.h"
61
62
#include "yb/util/async_util.h"
63
#include "yb/util/env_util.h"
64
#include "yb/util/metrics.h"
65
#include "yb/util/path_util.h"
66
#include "yb/util/result.h"
67
#include "yb/util/test_macros.h"
68
#include "yb/util/test_util.h"
69
#include "yb/util/threadpool.h"
70
71
METRIC_DECLARE_entity(table);
72
METRIC_DECLARE_entity(tablet);
73
74
DECLARE_int32(log_min_seconds_to_retain);
75
76
namespace yb {
77
namespace log {
78
79
using consensus::ReplicateMsg;
80
using consensus::WRITE_OP;
81
using consensus::NO_OP;
82
using consensus::MakeOpId;
83
using consensus::MakeOpIdPB;
84
85
using server::Clock;
86
87
using tserver::WriteRequestPB;
88
89
// Fixed names shared by the log test helpers in this header. kTestTable and kTestTablet are used
// by LogTestBase::SetUp() to locate the tablet WAL directory, and kTestTablet is also handed to
// Log::Open() by LogTestBase::BuildLog(). kTestNamespace is not referenced within this header.
// NOTE(review): these are non-const pointers defined (not just declared) at namespace scope in a
// header, so every translation unit that includes it gets its own external definition -- confirm
// that no two such translation units are ever linked into the same binary.
const char* kTestNamespace = "test-ns";
90
const char* kTestTable = "test-log-table";
91
const char* kTestTablet = "test-log-tablet";
92
93
// Strongly typed flag taken by the append helpers of LogTestBase: when it evaluates to true
// (AppendSync::kTrue, the default everywhere in this header) the helper waits for the append to
// finish; otherwise it only queues the append.
YB_STRONGLY_TYPED_BOOL(AppendSync);
94
95
// Appends a single batch of 'count' NO_OP ReplicateMsgs to 'log' and waits for the append
// callback before returning. Each message is stamped with the current value of *op_id and with
// clock->Now(); op_id's index is incremented once per message.
96
// If 'size' is not nullptr, it is incremented by the expected increase in log size: ByteSize()
// plus one tag byte per message, plus kEntryHeaderSize + 7 for the batch as a whole.
// Both *op_id and *size are updated before the append is attempted, so they are advanced even
// when a non-OK status is returned. Returns the status of AsyncAppendReplicates if it is not OK,
// otherwise the status delivered to the Synchronizer.
97
static CHECKED_STATUS AppendNoOpsToLogSync(const scoped_refptr<Clock>& clock,
98
                                           Log* log, OpIdPB* op_id,
99
                                           int count,
100
157k
                                           ssize_t* size = nullptr) {
101
157k
  ReplicateMsgs replicates;
102
314k
  for (int i = 0; i < count; i++) {
103
157k
    auto replicate = std::make_shared<ReplicateMsg>();
104
157k
    ReplicateMsg* repl = replicate.get();
105
106
157k
    repl->mutable_id()->CopyFrom(*op_id);
107
157k
    repl->set_op_type(NO_OP);
108
157k
    repl->set_hybrid_time(clock->Now().ToUint64());
109
110
    // Advance the caller's op_id so the next message gets the following index.
111
157k
    op_id->set_index(op_id->index() + 1);
112
113
157k
    if (size) {
114
      // If we're tracking the sizes we need to account for the fact that the Log wraps the log
115
      // entry in a LogEntryBatchPB, and each actual entry will have a one-byte tag.
116
102k
      *size += repl->ByteSize() + 1;
117
102k
    }
118
157k
    replicates.push_back(replicate);
119
157k
  }
120
121
  // Account for the entry batch header and wrapper PB. NOTE(review): the literal 7 is presumably
  // the serialized overhead of the wrapper PB -- confirm against the Log's on-disk format.
122
157k
  if (size) {
123
102k
    *size += log::kEntryHeaderSize + 7;
124
102k
  }
125
126
157k
  Synchronizer s;
127
157k
  RETURN_NOT_OK(log->AsyncAppendReplicates(
128
157k
      replicates, yb::OpId() /* committed_op_id */, RestartSafeCoarseTimePoint::FromUInt64(1),
129
157k
      s.AsStatusCallback()));
130
157k
  RETURN_NOT_OK(s.Wait());
131
157k
  return Status::OK();
132
157k
}
consensus_queue-test.cc:_ZN2yb3logL20AppendNoOpsToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEiPl
Line
Count
Source
100
120
                                           ssize_t* size = nullptr) {
101
120
  ReplicateMsgs replicates;
102
240
  for (int i = 0; i < count; i++) {
103
120
    auto replicate = std::make_shared<ReplicateMsg>();
104
120
    ReplicateMsg* repl = replicate.get();
105
106
120
    repl->mutable_id()->CopyFrom(*op_id);
107
120
    repl->set_op_type(NO_OP);
108
120
    repl->set_hybrid_time(clock->Now().ToUint64());
109
110
    // Increment op_id.
111
120
    op_id->set_index(op_id->index() + 1);
112
113
120
    if (size) {
114
      // If we're tracking the sizes we need to account for the fact that the Log wraps the log
115
      // entry in an LogEntryBatchPB, and each actual entry will have a one-byte tag.
116
0
      *size += repl->ByteSize() + 1;
117
0
    }
118
120
    replicates.push_back(replicate);
119
120
  }
120
121
  // Account for the entry batch header and wrapper PB.
122
120
  if (size) {
123
0
    *size += log::kEntryHeaderSize + 7;
124
0
  }
125
126
120
  Synchronizer s;
127
120
  RETURN_NOT_OK(log->AsyncAppendReplicates(
128
120
      replicates, yb::OpId() /* committed_op_id */, RestartSafeCoarseTimePoint::FromUInt64(1),
129
120
      s.AsStatusCallback()));
130
120
  RETURN_NOT_OK(s.Wait());
131
120
  return Status::OK();
132
120
}
log-test.cc:_ZN2yb3logL20AppendNoOpsToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEiPl
Line
Count
Source
100
157k
                                           ssize_t* size = nullptr) {
101
157k
  ReplicateMsgs replicates;
102
314k
  for (int i = 0; i < count; i++) {
103
157k
    auto replicate = std::make_shared<ReplicateMsg>();
104
157k
    ReplicateMsg* repl = replicate.get();
105
106
157k
    repl->mutable_id()->CopyFrom(*op_id);
107
157k
    repl->set_op_type(NO_OP);
108
157k
    repl->set_hybrid_time(clock->Now().ToUint64());
109
110
    // Increment op_id.
111
157k
    op_id->set_index(op_id->index() + 1);
112
113
157k
    if (size) {
114
      // If we're tracking the sizes we need to account for the fact that the Log wraps the log
115
      // entry in an LogEntryBatchPB, and each actual entry will have a one-byte tag.
116
102k
      *size += repl->ByteSize() + 1;
117
102k
    }
118
157k
    replicates.push_back(replicate);
119
157k
  }
120
121
  // Account for the entry batch header and wrapper PB.
122
157k
  if (size) {
123
102k
    *size += log::kEntryHeaderSize + 7;
124
102k
  }
125
126
157k
  Synchronizer s;
127
157k
  RETURN_NOT_OK(log->AsyncAppendReplicates(
128
157k
      replicates, yb::OpId() /* committed_op_id */, RestartSafeCoarseTimePoint::FromUInt64(1),
129
157k
      s.AsStatusCallback()));
130
157k
  RETURN_NOT_OK(s.Wait());
131
157k
  return Status::OK();
132
157k
}
Unexecuted instantiation: mt-log-test.cc:_ZN2yb3logL20AppendNoOpsToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEiPl
Unexecuted instantiation: tablet_bootstrap-test.cc:_ZN2yb3logL20AppendNoOpsToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEiPl
Unexecuted instantiation: tablet_server-test.cc:_ZN2yb3logL20AppendNoOpsToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEiPl
133
134
// Convenience wrapper that appends exactly one NO_OP entry by calling AppendNoOpsToLogSync with
// a count of 1. 'clock', 'log', 'op_id' and the optional 'size' are forwarded unchanged, and the
// resulting status is returned as is.
static CHECKED_STATUS AppendNoOpToLogSync(const scoped_refptr<Clock>& clock,
135
                                          Log* log, OpIdPB* op_id,
136
157k
                                          ssize_t* size = nullptr) {
137
157k
  return AppendNoOpsToLogSync(clock, log, op_id, 1, size);
138
157k
}
consensus_queue-test.cc:_ZN2yb3logL19AppendNoOpToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEPl
Line
Count
Source
136
120
                                          ssize_t* size = nullptr) {
137
120
  return AppendNoOpsToLogSync(clock, log, op_id, 1, size);
138
120
}
log-test.cc:_ZN2yb3logL19AppendNoOpToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEPl
Line
Count
Source
136
157k
                                          ssize_t* size = nullptr) {
137
157k
  return AppendNoOpsToLogSync(clock, log, op_id, 1, size);
138
157k
}
Unexecuted instantiation: mt-log-test.cc:_ZN2yb3logL19AppendNoOpToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEPl
Unexecuted instantiation: tablet_bootstrap-test.cc:_ZN2yb3logL19AppendNoOpToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEPl
Unexecuted instantiation: tablet_server-test.cc:_ZN2yb3logL19AppendNoOpToLogSyncERK13scoped_refptrINS_6server5ClockEEPNS0_3LogEPNS_6OpIdPBEPl
139
140
// Base fixture for log tests. SetUp() builds an FsManager rooted under the test directory, table
// and tablet metric entities, a HybridClock and a "log" thread pool; BuildLog() then opens log_
// on the tablet WAL directory. The remaining public methods append WRITE_OP,
// UPDATE_TRANSACTION_OP or NO_OP entries to log_, roll it over, or inspect its segments.
// NOTE(review): most helpers return void and report problems through ASSERT_* / FAIL() macros;
// presumably those only abort the helper itself, so callers may need to check for fatal failures
// afterwards -- confirm.
class LogTestBase : public YBTest {
141
 public:
142
  // Not used by anything in this header.
143
  typedef pair<int, int> DeltaId;
144
  // One row to write: (key, int_val, string_val), in the order consumed by AppendReplicateBatch.
145
  typedef std::tuple<int, int, string> TupleForAppend;
146
  // Builds the three-column test schema (key, int_val, string_val) and the anchor registry.
  // NOTE(review): the trailing 1 passed to the Schema is presumably the number of key columns.
147
  LogTestBase()
148
      : schema_({ ColumnSchema("key", INT32, false, true),
149
                  ColumnSchema("int_val", INT32),
150
                  ColumnSchema("string_val", STRING, true) },
151
                1),
152
40
        log_anchor_registry_(new LogAnchorRegistry()) {
153
40
  }
154
  // Prepares everything BuildLog() needs: resets current_index_ to 1, creates and opens the
  // filesystem layout, resolves the WAL directory for kTestTable/kTestTablet, initializes the
  // clock, zeroes FLAGS_log_min_seconds_to_retain and builds the "log" thread pool.
155
40
  virtual void SetUp() override {
156
40
    YBTest::SetUp();
157
40
    current_index_ = 1;
158
40
    fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
159
40
    metric_registry_.reset(new MetricRegistry());
160
40
    table_metric_entity_ = METRIC_ENTITY_table.Instantiate(metric_registry_.get(), "log-test-base");
161
40
    tablet_metric_entity_ = METRIC_ENTITY_tablet.Instantiate(
162
40
                                metric_registry_.get(), "log-test-base-tablet");
163
40
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
164
40
    ASSERT_OK(fs_manager_->Open());
165
40
    tablet_wal_path_ = fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet);
166
40
    clock_.reset(new server::HybridClock());
167
40
    ASSERT_OK(clock_->Init());
168
40
    FLAGS_log_min_seconds_to_retain = 0;  // Presumably disables time-based retention -- confirm.
169
40
    ASSERT_OK(ThreadPoolBuilder("log")
170
40
                 .unlimited_threads()
171
40
                 .Build(&log_thread_pool_));
172
40
  }
173
  // Deletes the filesystem layout (including logs) and recreates an empty initial layout.
174
400
  void CleanTablet() {
175
400
    ASSERT_OK(fs_manager_->DeleteFileSystemLayout(ShouldDeleteLogs::kTrue));
176
400
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
177
400
  }
178
  // Opens log_ for kTestTablet in tablet_wal_path_ using options_ and a copy of schema_ produced
  // by SchemaBuilder. The same thread pool is passed for both pool arguments of Log::Open.
179
439
  void BuildLog() {
180
439
    Schema schema_with_ids = SchemaBuilder(schema_).Build();
181
439
    ASSERT_OK(Log::Open(options_,
182
439
                       kTestTablet,
183
439
                       tablet_wal_path_,
184
439
                       fs_manager_->uuid(),
185
439
                       schema_with_ids,
186
439
                       0, // schema_version
187
439
                       table_metric_entity_.get(),
188
439
                       tablet_metric_entity_.get(),
189
439
                       log_thread_pool_.get(),
190
439
                       log_thread_pool_.get(),
191
439
                       std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
192
439
                       &log_));
193
439
    LOG(INFO) << "Sucessfully opened the log at " << tablet_wal_path_;
194
439
  }
195
  // Asserts that the WAL directory contains exactly 'expected' entries whose names start with
  // FsManager::kWalFileNamePrefix; other directory entries are ignored.
196
2
  void CheckRightNumberOfSegmentFiles(int expected) {
197
    // List the WAL directory and count only the entries that look like WAL segment files.
198
2
    const vector<string> files =
199
2
        ASSERT_RESULT(env_->GetChildren(tablet_wal_path_, ExcludeDots::kTrue));
200
2
    int count = 0;
201
6
    for (const string& s : files) {
202
6
      if (HasPrefixString(s, FsManager::kWalFileNamePrefix)) {
203
4
        count++;
204
4
      }
205
6
    }
206
2
    ASSERT_EQ(expected, count);
207
2
  }
208
  // Completion callback for the non-waiting append path: asserts that the append status is OK.
  // 'msg' is unused here; it is bound into the callback by AppendReplicateBatch.
209
250
  static void CheckReplicateResult(const consensus::ReplicateMsgPtr& msg, const Status& s) {
210
250
    ASSERT_OK(s);
211
250
  }
212
  // Named-field bundle of the arguments of the positional AppendReplicateBatch overload.
  // NOTE(review): txn_status defaults to IMMEDIATE_CLEANUP here but to APPLYING in the
  // positional overload -- confirm the difference is intended.
213
  struct AppendReplicateBatchData {
214
    yb::OpId op_id;
215
    yb::OpId committed_op_id;
216
    std::vector<TupleForAppend> writes;
217
    AppendSync sync = AppendSync::kTrue;
218
    consensus::OperationType op_type = consensus::OperationType::WRITE_OP;
219
    TransactionId txn_id = TransactionId::Nil();
220
    TransactionStatus txn_status = TransactionStatus::IMMEDIATE_CLEANUP;
221
  };
222
  // Unpacks 'data' and forwards it to the positional overload, converting both OpIds to OpIdPB
  // and moving the writes out of 'data'.
223
600k
  void AppendReplicateBatch(AppendReplicateBatchData data) {
224
600k
    AppendReplicateBatch(
225
600k
        MakeOpIdPB(data.op_id),
226
600k
        MakeOpIdPB(data.committed_op_id),
227
600k
        std::move(data.writes),
228
600k
        data.sync,
229
600k
        data.op_type,
230
600k
        data.txn_id,
231
600k
        data.txn_status);
232
600k
  }
233
  // Builds a ReplicateMsg of type 'op_type' for 'opid' / 'committed_opid', stamped with
  // clock_->Now(), and appends it through the ReplicateMsgPtr overload.
  //  - WRITE_OP: writes the given rows, or two generated rows when 'writes' is empty; a non-nil
  //    'txn_id' is recorded on the write batch.
  //  - UPDATE_TRANSACTION_OP: requires a non-nil 'txn_id' and records 'txn_status'.
  //  - Any other type fails the test.
  // NOTE(review): mutable_write() is called before the op type is checked, and the
  // UPDATE_TRANSACTION_OP branch never copies 'txn_id' into the message -- confirm both are
  // intended.
234
  // Appends a batch with size 2, or the given set of writes.
235
  void AppendReplicateBatch(
236
      const OpIdPB& opid,
237
      const OpIdPB& committed_opid = MakeOpId(0, 0),
238
      std::vector<TupleForAppend> writes = {},
239
      AppendSync sync = AppendSync::kTrue,
240
      consensus::OperationType op_type = consensus::OperationType::WRITE_OP,
241
      TransactionId txn_id = TransactionId::Nil(),
242
600k
      TransactionStatus txn_status = TransactionStatus::APPLYING) {
243
600k
    auto replicate = std::make_shared<ReplicateMsg>();
244
600k
    replicate->set_op_type(op_type);
245
600k
    replicate->mutable_id()->CopyFrom(opid);
246
600k
    replicate->mutable_committed_op_id()->CopyFrom(committed_opid);
247
600k
    replicate->set_hybrid_time(clock_->Now().ToUint64());
248
600k
    auto *batch_request = replicate->mutable_write();
249
250
600k
    if (op_type == consensus::OperationType::UPDATE_TRANSACTION_OP) {
251
80.5k
      ASSERT_TRUE(!txn_id.IsNil());
252
80.5k
      replicate->mutable_transaction_state()->set_status(txn_status);
253
519k
    } else if (op_type == consensus::OperationType::WRITE_OP) {
254
519k
      if (writes.empty()) {
255
519k
        const int opid_index_as_int = static_cast<int>(opid.index());
256
        // OpIds carry an int64 index and term, so the index is downcast to int here. To allow
257
        // tests with values > INT_MAX, the second key must be derived without overflowing, while
258
        // still being different from the first key.
259
        //
260
        // The two keys are x and x / 2 + 1.
261
        // For x <= 2 the second key is special-cased to 3.
262
519k
        const int other_int = opid_index_as_int <= 2 ? 3 : opid_index_as_int / 2 + 1;
263
519k
        writes.emplace_back(
264
519k
            /* key */ opid_index_as_int, /* int_val */ 0, /* string_val */ "this is a test insert");
265
519k
        writes.emplace_back(
266
519k
            /* key */ other_int, /* int_val */ 0, /* string_val */ "this is a test mutate");
267
519k
      }
268
269
519k
      auto write_batch = batch_request->mutable_write_batch();
270
519k
      if (!txn_id.IsNil()) {
271
120k
        write_batch->mutable_transaction()->set_transaction_id(txn_id.data(), txn_id.size());
272
120k
      }
273
1.03M
      for (const auto &w : writes) {
274
1.03M
        AddKVToPB(std::get<0>(w), std::get<1>(w), std::get<2>(w), write_batch);
275
1.03M
      }
276
0
    } else {
277
0
      FAIL() << "Unexpected operation type: " << consensus::OperationType_Name(op_type);
278
0
    }
279
280
600k
    AppendReplicateBatch(replicate, sync);
281
600k
  }
282
283
  // Appends the provided message to log_ as a one-element batch, using the committed op id
  // stored in the message and the current restart-safe coarse time. When 'sync' is true this
  // waits for the append and asserts it succeeded; otherwise the status is checked later by
  // CheckReplicateResult.
284
  void AppendReplicateBatch(const consensus::ReplicateMsgPtr& replicate,
285
600k
                            AppendSync sync = AppendSync::kTrue) {
286
600k
    const auto committed_op_id = yb::OpId::FromPB(replicate->committed_op_id());
287
600k
    const auto batch_mono_time = restart_safe_coarse_mono_clock_.Now();
288
600k
    if (sync) {
289
600k
      Synchronizer s;
290
600k
      ASSERT_OK(log_->AsyncAppendReplicates(
291
600k
          { replicate }, committed_op_id, batch_mono_time, s.AsStatusCallback()));
292
600k
      ASSERT_OK(s.Wait());
293
250
    } else {
294
      // AsyncAppendReplicates does not free the ReplicateMsg on completion, so we
295
      // need to pass it through to our callback.
296
250
      ASSERT_OK(log_->AsyncAppendReplicates(
297
250
          { replicate }, committed_op_id, batch_mono_time,
298
250
          Bind(&LogTestBase::CheckReplicateResult, replicate)));
299
250
    }
300
600k
  }
301
302
  // Appends 'count' default two-row WRITE_OP batches to the log, one per index starting at
  // current_index_ in term 1. Each entry's committed op id is set to its own op id, and
  // current_index_ is advanced by one per entry.
303
58
  void AppendReplicateBatchToLog(size_t count, AppendSync sync = AppendSync::kTrue) {
304
486
    for (size_t i = 0; i < count; i++) {
305
428
      OpIdPB opid = consensus::MakeOpId(1, current_index_);
306
428
      AppendReplicateBatch(opid, opid, /* writes */ {}, sync);
307
428
      current_index_ += 1;
308
428
    }
309
58
  }
310
311
  // Appends a single NO_OP entry to log_ using clock_. Increments op_id's index by one. If
312
  // 'size' is non-nullptr it is incremented by the expected size of the entry; both updates
  // happen before the append is attempted, so they are applied even when a non-OK status is
  // returned.
313
157k
  CHECKED_STATUS AppendNoOp(OpIdPB* op_id, ssize_t* size = nullptr) {
314
157k
    return AppendNoOpToLogSync(clock_, log_.get(), op_id, size);
315
157k
  }
316
317
  // Appends 'num' NO_OP entries to the log, one batch per entry, stopping at the first non-OK
318
  // status and returning it. op_id's index and, if non-nullptr, 'size' are advanced once for
319
  // every attempted entry, including one whose append failed.
320
569
  CHECKED_STATUS AppendNoOps(OpIdPB* op_id, int num, ssize_t* size = nullptr) {
321
157k
    for (int i = 0; i < num; i++) {
322
157k
      RETURN_NOT_OK(AppendNoOp(op_id, size));
323
157k
    }
324
569
    return Status::OK();
325
569
  }
326
  // Asks log_ to allocate a new segment and roll over to it; returns the resulting status.
327
6.13k
  CHECKED_STATUS RollLog() {
328
6.13k
    return log_->AllocateSegmentAndRollOver();
329
6.13k
  }
330
  // Returns a human-readable dump of 'segments': sequence number, path, header and, if present,
  // footer of each segment. NOTE(review): the "Footer: None or corrupt." line is the only one
  // without a trailing newline, so the next segment's separator continues on the same line.
331
0
  string DumpSegmentsToString(const SegmentSequence& segments) {
332
0
    string dump;
333
0
    for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
334
0
      dump.append("------------\n");
335
0
      strings::SubstituteAndAppend(&dump, "Segment: $0, Path: $1\n",
336
0
                                   segment->header().sequence_number(), segment->path());
337
0
      strings::SubstituteAndAppend(&dump, "Header: $0\n",
338
0
                                   segment->header().ShortDebugString());
339
0
      if (segment->HasFooter()) {
340
0
        strings::SubstituteAndAppend(&dump, "Footer: $0\n", segment->footer().ShortDebugString());
341
0
      } else {
342
0
        dump.append("Footer: None or corrupt.");
343
0
      }
344
0
    }
345
0
    return dump;
346
0
  }
347
348
 protected:
349
  const Schema schema_;
350
  std::unique_ptr<FsManager> fs_manager_;
351
  std::unique_ptr<MetricRegistry> metric_registry_;
352
  scoped_refptr<MetricEntity> table_metric_entity_;
353
  scoped_refptr<MetricEntity> tablet_metric_entity_;
354
  std::unique_ptr<ThreadPool> log_thread_pool_;
355
  scoped_refptr<Log> log_;  // Opened by BuildLog().
356
  int64_t current_index_;  // Next op index used by AppendReplicateBatchToLog; reset in SetUp().
357
  LogOptions options_;
358
  // Created in the constructor; not otherwise used within this header.
359
  scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
360
  scoped_refptr<Clock> clock_;
361
  string tablet_wal_path_;  // WAL directory for kTestTable/kTestTablet, resolved in SetUp().
362
  RestartSafeCoarseMonoClock restart_safe_coarse_mono_clock_;
363
};
364
365
// Kinds of corruption that CorruptLogFile can apply to a file: TRUNCATE_FILE resizes the file
366
// contents to the given offset, FLIP_BYTE inverts every bit of the byte at the given offset.
367
enum CorruptionType {
368
  TRUNCATE_FILE,
369
  FLIP_BYTE
370
};
371
372
// Corrupts the file at 'log_path': reads it fully into memory, applies the corruption selected
// by 'type' at 'corruption_offset', and writes the result back to the same path.
// Returns a non-OK status (with a prepended message) if the file cannot be read or rewritten.
// FLIP_BYTE CHECK-fails if 'corruption_offset' is not smaller than the file size.
// NOTE(review): TRUNCATE_FILE does not verify that 'corruption_offset' is within the file, so a
// larger offset presumably grows the buffer instead of truncating it -- confirm.
// NOTE(review): this function is defined in a header without 'static' or 'inline', unlike the
// append helpers above it; that would clash if two translation units including this header were
// linked into one binary -- confirm this never happens.
Status CorruptLogFile(Env* env, const string& log_path,
373
5
                      CorruptionType type, size_t corruption_offset) {
374
5
  faststring buf;
375
5
  RETURN_NOT_OK_PREPEND(ReadFileToString(env, log_path, &buf),
376
5
                        "Couldn't read log");
377
378
5
  switch (type) {
379
2
    case TRUNCATE_FILE:
380
2
      buf.resize(corruption_offset);
381
2
      break;
382
3
    case FLIP_BYTE:
383
3
      CHECK_LT(corruption_offset, buf.size());
384
3
      buf[corruption_offset] ^= 0xff;
385
3
      break;
386
5
  }
387
388
  // Overwrite the original file with the corrupted contents.
389
5
  RETURN_NOT_OK_PREPEND(WriteStringToFile(env, Slice(buf), log_path),
390
5
                        "Couldn't rewrite corrupt log file");
391
392
5
  return Status::OK();
393
5
}
394
395
} // namespace log
396
} // namespace yb
397
398
#endif // YB_CONSENSUS_LOG_TEST_BASE_H