YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <gtest/gtest.h>
34
35
#include "yb/common/schema.h"
36
#include "yb/common/wire_protocol-test-util.h"
37
38
#include "yb/consensus/consensus-test-util.h"
39
#include "yb/consensus/consensus.pb.h"
40
#include "yb/consensus/consensus_queue.h"
41
#include "yb/consensus/log-test-base.h"
42
#include "yb/consensus/log_anchor_registry.h"
43
#include "yb/consensus/log_reader.h"
44
#include "yb/consensus/log_util.h"
45
#include "yb/consensus/replicate_msgs_holder.h"
46
47
#include "yb/fs/fs_manager.h"
48
49
#include "yb/server/hybrid_clock.h"
50
51
#include "yb/util/metrics.h"
52
#include "yb/util/test_macros.h"
53
#include "yb/util/test_util.h"
54
#include "yb/util/threadpool.h"
55
56
DECLARE_bool(enable_data_block_fsync);
57
DECLARE_uint64(consensus_max_batch_size_bytes);
58
59
METRIC_DECLARE_entity(tablet);
60
61
namespace yb {
62
namespace consensus {
63
64
static const char* kLeaderUuid = "peer-0";
65
static const char* kPeerUuid = "peer-1";
66
static const char* kTestTable = "test-table";
67
static const char* kTestTablet = "test-tablet";
68
69
constexpr int kNumMessages = 100;
70
71
class ConsensusQueueTest : public YBTest {
72
 public:
73
  ConsensusQueueTest()
74
      : schema_(GetSimpleTestSchema()),
75
        metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "queue-test")),
76
10
        registry_(new log::LogAnchorRegistry) {
77
10
    FLAGS_enable_data_block_fsync = false; // Keep unit tests fast.
78
10
  }
79
80
10
  void SetUp() override {
81
10
    YBTest::SetUp();
82
10
    fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
83
10
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
84
10
    ASSERT_OK(fs_manager_->Open());
85
10
    ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_));
86
10
    ASSERT_OK(log::Log::Open(log::LogOptions(),
87
10
                            kTestTablet,
88
10
                            fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
89
10
                            fs_manager_->uuid(),
90
10
                            schema_,
91
10
                            0, // schema_version
92
10
                            nullptr,
93
10
                            nullptr,
94
10
                            log_thread_pool_.get(),
95
10
                            log_thread_pool_.get(),
96
10
                            std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
97
10
                            &log_));
98
10
    clock_.reset(new server::HybridClock());
99
10
    ASSERT_OK(clock_->Init());
100
101
10
    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
102
10
    CloseAndReopenQueue();
103
10
  }
104
105
12
  void CloseAndReopenQueue() {
106
    // Blow away the memtrackers before creating the new queue.
107
12
    queue_.reset();
108
12
    auto token = raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
109
12
    queue_.reset(new PeerMessageQueue(metric_entity_,
110
12
                                      log_.get(),
111
12
                                      nullptr /* server_tracker */,
112
12
                                      nullptr /* parent_tracker */,
113
12
                                      FakeRaftPeerPB(kLeaderUuid),
114
12
                                      kTestTablet,
115
12
                                      clock_,
116
12
                                      nullptr /* consensus_context */,
117
12
                                      std::move(token)));
118
12
    consensus_.reset(new TestRaftConsensusQueueIface());
119
12
    queue_->RegisterObserver(consensus_.get());
120
12
  }
121
122
10
  void TearDown() override {
123
10
    ASSERT_OK(log_->WaitUntilAllFlushed());
124
10
    queue_->Close();
125
10
  }
126
127
23
  Status AppendReplicateMsg(int term, int index, int payload_size) {
128
23
    return queue_->TEST_AppendOperation(CreateDummyReplicate(
129
23
        term, index, clock_->Now(), payload_size));
130
23
  }
131
132
  // Updates the peer's watermark in the queue so that it matches
133
  // the operation we want, since the queue always assumes that
134
  // when a peer gets tracked it's always tracked starting at the
135
  // last operation in the queue
136
  bool UpdatePeerWatermarkToOp(ConsensusRequestPB* request,
137
                               ConsensusResponsePB* response,
138
                               const OpIdPB& last_received,
139
                               const OpIdPB& last_received_current_leader,
140
5
                               int64_t last_committed_idx) {
141
5
    queue_->TrackPeer(kPeerUuid);
142
5
    response->set_responder_uuid(kPeerUuid);
143
144
    // Ask for a request. The queue assumes the peer is up-to-date so this should contain no
145
    // operations.
146
5
    ReplicateMsgsHolder refs;
147
5
    bool needs_remote_bootstrap;
148
5
    EXPECT_OK(queue_->RequestForPeer(kPeerUuid, request, &refs, &needs_remote_bootstrap));
149
5
    EXPECT_FALSE(needs_remote_bootstrap);
150
5
    EXPECT_EQ(request->ops_size(), 0);
151
152
    // Refuse saying that the log matching property check failed and
153
    // that our last operation is actually 'last_received'.
154
5
    RefuseWithLogPropertyMismatch(response, last_received, last_received_current_leader);
155
5
    response->mutable_status()->set_last_committed_idx(last_committed_idx);
156
5
    bool result = queue_->ResponseFromPeer(response->responder_uuid(), *response);
157
5
    request->Clear();
158
5
    response->mutable_status()->Clear();
159
5
    return result;
160
5
  }
