YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_peer-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <glog/logging.h>
34
#include <gtest/gtest.h>
35
36
#include "yb/common/hybrid_time.h"
37
#include "yb/common/wire_protocol-test-util.h"
38
#include "yb/common/wire_protocol.h"
39
40
#include "yb/consensus/consensus.h"
41
#include "yb/consensus/consensus_fwd.h"
42
#include "yb/consensus/consensus_meta.h"
43
#include "yb/consensus/log.h"
44
#include "yb/consensus/log_anchor_registry.h"
45
#include "yb/consensus/log_reader.h"
46
#include "yb/consensus/log_util.h"
47
#include "yb/consensus/metadata.pb.h"
48
#include "yb/consensus/opid_util.h"
49
#include "yb/consensus/multi_raft_batcher.h"
50
#include "yb/consensus/state_change_context.h"
51
52
#include "yb/gutil/bind.h"
53
#include "yb/gutil/macros.h"
54
55
#include "yb/rpc/messenger.h"
56
#include "yb/rpc/proxy.h"
57
58
#include "yb/server/clock.h"
59
#include "yb/server/logical_clock.h"
60
61
#include "yb/tablet/tablet-test-util.h"
62
#include "yb/tablet/tablet.h"
63
#include "yb/tablet/tablet_metadata.h"
64
#include "yb/tablet/tablet_peer.h"
65
#include "yb/tablet/write_query.h"
66
67
#include "yb/tserver/tserver.pb.h"
68
69
#include "yb/util/metrics.h"
70
#include "yb/util/result.h"
71
#include "yb/util/status_log.h"
72
#include "yb/util/test_macros.h"
73
#include "yb/util/threadpool.h"
74
75
METRIC_DECLARE_entity(table);
76
METRIC_DECLARE_entity(tablet);
77
78
DECLARE_int32(log_min_seconds_to_retain);
79
80
DECLARE_bool(quick_leader_election_on_create);
81
82
namespace yb {
83
namespace tablet {
84
85
using consensus::Consensus;
86
using consensus::ConsensusBootstrapInfo;
87
using consensus::ConsensusMetadata;
88
using consensus::MakeOpId;
89
using consensus::MinimumOpId;
90
using consensus::OpIdEquals;
91
using consensus::RaftPeerPB;
92
using consensus::WRITE_OP;
93
using docdb::KeyValueWriteBatchPB;
94
using log::Log;
95
using log::LogAnchorRegistry;
96
using log::LogOptions;
97
using server::Clock;
98
using server::LogicalClock;
99
using std::shared_ptr;
100
using std::string;
101
using strings::Substitute;
102
using tserver::WriteRequestPB;
103
using tserver::WriteResponsePB;
104
105
4
// Schema used by every test in this file: a single INT32 column named "key".
// NOTE(review): the trailing `1` is presumably the number of key columns — confirm against Schema.
static Schema GetTestSchema() {
  const ColumnSchema key_column("key", INT32);
  return Schema({ key_column }, 1);
}
108
109
class TabletPeerTest : public YBTabletTest {
110
 public:
111
  TabletPeerTest()
112
    : YBTabletTest(GetTestSchema(), YQL_TABLE_TYPE),
113
      insert_counter_(0),
114
4
      delete_counter_(0) {
115
4
  }
116
117
4
  void SetUp() override {
118
4
    YBTabletTest::SetUp();
119
120
4
    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
121
4
    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&tablet_prepare_pool_));
122
123
4
    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
124
4
    messenger_ = ASSERT_RESULT(builder.Build());
125
4
    proxy_cache_ = std::make_unique<rpc::ProxyCache>(messenger_.get());
126
127
4
    table_metric_entity_ = METRIC_ENTITY_table.Instantiate(&metric_registry_, "test-table");
128
4
    tablet_metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
129
130
4
    RaftPeerPB config_peer;
131
4
    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
132
4
    config_peer.set_member_type(consensus::PeerMemberType::VOTER);
133
4
    auto addr = config_peer.mutable_last_known_private_addr()->Add();
134
4
    addr->set_host("fake-host");
135
4
    addr->set_port(0);
136
137
4
    multi_raft_manager_ = std::make_unique<consensus::MultiRaftManager>(messenger_.get(),
138
4
                                                                        proxy_cache_.get(),
139
4
                                                                        config_peer.cloud_info());
140
141
    // "Bootstrap" and start the TabletPeer.
142
4
    tablet_peer_.reset(new TabletPeer(
143
4
        make_scoped_refptr(tablet()->metadata()), config_peer, clock(),
144
4
        tablet()->metadata()->fs_manager()->uuid(),
145
4
        Bind(
146
4
            &TabletPeerTest::TabletPeerStateChangedCallback,
147
4
            Unretained(this),
148
4
            tablet()->tablet_id()),
149
4
        &metric_registry_,
150
4
        nullptr, // tablet_splitter
151
4
        std::shared_future<client::YBClient*>()));
152
153
    // Make TabletPeer use the same LogAnchorRegistry as the Tablet created by the harness.
154
    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
155
    // RaftGroupMetadata for consumption by TabletPeer before Tablet is instantiated.
156
4
    tablet_peer_->log_anchor_registry_ = tablet()->log_anchor_registry_;
157
158
4
    consensus::RaftConfigPB config;
159
4
    config.add_peers()->CopyFrom(config_peer);
160
4
    config.set_opid_index(consensus::kInvalidOpIdIndex);
161
162
4
    std::unique_ptr<ConsensusMetadata> cmeta;
163
4
    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
164
4
                                        tablet()->tablet_id(),
165
4
                                        tablet()->metadata()->fs_manager()->uuid(),
166
4
                                        config,
167
4
                                        consensus::kMinimumTerm,
168
4
                                        &cmeta));
