YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers-test.cc
Count | Source
      | // Licensed to the Apache Software Foundation (ASF) under one
      | // or more contributor license agreements.  See the NOTICE file
      | // distributed with this work for additional information
      | // regarding copyright ownership.  The ASF licenses this file
      | // to you under the Apache License, Version 2.0 (the
      | // "License"); you may not use this file except in compliance
      | // with the License.  You may obtain a copy of the License at
      | //
      | //   http://www.apache.org/licenses/LICENSE-2.0
      | //
      | // Unless required by applicable law or agreed to in writing,
      | // software distributed under the License is distributed on an
      | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
      | // KIND, either express or implied.  See the License for the
      | // specific language governing permissions and limitations
      | // under the License.
      | //
      | // The following only applies to changes made to this file as part of YugaByte development.
      | //
      | // Portions Copyright (c) YugaByte, Inc.
      | //
      | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      | // in compliance with the License.  You may obtain a copy of the License at
      | //
      | // http://www.apache.org/licenses/LICENSE-2.0
      | //
      | // Unless required by applicable law or agreed to in writing, software distributed under the License
      | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
      | // or implied.  See the License for the specific language governing permissions and limitations
      | // under the License.
      | //
      |
      | #include <gtest/gtest.h>
      |
      | #include "yb/common/schema.h"
      | #include "yb/common/wire_protocol-test-util.h"
      |
      | #include "yb/consensus/consensus-test-util.h"
      | #include "yb/consensus/log.h"
      | #include "yb/consensus/log_util.h"
      | #include "yb/consensus/opid_util.h"
      |
      | #include "yb/fs/fs_manager.h"
      |
      | #include "yb/rpc/messenger.h"
      |
      | #include "yb/server/hybrid_clock.h"
      |
      | #include "yb/util/logging.h"
      | #include "yb/util/metrics.h"
      | #include "yb/util/opid.h"
      | #include "yb/util/scope_exit.h"
      | #include "yb/util/status_log.h"
      | #include "yb/util/test_macros.h"
      | #include "yb/util/test_util.h"
      | #include "yb/util/threadpool.h"
      |
      | using namespace std::chrono_literals;
      |
      | METRIC_DECLARE_entity(tablet);
      |
      | namespace yb {
      | namespace consensus {
      |
      | using log::Log;
      | using log::LogOptions;
      | using log::LogAnchorRegistry;
      | using rpc::Messenger;
      | using rpc::MessengerBuilder;
      | using std::shared_ptr;
      | using std::unique_ptr;
      |
      | const char* kTableId = "test-peers-table";
      | const char* kTabletId = "test-peers-tablet";
      | const char* kLeaderUuid = "peer-0";
      | const char* kFollowerUuid = "peer-1";
      |
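      | // Note on the arithmetic used throughout these tests: the fixture's SetUp puts
      | // the queue in leader mode with a three-peer config (BuildRaftConfigPBForTests(3)),
      | // and the leader's own local log counts toward the majority. So a majority is
      | // two acks: the local append plus a single responding remote peer.
      |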
      | class ConsensusPeersTest : public YBTest {
      |  public:
      |   ConsensusPeersTest()
      |       : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")),
    5 |         schema_(GetSimpleTestSchema()) {
    5 |   }
      |
    5 |   void SetUp() override {
    5 |     YBTest::SetUp();
    5 |     MessengerBuilder bld("test");
    5 |     messenger_ = ASSERT_RESULT(bld.Build());
    5 |     ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
    5 |     raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
    5 |     ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_));
    5 |     fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
      |
    5 |     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
    5 |     ASSERT_OK(fs_manager_->Open());
    5 |     ASSERT_OK(Log::Open(options_,
    5 |                         kTabletId,
    5 |                         fs_manager_->GetFirstTabletWalDirOrDie(kTableId, kTabletId),
    5 |                         fs_manager_->uuid(),
    5 |                         schema_,
    5 |                         0, // schema_version
    5 |                         nullptr, // table_metric_entity
    5 |                         nullptr, // tablet_metric_entity
    5 |                         log_thread_pool_.get(),
    5 |                         log_thread_pool_.get(),
    5 |                         std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
    5 |                         &log_));
    5 |     clock_.reset(new server::HybridClock());
    5 |     ASSERT_OK(clock_->Init());
      |
    5 |     consensus_.reset(new TestRaftConsensusQueueIface());
    5 |     message_queue_.reset(new PeerMessageQueue(
    5 |         metric_entity_,
    5 |         log_.get(),
    5 |         nullptr /* server_tracker */,
    5 |         nullptr /* parent_tracker */,
    5 |         FakeRaftPeerPB(kLeaderUuid),
    5 |         kTabletId,
    5 |         clock_,
    5 |         nullptr /* consensus_context */,
    5 |         raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
    5 |     message_queue_->RegisterObserver(consensus_.get());
      |
    5 |     message_queue_->Init(OpId::Min());
    5 |     message_queue_->SetLeaderMode(OpId::Min(),
    5 |                                   OpId().term,
    5 |                                   OpId::Min(),
    5 |                                   BuildRaftConfigPBForTests(3));
    5 |   }
      |
    5 |   void TearDown() override {
    5 |     ASSERT_OK(log_->WaitUntilAllFlushed());
    5 |     log_thread_pool_->Shutdown();
    5 |     raft_pool_->Shutdown();
    5 |     messenger_->Shutdown();
    5 |   }
      |
      |   DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
      |       const string& peer_name,
    5 |       std::shared_ptr<Peer>* peer) {
    5 |     RaftPeerPB peer_pb;
    5 |     peer_pb.set_permanent_uuid(peer_name);
    5 |     auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
    5 |         raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
    5 |     *peer = CHECK_RESULT(Peer::NewRemotePeer(
    5 |         peer_pb, kTabletId, kLeaderUuid, PeerProxyPtr(proxy_ptr), message_queue_.get(),
    5 |         nullptr /* multi raft batcher */, raft_pool_token_.get(),
    5 |         nullptr /* consensus */, messenger_.get()));
    5 |     return proxy_ptr;
    5 |   }
      |
    1 |   void CheckLastLogEntry(int64_t term, int64_t index) {
    1 |     ASSERT_EQ(log_->GetLatestEntryOpId(), OpId(term, index));
    1 |   }
      |
      |   void CheckLastRemoteEntry(
    2 |       DelayablePeerProxy<NoOpTestPeerProxy>* proxy, int64_t term, int64_t index) {
    2 |     ASSERT_EQ(OpId::FromPB(proxy->proxy()->last_received()), OpId(term, index));
    2 |   }
      |
      |  protected:
      |   unique_ptr<ThreadPool> raft_pool_;
      |   std::unique_ptr<TestRaftConsensusQueueIface> consensus_;
      |   MetricRegistry metric_registry_;
      |   scoped_refptr<MetricEntity> metric_entity_;
      |   std::unique_ptr<FsManager> fs_manager_;
      |   unique_ptr<ThreadPool> log_thread_pool_;
      |   scoped_refptr<Log> log_;
      |   std::unique_ptr<PeerMessageQueue> message_queue_;
      |   const Schema schema_;
      |   LogOptions options_;
      |   unique_ptr<ThreadPoolToken> raft_pool_token_;
      |   scoped_refptr<server::Clock> clock_;
      |   std::unique_ptr<Messenger> messenger_;
      | };
      |
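      | // The tests below all follow roughly the same flow, built from the fixture
      | // pieces above (names as defined in this file):
      | //
      | //   std::shared_ptr<Peer> peer;
      | //   auto* proxy = NewRemotePeer("peer-1", &peer);                        // fake remote peer
      | //   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, n);  // enqueue n ops
      | //   ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kNonEmptyOnly));   // start replicating
      | //   consensus_->WaitForMajorityReplicatedIndex(n);                       // wait for majority
      | //   peer->Close();                                                       // close before destruction
      |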
      | // Tests that a remote peer is correctly built and tracked
      | // by the message queue.
      | // After the operations are considered done, the proxy (which
      | // simulates the other endpoint) should reflect the replicated
      | // messages.
    1 | TEST_F(ConsensusPeersTest, TestRemotePeer) {
      |   // We use a majority size of 2 since we make one fake remote peer
      |   // in addition to our real local log.
      |
    1 |   std::shared_ptr<Peer> remote_peer;
    1 |   auto se = ScopeExit([&remote_peer] {
      |     // This guarantees that the Peer object doesn't get destroyed if there is a pending request.
    1 |     remote_peer->Close();
    1 |   });
      |
    1 |   DelayablePeerProxy<NoOpTestPeerProxy>* proxy = NewRemotePeer(kFollowerUuid, &remote_peer);
      |
      |   // Append a bunch of messages to the queue.
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 20);
      |
      |   // The above append ends up appending messages in term 2, so we update the peer's term to match.
    1 |   remote_peer->SetTermForTest(2);
      |
      |   // Signal the peer that there are requests pending.
    1 |   ASSERT_OK(remote_peer->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
      |
      |   // Now wait on the status of the last operation. This will complete once the peer has logged all
      |   // requests.
    1 |   consensus_->WaitForMajorityReplicatedIndex(20);
      |   // Verify that the replicated watermark corresponds to the last replicated message.
    1 |   CheckLastRemoteEntry(proxy, 2, 20);
    1 | }
      |
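      | // The next two tests rely on DelayablePeerProxy (defined in
      | // consensus-test-util.h): after DelayResponse() is called, the proxy holds back
      | // its next response until Respond(TestPeerProxy::kUpdate) releases it, so the
      | // tests can control exactly when a remote peer acks.
      |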
    1 | TEST_F(ConsensusPeersTest, TestLocalAppendAndRemotePeerDelay) {
      |   // Create a set of remote peers.
    1 |   std::shared_ptr<Peer> remote_peer1;
    1 |   NewRemotePeer("peer-1", &remote_peer1);
      |
    1 |   std::shared_ptr<Peer> remote_peer2;
    1 |   DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
    1 |       NewRemotePeer("peer-2", &remote_peer2);
      |
    1 |   auto se = ScopeExit([&remote_peer1, &remote_peer2] {
      |     // This guarantees that the Peer objects don't get destroyed if there is a pending request.
    1 |     remote_peer1->Close();
    1 |     remote_peer2->Close();
    1 |   });
      |
      |   // Slow down the local log append and delay the response from the second remote peer.
    1 |   const auto kAppendDelayTime = 1s;
    1 |   log_->TEST_SetSleepDuration(kAppendDelayTime);
    1 |   remote_peer2_proxy->DelayResponse();
    1 |   auto se2 = ScopeExit([this, &remote_peer2_proxy] {
    1 |     log_->TEST_SetSleepDuration(0s);
    1 |     remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
    1 |   });
      |
      |   // Append one message to the queue.
    1 |   const auto start_time = MonoTime::Now();
    1 |   OpIdPB first = MakeOpId(0, 1);
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, first.index(), 1);
      |
    1 |   ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
    1 |   ASSERT_OK(remote_peer2->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
      |
      |   // Replication should be delayed, because of the slowed local append and the delayed response.
    1 |   consensus_->WaitForMajorityReplicatedIndex(first.index());
    1 |   const auto elapsed_time = MonoTime::Now() - start_time;
    1 |   LOG(INFO) << "Replication elapsed time: " << elapsed_time;
      |   // Replication should take at least as much time as it takes the local peer to append, because
      |   // there is only one remote peer that is responding.
    1 |   ASSERT_GE(elapsed_time, kAppendDelayTime);
    1 | }
      |
    1 | TEST_F(ConsensusPeersTest, TestRemotePeers) {
      |   // Create a set of remote peers.
    1 |   std::shared_ptr<Peer> remote_peer1;
    1 |   DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer1_proxy =
    1 |       NewRemotePeer("peer-1", &remote_peer1);
      |
    1 |   std::shared_ptr<Peer> remote_peer2;
    1 |   DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
    1 |       NewRemotePeer("peer-2", &remote_peer2);
      |
    1 |   auto se = ScopeExit([&remote_peer1, &remote_peer2] {
      |     // This guarantees that the Peer objects don't get destroyed if there is a pending request.
    1 |     remote_peer1->Close();
    1 |     remote_peer2->Close();
    1 |   });
      |
      |   // Delay the response from the second remote peer.
    1 |   remote_peer2_proxy->DelayResponse();
      |
      |   // Append one message to the queue.
    1 |   OpId first(0, 1);
      |
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, first.index, 1);
      |
    1 |   ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
    1 |   ASSERT_OK(remote_peer2->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
      |
      |   // Now wait for the message to be replicated; this should succeed since
      |   // majority = 2 and only one peer was delayed. The majority is made up
      |   // of remote_peer1 and the local log.
    1 |   consensus_->WaitForMajorityReplicatedIndex(first.index);
      |
    1 |   SCOPED_TRACE(Format(
    1 |       "Written to log locally: $0, Received by peer1: {$1}, by peer2: {$2}",
    1 |       log_->GetLatestEntryOpId(),
    1 |       remote_peer1_proxy->proxy()->last_received(),
    1 |       remote_peer2_proxy->proxy()->last_received()));
      |
    1 |   CheckLastLogEntry(first.term, first.index);
    1 |   CheckLastRemoteEntry(remote_peer1_proxy, first.term, first.index);
      |
    1 |   remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
      |   // Wait until all peers have replicated the message; otherwise, when we add
      |   // the next one, remote_peer2 might find the next message in the queue and
      |   // replicate it, which is not what we want.
    2 |   while (message_queue_->TEST_GetAllReplicatedIndex() != first) {
    1 |     std::this_thread::sleep_for(1ms);
    1 |   }
      |
      |   // Now append another message to the queue.
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 2, 1);
      |
      |   // We should not see it replicated, even after 10ms,
      |   // since only the local peer replicates the message.
    1 |   SleepFor(MonoDelta::FromMilliseconds(10));
    1 |   ASSERT_FALSE(consensus_->IsMajorityReplicated(2));
      |
      |   // Signal one of the two remote peers.
    1 |   ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
      |   // We should now be able to wait for it to replicate, since two peers (a majority)
      |   // have replicated the message.
    1 |   consensus_->WaitForMajorityReplicatedIndex(2);
    1 | }
      |
      | // Regression test for KUDU-699: even if a peer isn't making progress,
      | // and thus always has data pending, we should be able to close the peer.
    1 | TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
    1 |   auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
    1 |   auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
    1 |       FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, PeerProxyPtr(mock_proxy),
    1 |       message_queue_.get(), nullptr /* multi raft batcher */,
    1 |       raft_pool_token_.get(), nullptr /* consensus */,
    1 |       messenger_.get()));
      |
      |   // Make the peer respond without making any progress -- it always returns
      |   // that it has only replicated op 0.0. When we see the response, we always
      |   // decide that more data is pending, and we want to send another request.
    1 |   ConsensusResponsePB peer_resp;
    1 |   peer_resp.set_responder_uuid(kFollowerUuid);
    1 |   peer_resp.set_responder_term(0);
    1 |   peer_resp.mutable_status()->mutable_last_received()->CopyFrom(
    1 |       MakeOpId(0, 0));
    1 |   peer_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom(
    1 |       MakeOpId(0, 0));
    1 |   peer_resp.mutable_status()->set_last_committed_idx(0);
      |
    1 |   mock_proxy->set_update_response(peer_resp);
      |
      |   // Add an op to the queue and start sending requests to the peer.
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
    1 |   ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kAlwaysSend));
      |
      |   // We should be able to close the peer even though it has more data pending.
    1 |   peer->Close();
    1 | }
      |
    1 | TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
    1 |   auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
    1 |   auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
    1 |       FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, PeerProxyPtr(mock_proxy),
    1 |       message_queue_.get(), nullptr /* multi raft batcher */, raft_pool_token_.get(),
    1 |       nullptr /* consensus */,
    1 |       messenger_.get()));
      |
    1 |   auto se = ScopeExit([&peer] {
      |     // This guarantees that the Peer object doesn't get destroyed if there is a pending request.
    1 |     peer->Close();
    1 |   });
      |
      |   // The initial response has to be successful -- otherwise we'll consider the peer "new" and
      |   // only send heartbeat RPCs.
    1 |   ConsensusResponsePB initial_resp;
    1 |   initial_resp.set_responder_uuid(kFollowerUuid);
    1 |   initial_resp.set_responder_term(0);
    1 |   initial_resp.mutable_status()->mutable_last_received()->CopyFrom(
    1 |       MakeOpId(1, 1));
    1 |   initial_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom(
    1 |       MakeOpId(1, 1));
    1 |   initial_resp.mutable_status()->set_last_committed_idx(0);
    1 |   mock_proxy->set_update_response(initial_resp);
      |
    1 |   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
    1 |   LOG(INFO) << "Initial SignalRequest";
    1 |   ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kAlwaysSend));
    1 |   LOG(INFO) << "Initial SignalRequest done";
      |
      |   // Now wait for the message to be replicated; this should succeed since the local (leader)
      |   // peer always acks and the follower also acked this time.
    1 |   consensus_->WaitForMajorityReplicatedIndex(1);
    1 |   LOG(INFO) << "Message replicated, setting error response";
      |
      |   // Set up the peer to respond with an error.
    1 |   ConsensusResponsePB error_resp;
    1 |   error_resp.mutable_error()->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
    1 |   StatusToPB(STATUS(NotFound, "fake error"), error_resp.mutable_error()->mutable_status());
    1 |   mock_proxy->set_update_response(error_resp);
      |
      |   // Up to this point we might have sent a lot of updates: we get the response from the fake peer
      |   // that it accepted our entry, we consider it replicated, and we are trying to tell the fake peer
      |   // about that, but it replies with the same canned response without bumping up the committed
      |   // index. We can keep spinning in this loop a few hundred times until we replace the response
      |   // with an error. At the end of the test we should only consider the UpdateAsync calls made
      |   // after this point.
    1 |   int initial_update_count = mock_proxy->update_count();
      |
      |   // Add a bunch of messages to the queue.
  100 |   for (int i = 2; i <= 100; i++) {
   99 |     AppendReplicateMessagesToQueue(message_queue_.get(), clock_, i, /* count */ 1);
   99 |     ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kNonEmptyOnly));
      |     // Sleep for a longer time during the first iteration so we have a higher chance of handling
      |     // the response and incrementing failed_attempts_.
   99 |     std::this_thread::sleep_for(i == 2 ? 100ms : 2ms);
   99 |   }
      |
    1 |   LOG(INFO) << EXPR_VALUE_FOR_LOG(mock_proxy->update_count());
    1 |   LOG(INFO) << EXPR_VALUE_FOR_LOG(initial_update_count);
      |   // Check that we didn't attempt to send one UpdateConsensus call per
      |   // write. 100 writes might have taken a second or two, though, so it's
      |   // OK to have called UpdateConsensus() a few times due to regularly
      |   // scheduled heartbeats.
    1 |   ASSERT_LT(mock_proxy->update_count() - initial_update_count, 5);
    1 | }
      |
      | }  // namespace consensus
      | }  // namespace yb