161
162
  // Like the above but uses the last received index as the committed index.
163
  bool UpdatePeerWatermarkToOp(ConsensusRequestPB* request,
164
                               ConsensusResponsePB* response,
165
                               const OpIdPB& last_received,
166
4
                               const OpIdPB& last_received_current_leader) {
167
4
    return UpdatePeerWatermarkToOp(request, response, last_received,
168
4
                                   last_received_current_leader,
169
4
                                   last_received.index());
170
4
  }
171
172
  void RefuseWithLogPropertyMismatch(ConsensusResponsePB* response,
173
                                     const OpIdPB& last_received,
174
5
                                     const OpIdPB& last_received_current_leader) {
175
5
    ConsensusStatusPB* status = response->mutable_status();
176
5
    status->mutable_last_received()->CopyFrom(last_received);
177
5
    status->mutable_last_received_current_leader()->CopyFrom(last_received_current_leader);
178
5
    ConsensusErrorPB* error = status->mutable_error();
179
5
    error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
180
5
    StatusToPB(STATUS(IllegalState, "LMP failed."), error->mutable_status());
181
5
  }
182
183
6
  void WaitForLocalPeerToAckIndex(int64_t index) {
184
11
    while (true) {
185
11
      PeerMessageQueue::TrackedPeer leader = queue_->GetTrackedPeerForTests(kLeaderUuid);
186
11
      if (leader.last_received.index >= index) {
187
6
        break;
188
6
      }
189
5
      SleepFor(MonoDelta::FromMilliseconds(10));
190
5
    }
191
6
  }
192
193
  // Sets the last received op on the response, as well as the last committed index.
194
  void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
195
                                       const OpId& last_received,
196
                                       const OpId& last_received_current_leader,
197
22
                                       int64_t last_committed_idx) {
198
22
    last_received.ToPB(response->mutable_status()->mutable_last_received());
199
22
    last_received_current_leader.ToPB(
200
22
        response->mutable_status()->mutable_last_received_current_leader());
201
22
    response->mutable_status()->set_last_committed_idx(last_committed_idx);
202
22
  }
203
204
  // Like the above but uses last_received as last_received_current_leader too.
205
  void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
206
                                       const OpId& last_received,
207
19
                                       int64_t last_committed_idx) {
208
19
    SetLastReceivedAndLastCommitted(response, last_received, last_received, last_committed_idx);
209
19
  }
210
211
  // Like the above but just sets the last committed index to have the same index
212
  // as the last received op.
213
  void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response,
214
16
                                       const OpId& last_received) {
215
16
    SetLastReceivedAndLastCommitted(response, last_received, last_received.index);
216
16
  }
217
218
 protected:
219
  std::unique_ptr<TestRaftConsensusQueueIface> consensus_;
220
  const Schema schema_;
221
  std::unique_ptr<FsManager> fs_manager_;
222
  MetricRegistry metric_registry_;
223
  scoped_refptr<MetricEntity> metric_entity_;
224
  std::unique_ptr<ThreadPool> log_thread_pool_;
225
  scoped_refptr<log::Log> log_;
226
  std::unique_ptr<ThreadPool> raft_pool_;
227
  std::unique_ptr<PeerMessageQueue> queue_;
228
  scoped_refptr<log::LogAnchorRegistry> registry_;
229
  scoped_refptr<server::Clock> clock_;
230
};
231
232
// Tests that the queue is able to track a peer when it starts tracking a peer
233
// after the initial message in the queue. In particular this creates a queue
234
// with several messages and then starts to track a peer whose watermark
235
// falls in the middle of the current messages in the queue.
236
1
TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
237
1
  queue_->Init(OpId::Min());
238
1
  queue_->SetLeaderMode(
239
1
      OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(2));
240
1
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
241
242
1
  ConsensusRequestPB request;
243
1
  ConsensusResponsePB response;
244
1
  response.set_responder_uuid(kPeerUuid);
245
246
  // Peer already has some messages, last one being index (kNumMessages / 2)
247
1
  OpIdPB last_received = MakeOpIdPbForIndex(kNumMessages / 2);
248
1
  OpIdPB last_received_current_leader = MinimumOpId();
249
250
1
  ASSERT_TRUE(UpdatePeerWatermarkToOp(
251
1
      &request, &response, last_received, last_received_current_leader));