169
170
4
    ASSERT_OK(ThreadPoolBuilder("log")
171
4
                 .unlimited_threads()
172
4
                 .Build(&log_thread_pool_));
173
4
    scoped_refptr<Log> log;
174
4
    ASSERT_OK(Log::Open(LogOptions(), tablet()->tablet_id(),
175
4
                        tablet()->metadata()->wal_dir(), tablet()->metadata()->fs_manager()->uuid(),
176
4
                        *tablet()->schema(), tablet()->metadata()->schema_version(),
177
4
                        table_metric_entity_.get(), tablet_metric_entity_.get(),
178
4
                        log_thread_pool_.get(), log_thread_pool_.get(),
179
4
                        tablet()->metadata()->cdc_min_replicated_index(), &log));
180
181
4
    ASSERT_OK(tablet_peer_->SetBootstrapping());
182
4
    ASSERT_OK(tablet_peer_->InitTabletPeer(tablet(),
183
4
                                           nullptr /* server_mem_tracker */,
184
4
                                           messenger_.get(),
185
4
                                           proxy_cache_.get(),
186
4
                                           log,
187
4
                                           table_metric_entity_,
188
4
                                           tablet_metric_entity_,
189
4
                                           raft_pool_.get(),
190
4
                                           tablet_prepare_pool_.get(),
191
4
                                           nullptr /* retryable_requests */,
192
4
                                           multi_raft_manager_.get()));
193
4
  }
194
195
3
  CHECKED_STATUS StartPeer(const ConsensusBootstrapInfo& info) {
196
3
    RETURN_NOT_OK(tablet_peer_->Start(info));
197
198
3
    return LoggedWaitFor([&]() -> Result<bool> {
199
3
      if (FLAGS_quick_leader_election_on_create) {
200
0
        return tablet_peer_->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
201
0
      }
202
3
      RETURN_NOT_OK(tablet_peer_->consensus()->EmulateElection());
203
3
      return true;
204
3
    }, MonoDelta::FromMilliseconds(500), "If quick leader elections enabled, wait for peer to be a "
205
3
                                         "leader, otherwise emulate.");
206
3
  }
207
208
  void TabletPeerStateChangedCallback(
209
      const string& tablet_id,
210
11
      std::shared_ptr<consensus::StateChangeContext> context) {
211
11
    LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id
212
11
              << ". Reason: " << context->ToString();
213
11
  }
214
215
4
  void TearDown() override {
216
4
    messenger_->Shutdown();
217
4
    WARN_NOT_OK(tablet_peer_->Shutdown(), "Tablet peer shutdown failed");
218
4
    YBTabletTest::TearDown();
219
4
  }
220
221
 protected:
222
  // Generate monotonic sequence of key column integers.
223
11
  void GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
224
11
    write_req->set_tablet_id(tablet()->tablet_id());
225
11
    AddTestRowInsert(insert_counter_++, write_req);
226
11
  }
227
228
  // Generate monotonic sequence of deletions, starting with 0.
229
  // Will assert if you try to delete more rows than you inserted.
230
2
  void GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
231
2
    CHECK_LT(delete_counter_, insert_counter_);
232
2
    write_req->set_tablet_id(tablet()->tablet_id());
233
2
    AddTestRowDelete(delete_counter_++, write_req);
234
2
  }
