YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log_cache-test.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include "yb/common/wire_protocol-test-util.h"

#include "yb/consensus/consensus-test-util.h"
#include "yb/consensus/log.h"
#include "yb/consensus/log_cache.h"

#include "yb/fs/fs_manager.h"

#include "yb/gutil/bind.h"
#include "yb/gutil/stl_util.h"

#include "yb/server/hybrid_clock.h"

#include "yb/util/mem_tracker.h"
#include "yb/util/metrics.h"
#include "yb/util/monotime.h"
#include "yb/util/scope_exit.h"
#include "yb/util/size_literals.h"
#include "yb/util/test_util.h"

using std::atomic;
using std::shared_ptr;
using std::thread;
using std::vector;
using namespace std::chrono_literals;

DECLARE_int32(log_cache_size_limit_mb);
DECLARE_int32(global_log_cache_size_limit_mb);
DECLARE_int32(global_log_cache_size_limit_percentage);

METRIC_DECLARE_entity(tablet);

namespace yb {
namespace consensus {

static const char* kPeerUuid = "leader";
static const char* kTestTable = "test-table";
static const char* kTestTablet = "test-tablet";

constexpr int kNumMessages = 100;
constexpr int kMessageIndex1 = 60;
constexpr int kMessageIndex2 = 80;

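// Renders an OpId as "term.index" so results can be compared against OpIdStrForIndex().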
std::string OpIdToString(const yb::OpId& opid) {
  return Format("$0.$1", opid.term, opid.index);
}

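// Test fixture that stands up a real WAL (FsManager + log::Log) with a LogCache layered
// on top of it, so cache reads can fall through to the log after entries are evicted.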
class LogCacheTest : public YBTest {
 public:
  LogCacheTest()
    : schema_(GetSimpleTestSchema()),
      metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "LogCacheTest")) {
  }

  void SetUp() override {
    YBTest::SetUp();
    fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
    ASSERT_OK(fs_manager_->Open());
    ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_));
    ASSERT_OK(log::Log::Open(log::LogOptions(),
                            kTestTablet,
                            fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
                            fs_manager_->uuid(),
                            schema_,
                            0, // schema_version
                            nullptr, // table_metrics_entity
                            nullptr, // tablet_metrics_entity
                            log_thread_pool_.get(),
                            log_thread_pool_.get(),
                            std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
                            &log_));

    CloseAndReopenCache(MinimumOpId());
    clock_.reset(new server::HybridClock());
    ASSERT_OK(clock_->Init());
  }

  void TearDown() override {
    ASSERT_OK(log_->WaitUntilAllFlushed());
  }

  void CloseAndReopenCache(const OpIdPB& preceding_id) {
    // Blow away the memtrackers before creating the new cache.
    cache_.reset();

    cache_.reset(new LogCache(
        metric_entity_, log_.get(), nullptr /* mem_tracker */, kPeerUuid, kTestTablet));
    cache_->Init(preceding_id);
  }

 protected:
  static void FatalOnError(const Status& s) {
    ASSERT_OK(s);
  }

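  // Appends 'count' dummy replicate messages starting at index 'first', each carrying a
  // payload of 'payload_size' bytes. The term of each message is derived from its index
  // via kTermDivisor, so the generated sequence crosses term boundaries periodically.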
  Status AppendReplicateMessagesToCache(int64_t first, int64_t count, size_t payload_size = 0) {
    for (int64_t cur_index = first; cur_index < first + count; cur_index++) {
      int64_t term = cur_index / kTermDivisor;
      int64_t index = cur_index;
      ReplicateMsgs msgs = { CreateDummyReplicate(term, index, clock_->Now(), payload_size) };
      RETURN_NOT_OK(cache_->AppendOperations(
          msgs, yb::OpId() /* committed_op_id */, RestartSafeCoarseMonoClock().Now(),
          Bind(&FatalOnError)));
      cache_->TrackOperationsMemory({yb::OpId::FromPB(msgs[0]->id())});
      std::this_thread::sleep_for(100ms);
    }
    return Status::OK();
  }

  const Schema schema_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> metric_entity_;
  std::unique_ptr<FsManager> fs_manager_;
  std::unique_ptr<ThreadPool> log_thread_pool_;
  std::unique_ptr<LogCache> cache_;
  scoped_refptr<log::Log> log_;
  scoped_refptr<server::Clock> clock_;
};

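// Appends kNumMessages operations and exercises the main read paths: from the start of
// the cache, from the middle, past the end, over bounded index ranges, and across an
// eviction.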
TEST_F(LogCacheTest, TestAppendAndGetMessages) {
  ASSERT_EQ(0, cache_->metrics_.num_ops->value());
  ASSERT_EQ(0, cache_->metrics_.size->value());
  ASSERT_OK(AppendReplicateMessagesToCache(1, kNumMessages));
  ASSERT_EQ(kNumMessages, cache_->metrics_.num_ops->value());
  ASSERT_GE(cache_->metrics_.size->value(), 5 * kNumMessages);
  ASSERT_OK(log_->WaitUntilAllFlushed());

  auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 8_MB));
  EXPECT_EQ(kNumMessages, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(0), OpIdToString(read_result.preceding_op));

  // Get starting in the middle of the cache.
  read_result = ASSERT_RESULT(cache_->ReadOps(kMessageIndex1, 8_MB));
  EXPECT_EQ(kNumMessages - kMessageIndex1, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(kMessageIndex1), OpIdToString(read_result.preceding_op));
  EXPECT_EQ(OpIdStrForIndex(kMessageIndex1 + 1), OpIdToString(read_result.messages[0]->id()));

  // Get at the end of the cache.
  read_result = ASSERT_RESULT(cache_->ReadOps(kNumMessages, 8_MB));
  EXPECT_EQ(0, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(kNumMessages), OpIdToString(read_result.preceding_op));

  // Get messages from the beginning until some point in the middle of the cache.
  read_result = ASSERT_RESULT(cache_->ReadOps(0, kMessageIndex1, 8_MB));
  EXPECT_EQ(kMessageIndex1, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(0), OpIdToString(read_result.preceding_op));
  EXPECT_EQ(OpIdStrForIndex(1), OpIdToString(read_result.messages[0]->id()));

  // Get messages from some point in the middle of the cache until another point.
  read_result = ASSERT_RESULT(cache_->ReadOps(kMessageIndex1, kMessageIndex2, 8_MB));
  EXPECT_EQ(kMessageIndex2 - kMessageIndex1, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(kMessageIndex1), OpIdToString(read_result.preceding_op));
  EXPECT_EQ(OpIdStrForIndex(kMessageIndex1 + 1), OpIdToString(read_result.messages[0]->id()));

  // Evict some and verify that the eviction took effect.
  cache_->EvictThroughOp(kNumMessages / 2);
  ASSERT_EQ(kNumMessages / 2, cache_->metrics_.num_ops->value());

  // Can still read data that was evicted, since it got written through.
  int start = (kNumMessages / 2) - 10;
  read_result = ASSERT_RESULT(cache_->ReadOps(start, 8_MB));
  EXPECT_EQ(kNumMessages - start, read_result.messages.size());
  EXPECT_EQ(OpIdStrForIndex(start), OpIdToString(read_result.preceding_op));
  EXPECT_EQ(OpIdStrForIndex(start + 1), OpIdToString(read_result.messages[0]->id()));
}

// Ensure that the cache always yields at least one message,
// even if that message is larger than the batch size. This ensures
// that we don't get "stuck" in the case that a large message enters
// the cache.
TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
  // Generate a 2MB dummy payload.
  const int kPayloadSize = 2_MB;

  // Append several large ops to the cache.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 4, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());

  // We should get one of them, even though we only ask for 100 bytes.
  auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 100));
  ASSERT_EQ(1, read_result.messages.size());

  // Should yield one op also in the 'cache miss' case.
  cache_->EvictThroughOp(50);
  read_result = ASSERT_RESULT(cache_->ReadOps(0, 100));
  ASSERT_EQ(1, read_result.messages.size());
}

// Tests that the cache returns an Incomplete status if queried for messages after an
// index that is higher than its latest, returns an empty set of messages when queried for
// the last index, and returns all messages when queried for MinimumOpId().
TEST_F(LogCacheTest, TestCacheEdgeCases) {
  // Append 1 message to the cache.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 1));
  ASSERT_OK(log_->WaitUntilAllFlushed());

  // Test when the searched index is MinimumOpId().index().
  auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 100));
  ASSERT_EQ(1, read_result.messages.size());
  ASSERT_EQ(yb::OpId(0, 0), read_result.preceding_op);

  // Test when 'after_op_index' is the last index in the cache.
  read_result = ASSERT_RESULT(cache_->ReadOps(1, 100));
  ASSERT_EQ(0, read_result.messages.size());
  ASSERT_EQ(yb::OpId(0, 1), read_result.preceding_op);

  // Now test the case when 'after_op_index' is after the last index
  // in the cache.
  auto failed_result = cache_->ReadOps(2, 100);
  ASSERT_FALSE(failed_result.ok());
  ASSERT_TRUE(failed_result.status().IsIncomplete())
      << "unexpected status: " << failed_result.status();

  // Evict entries from the cache, and ensure that we can still read
  // entries at the beginning of the log.
  cache_->EvictThroughOp(50);
  read_result = ASSERT_RESULT(cache_->ReadOps(0, 100));
  ASSERT_EQ(1, read_result.messages.size());
  ASSERT_EQ(yb::OpId(0, 0), read_result.preceding_op);
}

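// Verifies per-cache memory accounting: once FLAGS_log_cache_size_limit_mb is exceeded,
// the oldest cached operations are trimmed (they remain readable from the log itself).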
TEST_F(LogCacheTest, TestMemoryLimit) {
  FLAGS_log_cache_size_limit_mb = 1;
  CloseAndReopenCache(MinimumOpId());

  const int kPayloadSize = 400_KB;
  // Limit should not be violated.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());
  ASSERT_EQ(1, cache_->num_cached_ops());

  // Verify the size is right. It's not exactly kPayloadSize because of in-memory
  // overhead, etc.
  auto size_with_one_msg = cache_->BytesUsed();
  ASSERT_GT(size_with_one_msg, 300_KB);
  ASSERT_LT(size_with_one_msg, 500_KB);

  // Add another operation which fits under the 1MB limit.
  ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());
  ASSERT_EQ(2, cache_->num_cached_ops());

  auto size_with_two_msgs = cache_->BytesUsed();
  ASSERT_GT(size_with_two_msgs, 2 * 300_KB);
  ASSERT_LT(size_with_two_msgs, 2 * 500_KB);

  // Append a third operation, which will push the cache size above the 1MB limit
  // and cause eviction of the first operation.
  LOG(INFO) << "appending op 3";
  // Verify that we have trimmed by appending a message that would
  // otherwise be rejected, since the cache max size limit is 1MB.
  ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());
  ASSERT_EQ(2, cache_->num_cached_ops());
  ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed());

  // Test explicitly evicting one of the ops.
  cache_->EvictThroughOp(2);
  ASSERT_EQ(1, cache_->num_cached_ops());
  ASSERT_EQ(size_with_one_msg, cache_->BytesUsed());

  // Explicitly evict the last op.
  cache_->EvictThroughOp(3);
  ASSERT_EQ(0, cache_->num_cached_ops());
  ASSERT_EQ(cache_->BytesUsed(), 0);
}

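// The next two tests exercise the server-wide (global) log cache limit. In each, the
// ScopedTrackedConsumption charged against parent_tracker_ stands in for memory already
// used elsewhere on the server, leaving roughly 1 MB of global budget for this cache.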
TEST_F(LogCacheTest, TestGlobalMemoryLimitMB) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_mb) = 4;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_percentage) = 100;
  CloseAndReopenCache(MinimumOpId());

  // Consume all but 1 MB of cache space.
  ScopedTrackedConsumption consumption(cache_->parent_tracker_, 3_MB);

  const int kPayloadSize = 768_KB;

  // Should succeed, but only end up caching one of the two ops because of the global limit.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());

  ASSERT_EQ(1, cache_->num_cached_ops());
  ASSERT_LE(cache_->BytesUsed(), 1_MB);
}

TEST_F(LogCacheTest, TestGlobalMemoryLimitPercentage) {
  FLAGS_global_log_cache_size_limit_mb = INT32_MAX;
  FLAGS_global_log_cache_size_limit_percentage = 5;
  const int64_t root_mem_limit = MemTracker::GetRootTracker()->limit();

  CloseAndReopenCache(MinimumOpId());

  // Consume all but 1 MB of cache space.
  ScopedTrackedConsumption consumption(cache_->parent_tracker_, root_mem_limit * 0.05 - 1_MB);

  const int kPayloadSize = 768_KB;

  // Should succeed, but only end up caching one of the two ops because of the global limit.
  ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize));
  ASSERT_OK(log_->WaitUntilAllFlushed());

  ASSERT_EQ(1, cache_->num_cached_ops());
  ASSERT_LE(cache_->BytesUsed(), 1_MB);
}

// Test that the log cache properly replaces messages when an index
// is reused. This is a regression test for a bug where the memtracker's
// consumption wasn't properly managed when messages were replaced.
TEST_F(LogCacheTest, TestReplaceMessages) {
  const int kPayloadSize = 128_KB;
  shared_ptr<MemTracker> tracker = cache_->tracker_;
  ASSERT_EQ(0, tracker->consumption());

  ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  auto size_with_one_msg = tracker->consumption();

  // Rewrite the same index over and over; consumption should not grow.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
  }

  ASSERT_OK(log_->WaitUntilAllFlushed());

  EXPECT_EQ(size_with_one_msg, tracker->consumption());
  EXPECT_EQ(Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0, disk_reads=0)",
                       size_with_one_msg),
            cache_->ToString());
}

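// Multi-threaded smoke test: a writer thread appends batches of operations while a
// reader thread concurrently reads them back, verifying that concurrent appends and
// reads are safe until the workload is stopped.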
TEST_F(LogCacheTest, TestMTReadAndWrite) {
  atomic<bool> stop { false };
  bool stopped = false;
  atomic<int64_t> num_appended{0};
  atomic<int64_t> next_index{0};
  vector<thread> threads;

  auto stop_workload = [&]() {
    if (!stopped) {
      LOG(INFO) << "Stopping workload";
      stop = true;
      for (auto& t : threads) {
        t.join();
      }
      stopped = true;
      LOG(INFO) << "Workload stopped";
    }
  };

  auto se = ScopeExit(stop_workload);

  // Add a writer thread.
  threads.emplace_back([&] {
    const int kBatch = 10;
    int64_t index = 1;
    while (!stop) {
      auto append_status = AppendReplicateMessagesToCache(index, kBatch);
      if (append_status.IsServiceUnavailable()) {
        std::this_thread::sleep_for(10ms);
        continue;
      }
      index += kBatch;
      next_index = index;
      num_appended++;
    }
  });

  // Add a reader thread.
  threads.emplace_back([&] {
    int64_t index = 0;
    while (!stop) {
      if (index >= next_index) {
        // We've gone ahead of the writer.
        std::this_thread::sleep_for(5ms);
        continue;
      }
      auto read_result = ASSERT_RESULT(cache_->ReadOps(index, 1_MB));
      index += read_result.messages.size();
    }
  });

  LOG(INFO) << "Starting the workload";
  std::this_thread::sleep_for(5s);
  stop_workload();
  ASSERT_GT(num_appended, 0);
}

} // namespace consensus
} // namespace yb