252
253
  // Getting a new request should get all operations after 7.50
254
1
  ReplicateMsgsHolder refs;
255
1
  bool needs_remote_bootstrap;
256
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
257
1
  ASSERT_FALSE(needs_remote_bootstrap);
258
1
  ASSERT_EQ(kNumMessages / 2, request.ops_size());
259
260
1
  SetLastReceivedAndLastCommitted(
261
1
      &response, OpId::FromPB(request.ops((kNumMessages / 2) - 1).id()));
262
2
  ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response))
263
2
      << "Queue still had requests pending";
264
265
  // if we ask for a new request, it should come back empty
266
1
  refs.Reset();
267
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
268
1
  ASSERT_FALSE(needs_remote_bootstrap);
269
1
  ASSERT_EQ(0, request.ops_size());
270
1
}
271
272
// Tests that peers get the messages in pages, with the size of a page being
273
// 'consensus_max_batch_size_bytes'
274
1
TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
275
1
  queue_->Init(OpId::Min());
276
1
  queue_->SetLeaderMode(
277
1
      OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(2));
278
279
1
  const int kOpsPerRequest = 9;
280
1
  int32_t page_size_estimate = 0;
281
1
  {
282
    // Helper to estimate request size so that we can set the max batch size appropriately.
283
1
    ConsensusRequestPB page_size_estimator;
284
1
    page_size_estimator.set_caller_term(14);
285
1
    OpIdPB* committed_index = page_size_estimator.mutable_committed_op_id();
286
1
    OpIdPB* preceding_id = page_size_estimator.mutable_preceding_id();
287
288
    // The actual leader lease duration does not matter here, we just want it to be set.
289
1
    page_size_estimator.set_leader_lease_duration_ms(kDefaultLeaderLeaseDurationMs);
290
1
    page_size_estimator.set_ht_lease_expiration(1000);
291
1
    const HybridTime ht = clock_->Now();
292
1
    page_size_estimator.set_propagated_hybrid_time(ht.ToUint64());
293
1
    committed_index->CopyFrom(MinimumOpId());
294
1
    preceding_id->CopyFrom(MinimumOpId());
295
296
    // We're going to add 100 messages to the queue so we make each page fetch 9 of those,
297
    // for a total of 12 pages. The last page should have a single op.
298
1
    ReplicateMsgs replicates;
299
10
    for (int i = 0; i < kOpsPerRequest; i++) {
300
9
      replicates.push_back(CreateDummyReplicate(
301
9
          0 /* term */, 0 /* index */, ht, 0 /* payload_size */));
302
9
      page_size_estimator.mutable_ops()->AddAllocated(replicates.back().get());
303
9
    }
304
305
1
    page_size_estimate = page_size_estimator.ByteSize();
306
1
    LOG(INFO) << "page_size_estimate=" << page_size_estimate;
307
1
    page_size_estimator.mutable_ops()->ExtractSubrange(0,
308
1
                                                       page_size_estimator.ops_size(),
309
1
                                                       /* elements */ nullptr);
310
1
  }
311
312
  // Save the current flag state.
313
1
  google::FlagSaver saver;
314
1
  FLAGS_consensus_max_batch_size_bytes = page_size_estimate;
315
316
1
  ConsensusRequestPB request;
317
1
  ConsensusResponsePB response;
318
1
  response.set_responder_uuid(kPeerUuid);
319
320
1
  ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, MinimumOpId(), MinimumOpId()));
321
322
  // Append the messages after the queue is tracked. Otherwise the ops might
323
  // get evicted from the cache immediately and the requests below would
324
  // result in async log reads instead of cache hits.
325
1
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
326
327
1
  OpIdPB last;
328
12
  for (int i = 0; i < 11; i++) {
329
0
    VLOG(1) << "Making request " << i;
330
11
    ReplicateMsgsHolder refs;
331
11
    bool needs_remote_bootstrap;
332
11
    ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
333
334
11
    ASSERT_FALSE(needs_remote_bootstrap);
335
11
    LOG(INFO) << "Number of ops in request: " << request.ops_size();
336
11
    ASSERT_EQ(kOpsPerRequest, request.ops_size());
337
11
    last = request.ops(request.ops_size() - 1).id();
338
11
    SetLastReceivedAndLastCommitted(&response, OpId::FromPB(last));
339
0
    VLOG(1) << "Faking received up through " << last;
340
11
    ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
341
11
  }
342
1
  ReplicateMsgsHolder refs;
343
1
  bool needs_remote_bootstrap;
344
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
345
1
  ASSERT_FALSE(needs_remote_bootstrap);
346
1
  ASSERT_EQ(1, request.ops_size());
347
1
  last = request.ops(request.ops_size() - 1).id();
348
1
  SetLastReceivedAndLastCommitted(&response, OpId::FromPB(last));
349
1
  ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response));