235
236
13
  Status ExecuteWriteAndRollLog(TabletPeer* tablet_peer, const WriteRequestPB& req) {
237
13
    WriteResponsePB resp;
238
13
    auto query = std::make_unique<WriteQuery>(
239
13
        /* leader_term */ 1, CoarseTimePoint::max(), tablet_peer, tablet_peer->tablet(), &resp);
240
13
    query->set_client_request(req);
241
242
13
    CountDownLatch rpc_latch(1);
243
13
    query->set_callback(MakeLatchOperationCompletionCallback(&rpc_latch, &resp));
244
245
13
    tablet_peer->WriteAsync(std::move(query));
246
13
    rpc_latch.Wait();
247
0
    CHECK(!resp.has_error())
248
0
        << "\nReq:\n" << req.DebugString() << "Resp:\n" << resp.DebugString();
249
250
13
    Synchronizer synchronizer;
251
13
    CHECK_OK(tablet_peer->log_->TEST_SubmitFuncToAppendToken([&synchronizer, tablet_peer] {
252
13
      synchronizer.StatusCB(tablet_peer->log_->AllocateSegmentAndRollOver());
253
13
    }));
254
13
    return synchronizer.Wait();
255
13
  }
256
257
  // Execute insert requests and roll log after each one.
258
4
  CHECKED_STATUS ExecuteInsertsAndRollLogs(int num_inserts) {
259
15
    for (int i = 0; i < num_inserts; i++) {
260
11
      WriteRequestPB req;
261
11
      GenerateSequentialInsertRequest(&req);
262
11
      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), req));
263
11
    }
264
265
4
    return Status::OK();
266
4
  }
267
268
  // Execute delete requests and roll log after each one.
269
1
  Status ExecuteDeletesAndRollLogs(int num_deletes) {
270
3
    for (int i = 0; i < num_deletes; i++) {
271
2
      WriteRequestPB req;
272
2
      GenerateSequentialDeleteRequest(&req);
273
2
      CHECK_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), req));
274
2
    }
275
276
1
    return Status::OK();
277
1
  }
278
279
  // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
280
2
  void AssertLogAnchorEarlierThanLogLatest() {
281
2
    int64_t earliest_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
282
2
    auto last_log_opid = tablet_peer_->log_->GetLatestEntryOpId();
283
4
    ASSERT_LE(earliest_index, last_log_opid.index)
284
4
      << "Expected valid log anchor, got earliest opid: " << earliest_index
285
4
      << " (expected any value earlier than last log id: " << last_log_opid << ")";
286
2
  }
287
288
  // We disable automatic log GC. Don't leak those changes.
289
  google::FlagSaver flag_saver_;
290
291
  int32_t insert_counter_;
292
  int32_t delete_counter_;
293
  MetricRegistry metric_registry_;
294
  scoped_refptr<MetricEntity> table_metric_entity_;
295
  scoped_refptr<MetricEntity> tablet_metric_entity_;
296
  std::unique_ptr<rpc::Messenger> messenger_;
297
  std::unique_ptr<rpc::ProxyCache> proxy_cache_;
298
  std::unique_ptr<ThreadPool> raft_pool_;
299
  std::unique_ptr<ThreadPool> tablet_prepare_pool_;
300
  std::unique_ptr<ThreadPool> log_thread_pool_;
301
  std::shared_ptr<TabletPeer> tablet_peer_;
302
  std::unique_ptr<consensus::MultiRaftManager> multi_raft_manager_;
