YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/tools/db_repl_stress.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
#ifndef GFLAGS
23
#include <cstdio>
24
int main() {
25
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
26
  return 1;
27
}
28
#else
29
30
#include <cstdio>
31
#include <atomic>
32
33
#include <gflags/gflags.h>
34
35
#include "yb/rocksdb/db/write_batch_internal.h"
36
#include "yb/rocksdb/db.h"
37
#include "yb/rocksdb/types.h"
38
#include "yb/rocksdb/util/testutil.h"
39
40
#include "yb/util/status_log.h"
41
42
// Run a thread to perform Put's.
43
// Another thread uses GetUpdatesSince API to keep getting the updates.
44
// options :
45
// --num_inserts = the num of inserts the first thread should perform.
46
// --wal_ttl = the wal ttl for the run.
47
48
using GFLAGS::ParseCommandLineFlags;
49
using GFLAGS::SetUsageMessage;
50
51
namespace rocksdb {
52
53
struct DataPumpThread {
54
  size_t no_records;
55
  DB* db; // Assumption DB is Open'ed already.
56
};
57
58
0
static void DataPumpThreadBody(void* arg) {
59
0
  DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
60
0
  DB* db = t->db;
61
0
  Random rnd(301);
62
0
  size_t i = 0;
63
0
  while (i++ < t->no_records) {
64
0
    if (!db->Put(
65
0
        WriteOptions(), Slice(RandomString(&rnd, 500)),
66
0
        Slice(RandomString(&rnd, 500))).ok()) {
67
0
      fprintf(stderr, "Error in put\n");
68
0
      exit(1);
69
0
    }
70
0
  }
71
0
}
72
73
struct ReplicationThread {
74
  std::atomic<bool> stop;
75
  DB* db;
76
  volatile size_t no_read;
77
};
78
79
0
static void ReplicationThreadBody(void* arg) {
80
0
  ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
81
0
  DB* db = t->db;
82
0
  unique_ptr<TransactionLogIterator> iter;
83
0
  SequenceNumber currentSeqNum = 1;
84
0
  while (!t->stop.load(std::memory_order_acquire)) {
85
0
    iter.reset();
86
0
    Status s;
87
0
    while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
88
0
      if (t->stop.load(std::memory_order_acquire)) {
89
0
        return;
90
0
      }
91
0
    }
92
0
    fprintf(stderr, "Refreshing iterator\n");
93
0
    for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
94
0
      BatchResult res = iter->GetBatch();
95
0
      if (res.sequence != currentSeqNum) {
96
0
        fprintf(
97
0
            stderr,
98
0
            "Missed a seq no. b/w %" PRISN " and %" PRISN "\n",
99
0
            currentSeqNum,
100
0
            res.sequence);
101
0
        exit(1);
102
0
      }
103
0
    }
104
0
  }
105
0
}
106
107
DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should"
108
    " perform.");
109
DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
110
DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run"
111
    "(in MB)");
112
113
0
int db_repl_stress(int argc, const char** argv) {
114
0
  SetUsageMessage(
115
0
      std::string("\nUSAGE:\n") + std::string(argv[0]) +
116
0
          " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" +
117
0
          " --wal_size_limit_MB=<WAL_size_limit_MB>");
118
0
  ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true);
119
120
0
  Env* env = Env::Default();
121
0
  std::string default_db_path;
122
0
  CHECK_OK(env->GetTestDirectory(&default_db_path));
123
0
  default_db_path += "db_repl_stress";
124
0
  Options options;
125
0
  options.create_if_missing = true;
126
0
  options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
127
0
  options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
128
0
  DB* db;
129
0
  CHECK_OK(DestroyDB(default_db_path, options));
130
131
0
  Status s = DB::Open(options, default_db_path, &db);
132
133
0
  if (!s.ok()) {
134
0
    fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
135
0
    exit(1);
136
0
  }
137
138
0
  DataPumpThread dataPump;
139
0
  dataPump.no_records = FLAGS_num_inserts;
140
0
  dataPump.db = db;
141
0
  env->StartThread(DataPumpThreadBody, &dataPump);
142
143
0
  ReplicationThread replThread;
144
0
  replThread.db = db;
145
0
  replThread.no_read = 0;
146
0
  replThread.stop.store(false, std::memory_order_release);
147
148
0
  env->StartThread(ReplicationThreadBody, &replThread);
149
0
  while (replThread.no_read < FLAGS_num_inserts) {}
150
0
  replThread.stop.store(true, std::memory_order_release);
151
0
  if (replThread.no_read < dataPump.no_records) {
152
    // no. read should be => than inserted.
153
0
    fprintf(
154
0
        stderr,
155
0
        "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
156
0
            " Written : %" ROCKSDB_PRIszt "\n",
157
0
        replThread.no_read, dataPump.no_records);
158
0
    exit(1);
159
0
  }
160
0
  fprintf(stderr, "Successful!\n");
161
0
  exit(0);
162
0
}
163
164
} // namespace rocksdb
165
166
int main(int argc, const char** argv) { rocksdb::db_repl_stress(argc, argv); }
167
168
#endif  // GFLAGS
169
170
#else  // ROCKSDB_LITE
171
#include <stdio.h>
172
int main(int argc, char** argv) {
173
  fprintf(stderr, "Not supported in lite mode.\n");
174
  return 1;
175
}
176
#endif  // ROCKSDB_LITE