350
1
}
351
352
1
TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
353
1
  queue_->Init(OpId::Min());
354
1
  queue_->SetLeaderMode(
355
1
      OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(3));
356
1
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, kNumMessages);
357
358
  // Wait for the local peer to append all messages
359
1
  WaitForLocalPeerToAckIndex(kNumMessages);
360
361
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), OpId::Min());
362
  // Since we're tracking a single peer still this should have moved the all
363
  // replicated watermark to the last op appended to the local log.
364
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), MakeOpIdForIndex(kNumMessages));
365
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min());
366
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
367
368
  // Start to track the peer after the queue has some messages in it
369
  // at a point that is halfway through the current messages in the queue.
370
1
  OpIdPB first_msg = MakeOpIdPbForIndex(kNumMessages / 2);
371
372
1
  ConsensusRequestPB request;
373
1
  ConsensusResponsePB response;
374
1
  response.set_responder_uuid(kPeerUuid);
375
376
1
  ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, first_msg, MinimumOpId()));
377
378
  // Tracking a new peer should have moved the all replicated watermark back.
379
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min());
380
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), OpId::Min());
381
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min());
382
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
383
384
1
  ReplicateMsgsHolder refs;
385
1
  bool needs_remote_bootstrap;
386
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
387
1
  ASSERT_FALSE(needs_remote_bootstrap);
388
1
  ASSERT_EQ(kNumMessages / 2, request.ops_size());
389
390
1
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 101, kNumMessages);
391
392
1
  SetLastReceivedAndLastCommitted(
393
1
      &response, OpId::FromPB(request.ops((kNumMessages / 2) - 1).id()));
394
1
  response.set_responder_term(28);
395
396
2
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response))
397
2
      << "Queue didn't have anymore requests pending";
398
399
1
  auto expected_op_id = MakeOpIdForIndex(kNumMessages);
400
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_op_id);
401
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_op_id);
402
1
  consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index);
403
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id);
404
405
  // if we ask for a new request, it should come back with the rest of the messages
406
1
  refs.Reset();
407
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
408
1
  ASSERT_FALSE(needs_remote_bootstrap);
409
1
  ASSERT_EQ(kNumMessages, request.ops_size());
410
411
1
  OpId expected = OpId::FromPB(request.ops(kNumMessages - 1).id());
412
413
1
  SetLastReceivedAndLastCommitted(&response, expected);
414
1
  response.set_responder_term(expected.term);
415
2
  ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response))
416
2
      << "Queue didn't have anymore requests pending";
417
418
1
  WaitForLocalPeerToAckIndex(expected.index);
419
420
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected);
421
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected);
422
1
  consensus_->WaitForMajorityReplicatedIndex(expected.index);
423
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected);
424
1
}
425
426
1
TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
427
1
  queue_->Init(OpId::Min());
428
1
  queue_->SetLeaderMode(
429
1
      OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(5));
430
  // Track 4 additional peers (in addition to the local peer)
431
1
  queue_->TrackPeer("peer-1");
432
1
  queue_->TrackPeer("peer-2");
433
1
  queue_->TrackPeer("peer-3");
434
1
  queue_->TrackPeer("peer-4");
435
436
  // Append 10 messages to the queue with a majority of 3 for a total of 5 peers.
437
  // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
438
1
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
439
1
  WaitForLocalPeerToAckIndex(10);
440
441
  // Since only the local log might have ACKed at this point,
442
  // the committed_index should be MinimumOpId().
443
1
  queue_->raft_pool_observers_token_->Wait();
444
1
  ASSERT_EQ(queue_->TEST_GetCommittedIndex(), OpId::Min());
445
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min());
446
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
447
448
  // NOTE: We don't need to get operations from the queue. The queue
449
  // only cares about what the peer reported as received, not what was sent.
450
1
  ConsensusResponsePB response;
451
1
  response.set_responder_term(1);
452
453
1
  OpId last_sent = MakeOpIdForIndex(5);
454
455
  // Ack the first five operations for peer-1
456
1
  response.set_responder_uuid("peer-1");
457
1
  SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
458
459
1
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
460
461
  // Committed index should be the same
462
1
  queue_->raft_pool_observers_token_->Wait();
463
1
  ASSERT_EQ(queue_->TEST_GetCommittedIndex(), OpId::Min());
464
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min());
465
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
466
467
  // Ack the first five operations for peer-2
468
1
  response.set_responder_uuid("peer-2");
469
1
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
470
471
  // A majority has now replicated up to 0.5.
472
1
  queue_->raft_pool_observers_token_->Wait();
473
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), MakeOpIdForIndex(5));
474
475
1
  string up_to_date_peer = queue_->GetUpToDatePeer();
476
1
  ASSERT_TRUE((up_to_date_peer == "peer-2") || (up_to_date_peer == "peer-1"));
477
478
  // Ack all operations for peer-3
