/Users/deen/code/yugabyte-db/src/yb/consensus/log_cache-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <atomic> |
34 | | #include <chrono> |
35 | | #include <memory> |
36 | | #include <string> |
37 | | #include <thread> |
38 | | #include <vector> |
39 | | |
40 | | #include <gtest/gtest.h> |
41 | | |
42 | | #include "yb/common/wire_protocol-test-util.h" |
43 | | |
44 | | #include "yb/consensus/consensus-test-util.h" |
45 | | #include "yb/consensus/log.h" |
46 | | #include "yb/consensus/log_cache.h" |
47 | | |
48 | | #include "yb/fs/fs_manager.h" |
49 | | |
50 | | #include "yb/gutil/bind.h" |
51 | | #include "yb/gutil/stl_util.h" |
52 | | |
53 | | #include "yb/server/hybrid_clock.h" |
54 | | |
55 | | #include "yb/util/mem_tracker.h" |
56 | | #include "yb/util/metrics.h" |
57 | | #include "yb/util/monotime.h" |
58 | | #include "yb/util/scope_exit.h" |
59 | | #include "yb/util/size_literals.h" |
60 | | #include "yb/util/test_util.h" |
61 | | |
62 | | using std::atomic; |
63 | | using std::shared_ptr; |
64 | | using std::thread; |
65 | | |
66 | | DECLARE_int32(log_cache_size_limit_mb); |
67 | | DECLARE_int32(global_log_cache_size_limit_mb); |
68 | | DECLARE_int32(global_log_cache_size_limit_percentage); |
69 | | |
70 | | METRIC_DECLARE_entity(tablet); |
71 | | |
72 | | using std::atomic; |
73 | | using std::vector; |
74 | | using std::thread; |
75 | | using namespace std::chrono_literals; |
76 | | |
77 | | namespace yb { |
78 | | namespace consensus { |
79 | | |
80 | | static const char* kPeerUuid = "leader"; |
81 | | static const char* kTestTable = "test-table"; |
82 | | static const char* kTestTablet = "test-tablet"; |
83 | | |
84 | | constexpr int kNumMessages = 100; |
85 | | constexpr int kMessageIndex1 = 60; |
86 | | constexpr int kMessageIndex2 = 80; |
87 | | |
88 | 6 | std::string OpIdToString(const yb::OpId& opid) { |
89 | 6 | return Format("$0.$1", opid.term, opid.index); |
90 | 6 | } |
91 | | |
92 | | class LogCacheTest : public YBTest { |
93 | | public: |
94 | | LogCacheTest() |
95 | | : schema_(GetSimpleTestSchema()), |
96 | 8 | metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "LogCacheTest")) { |
97 | 8 | } |
98 | | |
99 | 8 | void SetUp() override { |
100 | 8 | YBTest::SetUp(); |
101 | 8 | fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); |
102 | 8 | ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); |
103 | 8 | ASSERT_OK(fs_manager_->Open()); |
104 | 8 | ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_)); |
105 | 8 | ASSERT_OK(log::Log::Open(log::LogOptions(), |
106 | 8 | kTestTablet, |
107 | 8 | fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), |
108 | 8 | fs_manager_->uuid(), |
109 | 8 | schema_, |
110 | 8 | 0, // schema_version |
111 | 8 | nullptr, // table_metrics_entity |
112 | 8 | nullptr, // tablet_metrics_entity |
113 | 8 | log_thread_pool_.get(), |
114 | 8 | log_thread_pool_.get(), |
115 | 8 | std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index |
116 | 8 | &log_)); |
117 | | |
118 | 8 | CloseAndReopenCache(MinimumOpId()); |
119 | 8 | clock_.reset(new server::HybridClock()); |
120 | 8 | ASSERT_OK(clock_->Init()); |
121 | 8 | } |
122 | | |
123 | 8 | void TearDown() override { |
124 | 8 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
125 | 8 | } |
126 | | |
127 | 11 | void CloseAndReopenCache(const OpIdPB& preceding_id) { |
128 | | // Blow away the memtrackers before creating the new cache. |
129 | 11 | cache_.reset(); |
130 | | |
131 | 11 | cache_.reset(new LogCache( |
132 | 11 | metric_entity_, log_.get(), nullptr /* mem_tracker */, kPeerUuid, kTestTablet)); |
133 | 11 | cache_->Init(preceding_id); |
134 | 11 | } |
135 | | |
136 | | protected: |
137 | 173 | static void FatalOnError(const Status& s) { |
138 | 173 | ASSERT_OK(s); |
139 | 173 | } |
140 | | |
141 | 24 | Status AppendReplicateMessagesToCache(int64_t first, int64_t count, size_t payload_size = 0) { |
142 | 197 | for (int64_t cur_index = first; cur_index < first + count; cur_index++) { |
143 | 173 | int64_t term = cur_index / kTermDivisor; |
144 | 173 | int64_t index = cur_index; |
145 | 173 | ReplicateMsgs msgs = { CreateDummyReplicate(term, index, clock_->Now(), payload_size) }; |
146 | 173 | RETURN_NOT_OK(cache_->AppendOperations( |
147 | 173 | msgs, yb::OpId() /* committed_op_id */, RestartSafeCoarseMonoClock().Now(), |
148 | 173 | Bind(&FatalOnError))); |
149 | 173 | cache_->TrackOperationsMemory({yb::OpId::FromPB(msgs[0]->id())}); |
150 | 173 | std::this_thread::sleep_for(100ms); |
151 | 173 | } |
152 | 24 | return Status::OK(); |
153 | 24 | } |
154 | | |
155 | | const Schema schema_; |
156 | | MetricRegistry metric_registry_; |
157 | | scoped_refptr<MetricEntity> metric_entity_; |
158 | | std::unique_ptr<FsManager> fs_manager_; |
159 | | std::unique_ptr<ThreadPool> log_thread_pool_; |
160 | | std::unique_ptr<LogCache> cache_; |
161 | | scoped_refptr<log::Log> log_; |
162 | | scoped_refptr<server::Clock> clock_; |
163 | | }; |
164 | | |
165 | 1 | TEST_F(LogCacheTest, TestAppendAndGetMessages) { |
166 | 1 | ASSERT_EQ(0, cache_->metrics_.num_ops->value()); |
167 | 1 | ASSERT_EQ(0, cache_->metrics_.size->value()); |
168 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, kNumMessages)); |
169 | 1 | ASSERT_EQ(kNumMessages, cache_->metrics_.num_ops->value()); |
170 | 1 | ASSERT_GE(cache_->metrics_.size->value(), 5 * kNumMessages); |
171 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
172 | | |
173 | 1 | auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 8_MB)); |
174 | 1 | EXPECT_EQ(kNumMessages, read_result.messages.size()); |
175 | 1 | EXPECT_EQ(OpIdStrForIndex(0), OpIdToString(read_result.preceding_op)); |
176 | | |
177 | | // Get starting in the middle of the cache. |
178 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(kMessageIndex1, 8_MB)); |
179 | 1 | EXPECT_EQ(kNumMessages - kMessageIndex1, read_result.messages.size()); |
180 | 1 | EXPECT_EQ(OpIdStrForIndex(kMessageIndex1), OpIdToString(read_result.preceding_op)); |
181 | 1 | EXPECT_EQ(OpIdStrForIndex(kMessageIndex1 + 1), OpIdToString(read_result.messages[0]->id())); |
182 | | |
183 | | // Get at the end of the cache. |
184 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(kNumMessages, 8_MB)); |
185 | 1 | EXPECT_EQ(0, read_result.messages.size()); |
186 | 1 | EXPECT_EQ(OpIdStrForIndex(kNumMessages), OpIdToString(read_result.preceding_op)); |
187 | | |
188 | | // Get messages from the beginning until some point in the middle of the cache. |
189 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(0, kMessageIndex1, 8_MB)); |
190 | 1 | EXPECT_EQ(kMessageIndex1, read_result.messages.size()); |
191 | 1 | EXPECT_EQ(OpIdStrForIndex(0), OpIdToString(read_result.preceding_op)); |
192 | 1 | EXPECT_EQ(OpIdStrForIndex(1), OpIdToString(read_result.messages[0]->id())); |
193 | | |
194 | | // Get messages from some point in the middle of the cache until another point. |
195 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(kMessageIndex1, kMessageIndex2, 8_MB)); |
196 | 1 | EXPECT_EQ(kMessageIndex2 - kMessageIndex1, read_result.messages.size()); |
197 | 1 | EXPECT_EQ(OpIdStrForIndex(kMessageIndex1), OpIdToString(read_result.preceding_op)); |
198 | 1 | EXPECT_EQ(OpIdStrForIndex(kMessageIndex1 + 1), OpIdToString(read_result.messages[0]->id())); |
199 | | |
200 | | // Evict some and verify that the eviction took effect. |
201 | 1 | cache_->EvictThroughOp(kNumMessages / 2); |
202 | 1 | ASSERT_EQ(kNumMessages / 2, cache_->metrics_.num_ops->value()); |
203 | | |
204 | | // Can still read data that was evicted, since it got written through. |
205 | 1 | int start = (kNumMessages / 2) - 10; |
206 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(start, 8_MB)); |
207 | 1 | EXPECT_EQ(kNumMessages - start, read_result.messages.size()); |
208 | 1 | EXPECT_EQ(OpIdStrForIndex(start), OpIdToString(read_result.preceding_op)); |
209 | 1 | EXPECT_EQ(OpIdStrForIndex(start + 1), OpIdToString(read_result.messages[0]->id())); |
210 | 1 | } |
211 | | |
212 | | |
213 | | // Ensure that the cache always yields at least one message, |
214 | | // even if that message is larger than the batch size. This ensures |
215 | | // that we don't get "stuck" in the case that a large message enters |
216 | | // the cache. |
217 | 1 | TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) { |
218 | | // generate a 2MB dummy payload |
219 | 1 | const int kPayloadSize = 2_MB; |
220 | | |
221 | | // Append several large ops to the cache |
222 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 4, kPayloadSize)); |
223 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
224 | | |
225 | | // We should get one of them, even though we only ask for 100 bytes |
226 | 1 | auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 100)); |
227 | 1 | ASSERT_EQ(1, read_result.messages.size()); |
228 | | |
229 | | // Should yield one op also in the 'cache miss' case. |
230 | 1 | cache_->EvictThroughOp(50); |
231 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(0, 100)); |
232 | 1 | ASSERT_EQ(1, read_result.messages.size()); |
233 | 1 | } |
234 | | |
235 | | // Tests that the cache returns an Incomplete status if queried for messages after an |
236 | | // index that is higher than its latest, returns an empty set of messages when queried for |
237 | | // the last index and returns all messages when queried for MinimumOpId(). |
238 | 1 | TEST_F(LogCacheTest, TestCacheEdgeCases) { |
239 | | // Append 1 message to the cache |
240 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 1)); |
241 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
242 | | |
243 | | // Test when the searched index is MinimumOpId().index(). |
244 | 1 | auto read_result = ASSERT_RESULT(cache_->ReadOps(0, 100)); |
245 | 1 | ASSERT_EQ(1, read_result.messages.size()); |
246 | 1 | ASSERT_EQ(yb::OpId(0, 0), read_result.preceding_op); |
247 | | |
248 | | // Test when 'after_op_index' is the last index in the cache. |
249 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(1, 100)); |
250 | 1 | ASSERT_EQ(0, read_result.messages.size()); |
251 | 1 | ASSERT_EQ(yb::OpId(0, 1), read_result.preceding_op); |
252 | | |
253 | | // Now test the case when 'after_op_index' is after the last index |
254 | | // in the cache. |
255 | 1 | auto failed_result = cache_->ReadOps(2, 100); |
256 | 1 | ASSERT_FALSE(failed_result.ok()); |
257 | 2 | ASSERT_TRUE(failed_result.status().IsIncomplete()) |
258 | 2 | << "unexpected status: " << failed_result.status(); |
259 | | |
260 | | // Evict entries from the cache, and ensure that we can still read |
261 | | // entries at the beginning of the log. |
262 | 1 | cache_->EvictThroughOp(50); |
263 | 1 | read_result = ASSERT_RESULT(cache_->ReadOps(0, 100)); |
264 | 1 | ASSERT_EQ(1, read_result.messages.size()); |
265 | 1 | ASSERT_EQ(yb::OpId(0, 0), read_result.preceding_op); |
266 | 1 | } |
267 | | |
268 | | |
269 | 1 | TEST_F(LogCacheTest, TestMemoryLimit) { |
270 | 1 | FLAGS_log_cache_size_limit_mb = 1; |
271 | 1 | CloseAndReopenCache(MinimumOpId()); |
272 | | |
273 | 1 | const int kPayloadSize = 400_KB; |
274 | | // Limit should not be violated. |
275 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize)); |
276 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
277 | 1 | ASSERT_EQ(1, cache_->num_cached_ops()); |
278 | | |
279 | | // Verify the size is right. It's not exactly kPayloadSize because of in-memory |
280 | | // overhead, etc. |
281 | 1 | auto size_with_one_msg = cache_->BytesUsed(); |
282 | 1 | ASSERT_GT(size_with_one_msg, 300_KB); |
283 | 1 | ASSERT_LT(size_with_one_msg, 500_KB); |
284 | | |
285 | | // Add another operation which fits under the 1MB limit. |
286 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize)); |
287 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
288 | 1 | ASSERT_EQ(2, cache_->num_cached_ops()); |
289 | | |
290 | 1 | auto size_with_two_msgs = cache_->BytesUsed(); |
291 | 1 | ASSERT_GT(size_with_two_msgs, 2 * 300_KB); |
292 | 1 | ASSERT_LT(size_with_two_msgs, 2 * 500_KB); |
293 | | |
294 | | // Append a third operation, which will push the cache size above the 1MB limit |
295 | | // and cause eviction of the first operation. |
296 | 1 | LOG(INFO) << "appending op 3"; |
297 | | // Verify that we have trimmed by appending a message that would |
298 | | // otherwise be rejected, since the cache max size limit is 1MB. |
299 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize)); |
300 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
301 | 1 | ASSERT_EQ(2, cache_->num_cached_ops()); |
302 | 1 | ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed()); |
303 | | |
304 | | // Test explicitly evicting one of the ops. |
305 | 1 | cache_->EvictThroughOp(2); |
306 | 1 | ASSERT_EQ(1, cache_->num_cached_ops()); |
307 | 1 | ASSERT_EQ(size_with_one_msg, cache_->BytesUsed()); |
308 | | |
309 | | // Explicitly evict the last op. |
310 | 1 | cache_->EvictThroughOp(3); |
311 | 1 | ASSERT_EQ(0, cache_->num_cached_ops()); |
312 | 1 | ASSERT_EQ(cache_->BytesUsed(), 0); |
313 | 1 | } |
314 | | |
315 | 1 | TEST_F(LogCacheTest, TestGlobalMemoryLimitMB) { |
316 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_mb) = 4; |
317 | 1 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_percentage) = 100; |
318 | 1 | CloseAndReopenCache(MinimumOpId()); |
319 | | |
320 | | // Consume all but 1 MB of cache space. |
321 | 1 | ScopedTrackedConsumption consumption(cache_->parent_tracker_, 3_MB); |
322 | | |
323 | 1 | const int kPayloadSize = 768_KB; |
324 | | |
325 | | // Should succeed, but only end up caching one of the two ops because of the global limit. |
326 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize)); |
327 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
328 | | |
329 | 1 | ASSERT_EQ(1, cache_->num_cached_ops()); |
330 | 1 | ASSERT_LE(cache_->BytesUsed(), 1_MB); |
331 | 1 | } |
332 | | |
333 | 1 | TEST_F(LogCacheTest, TestGlobalMemoryLimitPercentage) { |
334 | 1 | FLAGS_global_log_cache_size_limit_mb = INT32_MAX; |
335 | 1 | FLAGS_global_log_cache_size_limit_percentage = 5; |
336 | 1 | const int64_t root_mem_limit = MemTracker::GetRootTracker()->limit(); |
337 | | |
338 | 1 | CloseAndReopenCache(MinimumOpId()); |
339 | | |
340 | | // Consume all but 1 MB of cache space. |
341 | 1 | ScopedTrackedConsumption consumption(cache_->parent_tracker_, root_mem_limit * 0.05 - 1_MB); |
342 | | |
343 | 1 | const int kPayloadSize = 768_KB; |
344 | | |
345 | | // Should succeed, but only end up caching one of the two ops because of the global limit. |
346 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize)); |
347 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
348 | | |
349 | 1 | ASSERT_EQ(1, cache_->num_cached_ops()); |
350 | 1 | ASSERT_LE(cache_->BytesUsed(), 1_MB); |
351 | 1 | } |
352 | | |
353 | | // Test that the log cache properly replaces messages when an index |
354 | | // is reused. This is a regression test for a bug where the memtracker's |
355 | | // consumption wasn't properly managed when messages were replaced. |
356 | 1 | TEST_F(LogCacheTest, TestReplaceMessages) { |
357 | 1 | const int kPayloadSize = 128_KB; |
358 | 1 | shared_ptr<MemTracker> tracker = cache_->tracker_;; |
359 | 1 | ASSERT_EQ(0, tracker->consumption()); |
360 | | |
361 | 1 | ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize)); |
362 | 1 | auto size_with_one_msg = tracker->consumption(); |
363 | | |
364 | 11 | for (int i = 0; i < 10; i++) { |
365 | 10 | ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize)); |
366 | 10 | } |
367 | | |
368 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
369 | | |
370 | 1 | EXPECT_EQ(size_with_one_msg, tracker->consumption()); |
371 | 1 | EXPECT_EQ(Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0, disk_reads=0)", |
372 | 1 | size_with_one_msg), |
373 | 1 | cache_->ToString()); |
374 | 1 | } |
375 | | |
376 | 1 | TEST_F(LogCacheTest, TestMTReadAndWrite) { |
377 | 1 | atomic<bool> stop { false }; |
378 | 1 | bool stopped = false; |
379 | 1 | atomic<int64_t> num_appended{0}; |
380 | 1 | atomic<int64_t> next_index{0}; |
381 | 1 | vector<thread> threads; |
382 | | |
383 | 2 | auto stop_workload = [&]() { |
384 | 2 | if (!stopped) { |
385 | 1 | LOG(INFO) << "Stopping workload"; |
386 | 1 | stop = true; |
387 | 2 | for (auto& t : threads) { |
388 | 2 | t.join(); |
389 | 2 | } |
390 | 1 | stopped = true; |
391 | 1 | LOG(INFO) << "Workload stopped"; |
392 | 1 | } |
393 | 2 | }; |
394 | | |
395 | 1 | auto se = ScopeExit(stop_workload); |
396 | | |
397 | | // Add a writer thread. |
398 | 1 | threads.emplace_back([&] { |
399 | 1 | const int kBatch = 10; |
400 | 1 | int64_t index = 1; |
401 | 6 | while (!stop) { |
402 | 5 | auto append_status = AppendReplicateMessagesToCache(index, kBatch); |
403 | 5 | if (append_status.IsServiceUnavailable()) { |
404 | 0 | std::this_thread::sleep_for(10ms); |
405 | 0 | continue; |
406 | 0 | } |
407 | 5 | index += kBatch; |
408 | 5 | next_index = index; |
409 | 5 | num_appended++; |
410 | 5 | } |
411 | 1 | }); |
412 | | |
413 | | // Add a reader thread. |
414 | 1 | threads.emplace_back([&] { |
415 | 1 | int64_t index = 0; |
416 | 784 | while (!stop) { |
417 | 783 | if (index >= next_index) { |
418 | | // We've gone ahead of the writer. |
419 | 779 | std::this_thread::sleep_for(5ms); |
420 | 779 | continue; |
421 | 779 | } |
422 | 4 | auto read_result = ASSERT_RESULT(cache_->ReadOps(index, 1_MB)); |
423 | 4 | index += read_result.messages.size(); |
424 | 4 | } |
425 | 1 | }); |
426 | | |
427 | 1 | LOG(INFO) << "Starting the workload"; |
428 | 1 | std::this_thread::sleep_for(5s); |
429 | 1 | stop_workload(); |
430 | 1 | ASSERT_GT(num_appended, 0); |
431 | 1 | } |
432 | | |
433 | | } // namespace consensus |
434 | | } // namespace yb |