303
};
304
305
// Ensure that Log::GC() doesn't delete logs with anchors.
306
1
TEST_F(TabletPeerTest, TestLogAnchorsAndGC) {
307
1
  FLAGS_log_min_seconds_to_retain = 0;
308
1
  ConsensusBootstrapInfo info;
309
1
  ASSERT_OK(StartPeer(info));
310
311
1
  Log* log = tablet_peer_->log();
312
1
  int32_t num_gced;
313
314
1
  log::SegmentSequence segments;
315
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
316
317
1
  ASSERT_EQ(1, segments.size());
318
1
  ASSERT_OK(ExecuteInsertsAndRollLogs(3));
319
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
320
1
  ASSERT_EQ(4, segments.size());
321
322
1
  ASSERT_NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
323
324
  // Ensure nothing gets deleted.
325
1
  int64_t min_log_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
326
1
  ASSERT_OK(log->GC(min_log_index, &num_gced));
327
2
  ASSERT_EQ(2, num_gced) << "Earliest needed: " << min_log_index;
328
329
  // Flush RocksDB to ensure that we don't have OpId in anchors.
330
1
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
331
332
  // The first two segments should be deleted.
333
  // The last is anchored due to the commit in the last segment being the last
334
  // OpId in the log.
335
1
  int32_t earliest_needed = 0;
336
1
  auto total_segments = log->GetLogReader()->num_segments();
337
1
  min_log_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
338
1
  ASSERT_OK(log->GC(min_log_index, &num_gced));
339
2
  ASSERT_EQ(earliest_needed, num_gced) << "earliest needed: " << min_log_index;
340
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
341
1
  ASSERT_EQ(total_segments - earliest_needed, segments.size());
342
1
}
343
344
// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
345
1
TEST_F(TabletPeerTest, TestDMSAnchorPreventsLogGC) {
346
1
  FLAGS_log_min_seconds_to_retain = 0;
347
1
  ConsensusBootstrapInfo info;
348
1
  ASSERT_OK(StartPeer(info));
349
350
1
  Log* log = tablet_peer_->log_.get();
351
1
  int32_t num_gced;
352
353
1
  log::SegmentSequence segments;
354
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
355
356
1
  ASSERT_EQ(1, segments.size());
357
1
  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
358
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
359
1
  ASSERT_EQ(3, segments.size());
360
361
  // Flush RocksDB so the next mutation goes into a DMS.
362
1
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
363
364
1
  int32_t earliest_needed = 1;
365
1
  auto total_segments = log->GetLogReader()->num_segments();
366
1
  int64_t min_log_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
367
1
  ASSERT_OK(log->GC(min_log_index, &num_gced));
368
  // We will only GC 1, and have 1 left because the earliest needed OpId falls
369
  // back to the latest OpId written to the Log if no anchors are set.
370
1
  ASSERT_EQ(earliest_needed, num_gced);
371
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
372
1
  ASSERT_EQ(total_segments - earliest_needed, segments.size());
373
374
1
  auto id = log->GetLatestEntryOpId();
375
1
  LOG(INFO) << "Before: " << id;
376
377
  // We currently have no anchors and the last operation in the log is 0.3
378
  // Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking
379
  // what I think is a wrong assertion.
380
  // I.e. since 0.4 is the last operation that we know is in memory 0.4 is the
381
  // last anchor we expect _and_ it's the last op in the log.
382
  // Only if we apply two operations is the last anchored operation and the
383
  // last operation in the log different.
384
385
  // Execute a mutation.
386
1
  ASSERT_OK(ExecuteDeletesAndRollLogs(2));
387
1
  ASSERT_NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
388
389
1
  total_segments += 1;
390
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
391
1
  ASSERT_EQ(total_segments, segments.size());
392
393
  // Execute another couple inserts, but Flush it so it doesn't anchor.
394
1
  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
395
1
  total_segments += 2;
396
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
397
1
  ASSERT_EQ(total_segments, segments.size());
398
399
  // Ensure the delta and last insert remain in the logs, anchored by the delta.
400
  // Note that this will allow GC of the 2nd insert done above.
401
1
  earliest_needed = 4;
402
1
  min_log_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
403
1
  ASSERT_OK(log->GC(min_log_index, &num_gced));
404
1
  ASSERT_EQ(earliest_needed, num_gced);
405
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
406
1
  ASSERT_EQ(total_segments - earliest_needed, segments.size());
407
408
1
  earliest_needed = 0;
409
1
  total_segments = log->GetLogReader()->num_segments();
410
  // We should only hang onto one segment due to no anchors.
411
  // The last log OpId is the commit in the last segment, so it only anchors
412
  // that segment, not the previous, because it's not the first OpId in the
413
  // segment.
414
1
  min_log_index = ASSERT_RESULT(tablet_peer_->GetEarliestNeededLogIndex());
415
1
  ASSERT_OK(log->GC(min_log_index, &num_gced));
416
1
  ASSERT_EQ(earliest_needed, num_gced);
417
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
418
1
  ASSERT_EQ(total_segments - earliest_needed, segments.size());
419
1
}
420
421
// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
422
1
TEST_F(TabletPeerTest, TestActiveOperationPreventsLogGC) {
423
1
  FLAGS_log_min_seconds_to_retain = 0;
424
1
  ConsensusBootstrapInfo info;
425
1
  ASSERT_OK(StartPeer(info));
426
427
1
  Log* log = tablet_peer_->log_.get();
428
429
1
  log::SegmentSequence segments;
430
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
431
432
1
  ASSERT_EQ(1, segments.size());
433
1
  ASSERT_OK(ExecuteInsertsAndRollLogs(4));
434
1
  ASSERT_OK(log->GetLogReader()->GetSegmentsSnapshot(&segments));
435
1
  ASSERT_EQ(5, segments.size());
436
1
}
437
438
1
// Runs log GC on a freshly started peer whose log has had no writes; it must succeed.
TEST_F(TabletPeerTest, TestGCEmptyLog) {
  ConsensusBootstrapInfo bootstrap_info;
  ASSERT_OK(tablet_peer_->Start(bootstrap_info));
  // Deliberately skip waiting for consensus (unlike StartPeer()).
  ASSERT_OK(tablet_peer_->RunLogGC());
}
444
445
} // namespace tablet
446
} // namespace yb