479
1
  response.set_responder_uuid("peer-3");
480
1
  last_sent = MakeOpIdForIndex(10);
481
1
  SetLastReceivedAndLastCommitted(&response, last_sent, OpId::Min().index);
482
483
  // The committed index moved so 'more_pending' should be true so that the peer is
484
  // notified.
485
1
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
486
487
1
  up_to_date_peer.clear();
488
1
  up_to_date_peer = queue_->GetUpToDatePeer();
489
1
  ASSERT_EQ(up_to_date_peer, "peer-3");
490
491
  // Majority replicated watermark should be the same
492
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), MakeOpIdForIndex(5));
493
494
  // Ack the remaining operations for peer-4
495
1
  response.set_responder_uuid("peer-4");
496
1
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
497
498
1
  up_to_date_peer.clear();
499
1
  up_to_date_peer = queue_->GetUpToDatePeer();
500
1
  ASSERT_TRUE((up_to_date_peer == "peer-3") || (up_to_date_peer == "peer-4"));
501
502
  // Now that a majority of peers have replicated an operation in the queue's
503
  // term the committed index should advance.
504
1
  queue_->raft_pool_observers_token_->Wait();
505
1
  const auto expected_op_id = MakeOpIdForIndex(10);
506
1
  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_op_id);
507
1
  ASSERT_EQ(queue_->TEST_GetCommittedIndex(), expected_op_id);
508
1
  consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index);
509
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id);
510
1
}
511
512
// In this test we append a sequence of operations to a log
513
// and then start tracking a peer whose first required operation
514
// is before the first operation in the queue.
515
1
TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
516
1
  OpIdPB opid = MakeOpId(1, 1);
517
518
101
  for (int i = 1; i <= 100; i++) {
519
100
    ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
520
    // Roll the log every 10 ops
521
100
    if (i % 10 == 0) {
522
10
      ASSERT_OK(log_->AllocateSegmentAndRollOver());
523
10
    }
524
100
  }
525
1
  ASSERT_OK(log_->WaitUntilAllFlushed());
526
527
1
  OpIdPB queues_last_op = opid;
528
1
  queues_last_op.set_index(queues_last_op.index() - 1);
529
530
  // Now reset the queue so that we can pass a new committed index,
531
  // the last operation in the log.
532
1
  CloseAndReopenQueue();
533
534
1
  OpId committed_index(1, 100);
535
1
  const auto last_applied_op = committed_index;
536
1
  queue_->Init(committed_index);
537
1
  queue_->SetLeaderMode(
538
1
      committed_index, committed_index.term, last_applied_op, BuildRaftConfigPBForTests(3));
539
540
1
  ConsensusRequestPB request;
541
1
  ConsensusResponsePB response;
542
1
  response.set_responder_uuid(kPeerUuid);
543
544
  // The peer will actually be behind the first operation in the queue
545
  // in this case about 50 operations before.
546
1
  OpIdPB peers_last_op;
547
1
  peers_last_op.set_term(1);
548
1
  peers_last_op.set_index(50);
549
550
  // Now we start tracking the peer, this negotiation round should let
551
  // the queue know how far along the peer is.
552
  // The queue should reply that there are more messages for the peer.
553
1
  ASSERT_TRUE(UpdatePeerWatermarkToOp(
554
1
      &request, &response, peers_last_op, MinimumOpId()));
555
556
  // When we get another request for the peer the queue should load
557
  // the missing operations.
558
1
  ReplicateMsgsHolder refs;
559
1
  bool needs_remote_bootstrap;
560
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
561
1
  ASSERT_FALSE(needs_remote_bootstrap);
562
1
  ASSERT_EQ(request.ops_size(), 50);
563
1
}
564
565
// This tests that the queue is able to handle operation overwriting, i.e. when a
566
// newly tracked peer reports the last received operations as some operation that
567
// doesn't exist in the leader's log. In particular it tests the case where a
568
// new leader starts at term 2 with only a part of the operations of the previous
569
// leader having been committed.
570
1
TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
571
1
  OpIdPB opid = MakeOpId(1, 1);
572
  // Append 10 messages in term 1 to the log.
573
11
  for (int i = 1; i <= 10; i++) {
574
10
    ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
575
    // Roll the log every 3 ops
576
10
    if (i % 3 == 0) {
577
3
      ASSERT_OK(log_->AllocateSegmentAndRollOver());
578
3
    }
579
10
  }
580
581
1
  opid = MakeOpId(2, 11);
582
  // Now append 10 more messages in term 2.
583
11
  for (int i = 11; i <= 20; i++) {
584
10
    ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
585
    // Roll the log every 3 ops
586
10
    if (i % 3 == 0) {
587
3
      ASSERT_OK(log_->AllocateSegmentAndRollOver());
588
3
    }
589
10
  }
590
591
592
  // Now reset the queue so that we can pass a new committed index,
593
  // op, 2.15.
594
1
  CloseAndReopenQueue();
595
596
1
  const OpId committed_op_id(2, 15);
597
1
  queue_->Init(OpId(2, 20));
598
1
  queue_->SetLeaderMode(
599
1
      committed_op_id, committed_op_id.term, committed_op_id,
600
1
      BuildRaftConfigPBForTests(3));
601
602
  // Now get a request for a simulated old leader, which contains more operations
603
  // in term 1 than the new leader has.
604
  // The queue should realize that the old leader's last received doesn't exist
605
  // and send it operations starting at the old leader's committed index.
606
1
  ConsensusRequestPB request;
607
1
  ConsensusResponsePB response;
608
1
  response.set_responder_uuid(kPeerUuid);
609
610
1
  queue_->TrackPeer(kPeerUuid);
611
612
  // Ask for a request. The queue assumes the peer is up-to-date so
613
  // this should contain no operations.
614
1
  ReplicateMsgsHolder refs;
615
1
  bool needs_remote_bootstrap;
616
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
617
1
  ASSERT_FALSE(needs_remote_bootstrap);
618
1
  ASSERT_EQ(request.ops_size(), 0);
619
1
  ASSERT_OPID_EQ(request.preceding_id(), MakeOpId(2, 20));
620
1
  ASSERT_EQ(OpId::FromPB(request.committed_op_id()), committed_op_id);
621
622
  // The old leader was still in term 1 but it increased its term with our request.
623
1
  response.set_responder_term(2);
624
625
  // We emulate that the old leader had 25 total operations in Term 1 (15 more than we knew about)
626
  // which were never committed, and that its last known committed index was 5.
627
1
  ConsensusStatusPB* status = response.mutable_status();
628
1
  status->mutable_last_received()->CopyFrom(MakeOpId(1, 25));
629
1
  status->mutable_last_received_current_leader()->CopyFrom(MinimumOpId());
630
1
  status->set_last_committed_idx(5);
631
1
  ConsensusErrorPB* error = status->mutable_error();
632
1
  error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
633
1
  StatusToPB(STATUS(IllegalState, "LMP failed."), error->mutable_status());
634
635
  // The queue should reply that there are more operations pending.
636
1
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
637
1
  request.Clear();
638
639
  // We're waiting for two nodes. The all committed watermark should be
640
  // 0.0 since we haven't had a successful exchange with the 'remote' peer.
641
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min());
642
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), committed_op_id);
643
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
644
645
  // Test that even when a correct peer responds (meaning we actually get to execute
646
  // watermark advancement) we still have the same all-replicated watermark.
647
1
  auto replicate = CreateDummyReplicate(2, 21, clock_->Now(), 0);
648
1
  ASSERT_OK(queue_->TEST_AppendOperation(replicate));
649
1
  WaitForLocalPeerToAckIndex(21);
650
651
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min());
652
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), committed_op_id);
653
1
  ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min());
654
655
  // Generate another request for the remote peer, which should include
656
  // all of the ops since the peer's last-known committed index.
657
1
  refs.Reset();
658
1
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
659
1
  ASSERT_FALSE(needs_remote_bootstrap);
660
1
  ASSERT_OPID_EQ(MakeOpId(1, 5), request.preceding_id());
661
1
  ASSERT_EQ(16, request.ops_size());
662
663
  // Now when we respond the watermarks should advance.
664
1
  response.mutable_status()->clear_error();
665
1
  SetLastReceivedAndLastCommitted(&response, OpId(2, 21), 5);
666
1
  queue_->ResponseFromPeer(response.responder_uuid(), response);
667
668
  // Now the watermark should have advanced.
669
1
  const auto expected_op_id = OpId(2, 21);
670
1
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_op_id);
671
1
  consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index);
672
1
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id);
673
1
}
674
675
// Test for a bug where we wouldn't move any watermark back, when overwriting
676
// operations, which would cause a check failure on the write immediately
677
// following the overwriting write.
678
1
TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
  // Source of the timestamps handed to AppendOperations() below.
  RestartSafeCoarseMonoClock restart_safe_coarse_mono_clock;
  queue_->Init(OpId::Min());
  queue_->SetNonLeaderMode();

  // Append a bunch of messages and make sure they all reached the log; the last one is 1.10.
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
  ASSERT_OK(log_->WaitUntilAllFlushed());
  ASSERT_EQ(queue_->TEST_GetLastAppended(), OpId(1, 10));

  // Now rewrite some of the operations (index 5 is re-appended as 2.5) and wait for the log to
  // append.
  ASSERT_OK(queue_->AppendOperations(
      { CreateDummyReplicate(2, 5, clock_->Now(), 0) }, yb::OpId() /* committed_op_id */,
      restart_safe_coarse_mono_clock.Now()));

  // Wait for the operation to be in the log.
  log_->WaitForSafeOpIdToApply(yb::OpId(2, 5));

  // Without the fix the following append would trigger a check failure
  // in log cache.
  ASSERT_OK(queue_->AppendOperations(
      { CreateDummyReplicate(2, 6, clock_->Now(), 0) }, yb::OpId() /* committed_op_id */,
      restart_safe_coarse_mono_clock.Now()));

  // Wait for the operation to be in the log.
  log_->WaitForSafeOpIdToApply(yb::OpId(2, 6));

  // Now the last appended should have moved backward (from index 10 to index 6). The description
  // names the value that is actually polled so that a timeout message is not misleading.
  ASSERT_OK(WaitFor(
      [this]() { return queue_->TEST_GetLastAppended() == OpId(2, 6); },
      1s, "Last appended op id to move back to 2.6", 10ms));
}
708
709
// Tests that we're advancing the watermarks properly and only when the peer
710
// has a prefix of our log. This also tests for a specific bug that we had. Here's
711
// the scenario:
712
// Peer would report:
713
//   - last received 75.49
714
//   - last committed 72.31
715
//
716
// Queue has messages:
717
// 72.31-72.45
718
// 73.46-73.51
719
// 76.52-76.53
720
//
721
// The queue has more messages than the peer, but the peer has messages
722
// that the queue doesn't and which will be overwritten.
723
//
724
// In the first round of negotiation the peer would report LMP mismatch.
725
// In the second round the queue would try to send it messages starting at 75.49
726
// but since that message didn't exist in the queue's log it would instead send
727
// messages starting at 72.31. However, because the batches were big it was only
728
// able to send a few messages (e.g. up to 72.40).
729
//
730
// Since in this last exchange everything went ok (the peer still doesn't know
731
// that messages will be overwritten later), the queue would mark the exchange
732
// as successful and the peer's last received would be taken into account when
733
// calculating watermarks, which was incorrect.
734
1
TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) {
  // Limit the batch size; with 1024-byte payloads the assertions below expect the ops to be
  // handed to the peer in pages of 9.
  FLAGS_consensus_max_batch_size_bytes = 1024 * 10;

  queue_->Init(OpId(72, 30));
  queue_->SetLeaderMode(OpId(72, 31), 76, OpId(72, 31), BuildRaftConfigPBForTests(3));

  ConsensusRequestPB request;
  ConsensusResponsePB response;

  // We expect the majority replicated watermark to start at the committed index.
  OpId expected_majority_replicated(72, 31);
  // We expect the all replicated watermark to be reset when we track a new peer.
  OpId expected_all_replicated = OpId::Min();
  auto expected_last_applied = expected_majority_replicated;

  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated);
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied);

  ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31));

  // Build the queue contents: 72.31-72.45, 73.46-73.51 and 76.52-76.53.
  for (int i = 31; i <= 53; i++) {
    const int term = i <= 45 ? 72 : (i <= 51 ? 73 : 76);
    ASSERT_OK(AppendReplicateMsg(term, i, 1024));
  }

  WaitForLocalPeerToAckIndex(53);

  // When we get operations for this peer we should get them starting immediately after
  // the committed index, for a total of 9 operations.
  ReplicateMsgsHolder refs;
  // Initialized so that the assertions never read an indeterminate value.
  bool needs_remote_bootstrap = false;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
  ASSERT_FALSE(needs_remote_bootstrap);
  ASSERT_EQ(request.ops_size(), 9);
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 32));
  const OpIdPB* last_op = &request.ops(request.ops_size() - 1).id();

  // When the peer acks that it received an operation that is not in our current
  // term, it gets ignored in terms of watermark advancement.
  SetLastReceivedAndLastCommitted(&response, OpId(75, 49), OpId::FromPB(*last_op), 31);
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));

  // We've sent (and received an ack) up to 72.40 from the remote peer
  expected_majority_replicated = OpId(72, 40);
  expected_all_replicated = expected_majority_replicated;
  expected_last_applied = expected_majority_replicated;

  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated);
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated);
  consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied);

  // Another request for this peer should get another page of messages. Still not
  // on the queue's term (and thus without advancing watermarks).
  refs.Reset();
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
  ASSERT_FALSE(needs_remote_bootstrap);
  ASSERT_EQ(request.ops_size(), 9);
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 41));
  last_op = &request.ops(request.ops_size() - 1).id();

  SetLastReceivedAndLastCommitted(&response, OpId(75, 49), OpId::FromPB(*last_op), 31);
  queue_->ResponseFromPeer(response.responder_uuid(), response);

  // We've now sent (and received an ack) up to 73.49
  expected_majority_replicated = OpId(73, 49);
  expected_all_replicated = expected_majority_replicated;
  expected_last_applied = expected_majority_replicated;

  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated);
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated);
  consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied);

  // The last page of request should overwrite the peer's operations and the
  // response should finally advance the watermarks.
  refs.Reset();
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
  ASSERT_FALSE(needs_remote_bootstrap);
  ASSERT_EQ(request.ops_size(), 4);
  ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(73, 50));

  // We're done, both watermarks should be at the end.
  expected_majority_replicated = OpId(76, 53);
  expected_all_replicated = expected_majority_replicated;
  expected_last_applied = expected_majority_replicated;

  SetLastReceivedAndLastCommitted(&response, expected_majority_replicated,
                                  expected_majority_replicated, 31);
  queue_->ResponseFromPeer(response.responder_uuid(), response);

  ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated);
  ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated);
  consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied);

  // Detach every op from the request without deleting it.
  // NOTE(review): presumably the messages are owned elsewhere (e.g. via `refs`) - confirm.
  request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr);
}
840
841
// Test that remote bootstrap is triggered when a "tablet not found" error occurs.
842
1
TEST_F(ConsensusQueueTest, TestTriggerRemoteBootstrapIfTabletNotFound) {
  queue_->Init(OpId::Min());
  queue_->SetLeaderMode(
      OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(3));
  AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);

  ConsensusRequestPB request;
  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);
  queue_->TrackPeer(kPeerUuid);

  // Create request for new peer.
  ReplicateMsgsHolder refs;
  // Initialized so that the assertions never read an indeterminate value.
  bool needs_remote_bootstrap = false;
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
  ASSERT_FALSE(needs_remote_bootstrap);

  // Peer responds with tablet not found.
  response.mutable_error()->set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND);
  StatusToPB(STATUS(NotFound, "No such tablet"), response.mutable_error()->mutable_status());

  // If the peer needs remote bootstrap, more_pending should be set to true.
  ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, response));

  // On the next request, we should find out that the queue wants us to remotely bootstrap.
  request.Clear();
  refs.Reset();
  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap));
  ASSERT_TRUE(needs_remote_bootstrap);

  StartRemoteBootstrapRequestPB rb_req;
  ASSERT_OK(queue_->GetRemoteBootstrapRequestForPeer(kPeerUuid, &rb_req));

  // The generated request must be complete, target the test tablet and name the leader as the
  // bootstrap source, including the leader's first private address.
  ASSERT_TRUE(rb_req.IsInitialized()) << rb_req.ShortDebugString();
  ASSERT_EQ(kTestTablet, rb_req.tablet_id());
  ASSERT_EQ(kLeaderUuid, rb_req.bootstrap_peer_uuid());
  ASSERT_EQ(FakeRaftPeerPB(kLeaderUuid).last_known_private_addr()[0].ShortDebugString(),
            rb_req.source_private_addr()[0].ShortDebugString());
}
881
882
// Tests that ReadReplicatedMessagesForCDC() only reads messages until the last known
883
// committed index.
884
1
TEST_F(ConsensusQueueTest, TestReadReplicatedMessagesForCDC) {
  const auto start_op_id = MakeOpIdForIndex(3); // Starting after the normal first index.
  queue_->Init(start_op_id);
  queue_->SetLeaderMode(
      start_op_id, start_op_id.term, start_op_id, BuildRaftConfigPBForTests(2));
  queue_->TrackPeer(kPeerUuid);

  AppendReplicateMessagesToQueue(queue_.get(), clock_, start_op_id.index, kNumMessages);

  // Wait for the local peer to append all messages.
  WaitForLocalPeerToAckIndex(kNumMessages);

  // Since only the local log might have ACKed at this point, the committed index and the last
  // applied op id should still be at the op id the queue was started with.
  queue_->raft_pool_observers_token_->Wait();
  ASSERT_EQ(queue_->TEST_GetCommittedIndex(), start_op_id);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), start_op_id);

  ConsensusResponsePB response;
  response.set_responder_uuid(kPeerUuid);

  const int last_committed_index = kNumMessages - 20;
  // Ack last_committed_index messages.
  SetLastReceivedAndLastCommitted(&response, MakeOpIdForIndex(last_committed_index));
  ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response));
  queue_->raft_pool_observers_token_->Wait();
  const auto expected_op_id = MakeOpIdForIndex(last_committed_index);
  ASSERT_EQ(queue_->TEST_GetCommittedIndex(), expected_op_id);
  consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index);
  ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id);

  // Read from the start_op_id: only the messages after it, up to the committed index, are
  // expected back.
  auto read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(start_op_id));
  ASSERT_EQ(last_committed_index - start_op_id.index, read_result.messages.size());

  // Start reading from 0.0 and ensure that we get the first known OpID.
  read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(0)));
  ASSERT_EQ(last_committed_index - start_op_id.index, read_result.messages.size());

  // Read from some index > 0
  const int start = 10;
  read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(start)));
  ASSERT_EQ(last_committed_index - start, read_result.messages.size());
}
928
929
}  // namespace consensus
930
}  // namespace yb