YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/ts_tablet_manager-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <memory>
34
#include <set>
35
#include <string>
36
#include <vector>
37
38
#include <gtest/gtest.h>
39
40
#include "yb/common/common.pb.h"
41
#include "yb/common/index.h"
42
#include "yb/common/partition.h"
43
#include "yb/common/schema.h"
44
45
#include "yb/consensus/consensus.pb.h"
46
#include "yb/consensus/consensus_round.h"
47
#include "yb/consensus/metadata.pb.h"
48
#include "yb/consensus/raft_consensus.h"
49
50
#include "yb/docdb/docdb_rocksdb_util.h"
51
52
#include "yb/fs/fs_manager.h"
53
54
#include "yb/master/master_heartbeat.pb.h"
55
56
#include "yb/rocksdb/db.h"
57
#include "yb/rocksdb/rate_limiter.h"
58
59
#include "yb/tablet/tablet-harness.h"
60
#include "yb/tablet/tablet.h"
61
#include "yb/tablet/tablet_metadata.h"
62
#include "yb/tablet/tablet_peer.h"
63
64
#include "yb/tserver/mini_tablet_server.h"
65
#include "yb/tserver/tablet_memory_manager.h"
66
#include "yb/tserver/tablet_server.h"
67
#include "yb/tserver/ts_tablet_manager.h"
68
69
#include "yb/util/format.h"
70
#include "yb/util/test_util.h"
71
72
// Invokes AssertReportHasUpdatedTablet(report, tablet_id) through ASSERT_NO_FATALS.
// NOTE(review): presumably ASSERT_NO_FATALS makes a fatal failure raised inside the helper
// stop the calling test as well - confirm against its definition in the test utilities.
#define ASSERT_REPORT_HAS_UPDATED_TABLET(report, tablet_id) \
  ASSERT_NO_FATALS(AssertReportHasUpdatedTablet(report, tablet_id))

// Invokes AssertMonotonicReportSeqno(report_seqno, tablet_report) through ASSERT_NO_FATALS.
// `report_seqno` is a pointer to the last seen sequence number; the helper updates it.
#define ASSERT_MONOTONIC_REPORT_SEQNO(report_seqno, tablet_report) \
  ASSERT_NO_FATALS(AssertMonotonicReportSeqno(report_seqno, tablet_report))
77
78
DECLARE_bool(TEST_pretend_memory_exceeded_enforce_flush);
79
DECLARE_bool(TEST_tserver_disable_heartbeat);
80
DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec);
81
DECLARE_string(rocksdb_compact_flush_rate_limit_sharing_mode);
82
83
namespace yb {
84
namespace tserver {
85
86
using consensus::kInvalidOpIdIndex;
87
using consensus::RaftConfigPB;
88
using consensus::ConsensusRound;
89
using consensus::ConsensusRoundPtr;
90
using consensus::ReplicateMsg;
91
using docdb::RateLimiterSharingMode;
92
using master::ReportedTabletPB;
93
using master::TabletReportPB;
94
using master::TabletReportUpdatesPB;
95
using strings::Substitute;
96
using tablet::TabletPeer;
97
using gflags::FlagSaver;
98
99
// Default table id used by the tests in this file.
static constexpr const char* kTableId = "my-table-id";
// Default tablet id used by tests that create a single tablet.
static constexpr const char* kTabletId = "my-tablet-id";
// Timeout, in milliseconds, passed to TabletPeer::WaitUntilConsensusRunning().
static constexpr int kConsensusRunningWaitMs = 10000;
// Number of "drive-N" directories created for the mini tablet server.
static constexpr int kDrivesNum = 4;
103
104
class TsTabletManagerTest : public YBTest {
105
 public:
106
  TsTabletManagerTest()
107
7
    : schema_({ ColumnSchema("key", UINT32) }, 1) {
108
7
  }
109
110
73
  string GetDrivePath(int index) {
111
73
    return JoinPathSegments(test_data_root_, Substitute("drive-$0", index + 1));
112
73
  }
113
114
18
  void CreateMiniTabletServer() {
115
18
    auto options_result = TabletServerOptions::CreateTabletServerOptions();
116
18
    ASSERT_OK(options_result);
117
18
    std::vector<std::string> paths;
118
90
    for (int i = 0; i < kDrivesNum; ++i) {
119
72
      auto s = GetDrivePath(i);
120
72
      ASSERT_OK(env_->CreateDirs(s));
121
72
      paths.push_back(s);
122
72
    }
123
18
    mini_server_ = std::make_unique<MiniTabletServer>(paths, paths, 0, *options_result, 0);
124
18
  }
125
126
7
  void SetUp() override {
127
7
    YBTest::SetUp();
128
129
    // Requred before tserver creation as using of `mini_server_->FailHeartbeats()`
130
    // does not guarantee the heartbeat events is off immediately and a couple of events
131
    // may happen until heartbeat's thread sees the effect of `mini_server_->FailHeartbeats()`
132
7
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_tserver_disable_heartbeat) = true;
133
134
7
    test_data_root_ = GetTestPath("TsTabletManagerTest-fsroot");
135
7
    CreateMiniTabletServer();
136
7
    ASSERT_OK(mini_server_->Start());
137
7
    mini_server_->FailHeartbeats();
138
139
7
    config_ = mini_server_->CreateLocalConfig();
140
141
7
    tablet_manager_ = mini_server_->server()->tablet_manager();
142
7
    fs_manager_ = mini_server_->server()->fs_manager();
143
7
  }
144
145
6
  void TearDown() override {
146
6
    if (mini_server_) {
147
6
      mini_server_->Shutdown();
148
6
    }
149
6
  }
150
151
  Status CreateNewTablet(const std::string& table_id,
152
                         const std::string& tablet_id,
153
                         const Schema& schema,
154
34
                         std::shared_ptr<tablet::TabletPeer>* out_tablet_peer) {
155
34
    Schema full_schema = SchemaBuilder(schema).Build();
156
34
    std::pair<PartitionSchema, Partition> partition = tablet::CreateDefaultPartition(full_schema);
157
158
34
    auto table_info = std::make_shared<tablet::TableInfo>(
159
34
        table_id, tablet_id, tablet_id, TableType::DEFAULT_TABLE_TYPE, full_schema, IndexMap(),
160
34
        boost::none /* index_info */, 0 /* schema_version */, partition.first);
161
34
    auto tablet_peer = VERIFY_RESULT(tablet_manager_->CreateNewTablet(
162
34
        table_info, tablet_id, partition.second, config_));
163
34
    if (out_tablet_peer) {
164
15
      (*out_tablet_peer) = tablet_peer;
165
15
    }
166
167
34
    RETURN_NOT_OK(tablet_peer->WaitUntilConsensusRunning(
168
34
          MonoDelta::FromMilliseconds(kConsensusRunningWaitMs)));
169
170
33
    return tablet_peer->consensus()->EmulateElection();
171
34
  }
172
173
4
  void Reload() {
174
4
    LOG(INFO) << "Shutting down tablet manager";
175
4
    mini_server_->Shutdown();
176
4
    LOG(INFO) << "Restarting tablet manager";
177
4
    ASSERT_NO_FATAL_FAILURE(CreateMiniTabletServer());
178
4
    ASSERT_OK(mini_server_->Start());
179
3
    ASSERT_OK(mini_server_->WaitStarted());
180
3
    tablet_manager_ = mini_server_->server()->tablet_manager();
181
3
  }
182
183
5
  void AddTablets(size_t num, TSTabletManager::TabletPeers* peers = nullptr) {
184
    // Add series of tablets
185
5
    ASSERT_NE(num, 0);
186
15
    for (size_t i = 0; i < num; ++i) {
187
10
      std::shared_ptr<TabletPeer> peer;
188
10
      const auto tid = Format("tablet-$0", peers->size());
189
10
      ASSERT_OK(CreateNewTablet(kTableId, tid, schema_, &peer));
190
10
      ASSERT_EQ(tid, peer->tablet()->tablet_id());
191
10
      if (peers) {
192
10
        peers->push_back(peer);
193
10
      }
194
10
    }
195
5
  }
196
197
  Result<TSTabletManager::TabletPeers> GetPeers(
198
3
      boost::optional<size_t> expected_count = boost::none) {
199
3
    auto peers = tablet_manager_->GetTabletPeers(nullptr);
200
3
    if (expected_count.has_value()) {
201
3
      SCHECK_EQ(*expected_count, peers.size(), IllegalState, "Unexpected number of peers");
202
3
    }
203
3
    return std::move(peers);
204
3
  }
205
206
 protected:
207
  std::unique_ptr<MiniTabletServer> mini_server_;
208
  FsManager* fs_manager_;
209
  TSTabletManager* tablet_manager_;
210
211
  Schema schema_;
212
  RaftConfigPB config_;
213
214
  string test_data_root_;
215
};
216
217
1
// Verifies that a created tablet is found again after the tablet server is restarted.
TEST_F(TsTabletManagerTest, TestCreateTablet) {
  // Create a new tablet.
  std::shared_ptr<TabletPeer> peer;
  ASSERT_OK(CreateNewTablet(kTableId, kTabletId, schema_, &peer));
  ASSERT_EQ(kTabletId, peer->tablet()->tablet_id());
  peer.reset();

  // Re-load the tablet manager from the filesystem. Reload() performs the shutdown, re-creation,
  // start and wait sequence; checking for fatal failures stops the test if any step failed
  // instead of continuing with a server that was not restarted.
  ASSERT_NO_FATAL_FAILURE(Reload());

  // Ensure that the tablet got re-loaded and re-opened off disk.
  ASSERT_TRUE(tablet_manager_->LookupTablet(kTabletId, &peer));
  ASSERT_EQ(kTabletId, peer->tablet()->tablet_id());
}
237
238
1
// Verifies that a tablet is removed from the data and WAL directory assignment maps once it is
// tombstoned or deleted, that this holds across server restarts, and that a second tablet of the
// same table stays registered throughout.
TEST_F(TsTabletManagerTest, TestTombstonedTabletsAreUnregistered) {
  const std::string kTableId = "my-table-id";
  const std::string kTabletId1 = "my-tablet-id-1";
  const std::string kTabletId2 = "my-tablet-id-2";

  // Returns how many times `tablet_id` appears among the directories assigned to kTableId.
  // Records a non-fatal failure and returns 0 when the table has no entry in the map.
  auto count_tablet_in_assignment_map =
      [&kTableId](const TSTabletManager::TableDiskAssignmentMap* table_assignment_map,
                  const std::string& tablet_id) {
        int tablet_count = 0;
        auto table_assignment_iter = table_assignment_map->find(kTableId);
        EXPECT_NE(table_assignment_iter, table_assignment_map->end());
        if (table_assignment_iter == table_assignment_map->end()) {
          // Do not dereference the end iterator; the failure is already recorded above.
          return tablet_count;
        }
        // the number of data directories for this table should be non-empty.
        EXPECT_GT(table_assignment_iter->second.size(), 0);
        for (const auto& tablet_assignment_iter : table_assignment_iter->second) {
          // directory_map maps a directory name to a set of tablet ids.
          for (const TabletId& tablet : tablet_assignment_iter.second) {
            if (tablet_id == tablet) {
              tablet_count++;
            }
          }
        }
        return tablet_count;
      };

  // Asserts that `tablet_id` appears exactly `count` times in both assignment maps.
  // The fatal assertions only return from this lambda, so every call site wraps it in
  // ASSERT_NO_FATALS.
  auto assert_tablet_assignment_count =
      [this, &count_tablet_in_assignment_map](const std::string& tablet_id, int count) {
    ASSERT_EQ(
        count_tablet_in_assignment_map(&tablet_manager_->table_data_assignment_map_, tablet_id),
        count);
    ASSERT_EQ(
        count_tablet_in_assignment_map(&tablet_manager_->table_wal_assignment_map_, tablet_id),
        count);
  };

  // Create a new tablet.
  std::shared_ptr<TabletPeer> peer;
  ASSERT_OK(CreateNewTablet(kTableId, kTabletId1, schema_, &peer));
  ASSERT_EQ(kTabletId1, peer->tablet()->tablet_id());
  peer.reset();
  ASSERT_OK(CreateNewTablet(kTableId, kTabletId2, schema_, &peer));
  ASSERT_EQ(kTabletId2, peer->tablet()->tablet_id());

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 1));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));

  // Re-load the tablet manager from the filesystem.
  ASSERT_NO_FATAL_FAILURE(Reload());

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 1));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));

  boost::optional<int64_t> cas_config_opid_index_less_or_equal;
  boost::optional<TabletServerErrorPB::Code> error_code;
  ASSERT_OK(tablet_manager_->DeleteTablet(kTabletId1,
                                          tablet::TABLET_DATA_TOMBSTONED,
                                          cas_config_opid_index_less_or_equal,
                                          false,
                                          &error_code));

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 0));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));

  ASSERT_NO_FATAL_FAILURE(Reload());

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 0));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));

  ASSERT_OK(tablet_manager_->DeleteTablet(kTabletId1,
                                          tablet::TABLET_DATA_DELETED,
                                          cas_config_opid_index_less_or_equal,
                                          false,
                                          &error_code));

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 0));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));

  ASSERT_NO_FATAL_FAILURE(Reload());

  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId1, 0));
  ASSERT_NO_FATALS(assert_tablet_assignment_count(kTabletId2, 1));
}
329
330
1
// Restarts the server several times with TEST_pretend_memory_exceeded_enforce_flush enabled,
// calls FlushTabletIfLimitExceeded() right after Start() (before WaitStarted()) and checks that
// every previously created tablet can still be looked up afterwards.
TEST_F(TsTabletManagerTest, TestProperBackgroundFlushOnStartup) {
  FlagSaver flag_saver;
  // Annotated write, consistent with the other flag updates in this file.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pretend_memory_exceeded_enforce_flush) = true;

  const int kNumTablets = 2;
  const int kNumRestarts = 3;

  std::vector<TabletId> tablet_ids;
  std::vector<ConsensusRoundPtr> consensus_rounds;

  for (int i = 0; i < kNumTablets; ++i) {
    std::shared_ptr<TabletPeer> peer;
    const auto tablet_id = Format("my-tablet-$0", i + 1);
    tablet_ids.emplace_back(tablet_id);
    ASSERT_OK(CreateNewTablet(kTableId, tablet_id, schema_, &peer));
    ASSERT_EQ(tablet_id, peer->tablet()->tablet_id());

    // Replicate a NO_OP through the new tablet's consensus.
    auto replicate_ptr = std::make_shared<ReplicateMsg>();
    replicate_ptr->set_op_type(consensus::NO_OP);
    replicate_ptr->set_hybrid_time(peer->clock().Now().ToUint64());
    ConsensusRoundPtr round(new ConsensusRound(peer->consensus(), std::move(replicate_ptr)));
    consensus_rounds.emplace_back(round);
    round->BindToTerm(peer->raft_consensus()->TEST_LeaderTerm());
    round->SetCallback(consensus::MakeNonTrackedRoundCallback(round.get(), [](const Status&){}));
    ASSERT_OK(peer->consensus()->TEST_Replicate(round));
  }

  for (int i = 0; i < kNumRestarts; ++i) {
    LOG(INFO) << "Shutting down tablet manager";
    mini_server_->Shutdown();
    LOG(INFO) << "Restarting tablet manager";
    // Stop the test if the server could not be re-created instead of reusing the old one.
    ASSERT_NO_FATAL_FAILURE(CreateMiniTabletServer());
    ASSERT_OK(mini_server_->Start());
    auto* tablet_manager = mini_server_->server()->tablet_manager();
    ASSERT_NE(nullptr, tablet_manager);
    // Keep the fixture member in sync so it does not keep referring to the replaced server.
    tablet_manager_ = tablet_manager;
    tablet_manager->tablet_memory_manager()->FlushTabletIfLimitExceeded();
    ASSERT_OK(mini_server_->WaitStarted());
    for (const auto& tablet_id : tablet_ids) {
      std::shared_ptr<TabletPeer> peer;
      ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &peer));
      ASSERT_EQ(tablet_id, peer->tablet()->tablet_id());
    }
  }
}
374
375
static void AssertMonotonicReportSeqno(int32_t* report_seqno,
376
9
                                       const TabletReportPB& report) {
377
9
  ASSERT_LT(*report_seqno, report.sequence_number());
378
9
  *report_seqno = report.sequence_number();
379
9
}
380
381
static void AssertReportHasUpdatedTablet(const TabletReportPB& report,
382
4
                                         const string& tablet_id) {
383
4
  ASSERT_GE(report.updated_tablets_size(), 0);
384
4
  bool found_tablet = false;
385
6
  for (ReportedTabletPB reported_tablet : report.updated_tablets()) {
386
6
    if (reported_tablet.tablet_id() == tablet_id) {
387
4
      found_tablet = true;
388
4
      ASSERT_TRUE(reported_tablet.has_committed_consensus_state());
389
8
      ASSERT_TRUE(reported_tablet.committed_consensus_state().has_current_term())
390
8
          << reported_tablet.ShortDebugString();
391
8
      ASSERT_TRUE(reported_tablet.committed_consensus_state().has_leader_uuid())
392
8
          << reported_tablet.ShortDebugString();
393
4
      ASSERT_TRUE(reported_tablet.committed_consensus_state().has_config());
394
4
      const RaftConfigPB& committed_config = reported_tablet.committed_consensus_state().config();
395
4
      ASSERT_EQ(kInvalidOpIdIndex, committed_config.opid_index());
396
4
      ASSERT_EQ(1, committed_config.peers_size());
397
8
      ASSERT_TRUE(committed_config.peers(0).has_permanent_uuid())
398
8
          << reported_tablet.ShortDebugString();
399
8
      ASSERT_EQ(committed_config.peers(0).permanent_uuid(),
400
8
                reported_tablet.committed_consensus_state().leader_uuid())
401
8
          << reported_tablet.ShortDebugString();
402
4
    }
403
6
  }
404
4
  ASSERT_TRUE(found_tablet);
405
4
}
406
407
7
static void CopyReportToUpdates(const TabletReportPB& req, TabletReportUpdatesPB* resp) {
408
7
  resp->Clear();
409
2
  for (const auto & tablet : req.updated_tablets()) {
410
2
    auto new_tablet = resp->add_tablets();
411
2
    new_tablet->set_tablet_id(tablet.tablet_id());
412
2
  }
413
7
}
414
415
1
// Exercises full and incremental tablet reports: empty reports before any tablet exists,
// re-reporting of unacknowledged tablets, empty incrementals after acknowledgement, and a full
// report that lists every tablet again.
TEST_F(TsTabletManagerTest, TestTabletReports) {
  TabletReportPB report;
  TabletReportUpdatesPB updates;
  int32_t seqno = -1;

  // Bounds for the polling loops below, so a missing report fails the test instead of hanging.
  const MonoDelta kReportWaitTimeout = MonoDelta::FromSeconds(10);
  const MonoDelta kReportPollInterval = MonoDelta::FromMilliseconds(10);

  // Generate a tablet report before any tablets are loaded. Should be empty.
  tablet_manager_->StartFullTabletReport(&report);
  ASSERT_EQ(0, report.updated_tablets().size());
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Another report should now be incremental, but with no changes.
  tablet_manager_->GenerateTabletReport(&report);
  ASSERT_EQ(0, report.updated_tablets().size());
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Create a tablet and do another incremental report - should include the tablet.
  ASSERT_OK(CreateNewTablet(kTableId, "tablet-1", schema_, nullptr));
  {
    // Poll until exactly one tablet is reported, giving up after kReportWaitTimeout.
    const MonoTime start(MonoTime::Now());
    while (true) {
      tablet_manager_->GenerateTabletReport(&report);
      const int updated_tablets = report.updated_tablets().size();
      ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
      if (updated_tablets == 1) break;
      MonoDelta elapsed(MonoTime::Now().GetDeltaSince(start));
      ASSERT_TRUE(elapsed.LessThan(kReportWaitTimeout))
          << "Waited too long for tablet-1 to be reported: "
          << elapsed.ToString() << ". "
          << "Latest report: " << report.ShortDebugString();
      SleepFor(kReportPollInterval);
    }
  }

  ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");

  // If we don't acknowledge the report, and ask for another incremental report,
  // it should include the tablet again.
  tablet_manager_->GenerateTabletReport(&report);
  ASSERT_EQ(1, report.updated_tablets().size());
  ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);

  // Now acknowledge the last report, and further incrementals should be empty.
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);
  tablet_manager_->GenerateTabletReport(&report);
  ASSERT_EQ(0, report.updated_tablets().size());
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Create a second tablet, and ensure the incremental report shows it.
  ASSERT_OK(CreateNewTablet(kTableId, "tablet-2", schema_, nullptr));

  // Wait up to 10 seconds to get a tablet report from tablet-2.
  // TabletPeer does not mark tablets dirty until after it commits the
  // initial configuration change, so there is also a window for tablet-1 to
  // have been marked dirty since the last report.
  MonoTime start(MonoTime::Now());
  report.Clear();
  while (true) {
    bool found_tablet_2 = false;
    tablet_manager_->GenerateTabletReport(&report);
    ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report) << report.ShortDebugString();
    for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) {
      if (reported_tablet.tablet_id() == "tablet-2") {
        found_tablet_2  = true;
        break;
      }
    }
    if (found_tablet_2) break;
    MonoDelta elapsed(MonoTime::Now().GetDeltaSince(start));
    ASSERT_TRUE(elapsed.LessThan(kReportWaitTimeout))
        << "Waited too long for tablet-2 to be marked dirty: "
        << elapsed.ToString() << ". "
        << "Latest report: " << report.ShortDebugString();
    SleepFor(kReportPollInterval);
  }

  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Asking for a full tablet report should re-report both tablets
  tablet_manager_->StartFullTabletReport(&report);
  ASSERT_EQ(2, report.updated_tablets().size());
  ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
  ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-2");
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
}
499
500
1
// Checks that, with a report limit set, both incremental and full tablet reports deliver all
// tablets in batches of at most `limit` entries, each tablet exactly once.
TEST_F(TsTabletManagerTest, TestTabletReportLimit) {
  TabletReportPB report;
  TabletReportUpdatesPB updates;
  int32_t seqno = -1;

  // Generate a tablet report before any tablets are loaded. Should be empty.
  tablet_manager_->StartFullTabletReport(&report);
  ASSERT_EQ(0, report.updated_tablets().size());
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Another report should now be incremental, but with no changes.
  tablet_manager_->GenerateTabletReport(&report);
  ASSERT_EQ(0, report.updated_tablets().size());
  ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
  CopyReportToUpdates(report, &updates);
  tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);

  // Set a report limit and create a set of tablets clearly over that limit.
  const int32_t limit = 10;
  const int32_t total_tablets = 25;
  tablet_manager_->SetReportLimit(limit);
  // Ids still expected from the incremental reports and from the full report, respectively.
  std::set<std::string> tablet_ids;
  std::set<std::string> tablet_ids_full;
  for (int i = 0; i < total_tablets; ++i) {
    auto id = "tablet-" + std::to_string(i);
    ASSERT_OK(CreateNewTablet(kTableId, id, schema_, nullptr));
    tablet_ids.insert(id);
    tablet_ids_full.insert(id);
    LOG(INFO) << "Adding " << id;
  }

  // Ensure that incremental report requests returns all in batches.
  // `n` is the expected size of the current batch, `left` the number of tablets not yet seen.
  for (int n = limit, left = total_tablets; left > 0; left -= n, n = std::min(limit, left)) {
    tablet_manager_->GenerateTabletReport(&report);
    ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
    ASSERT_EQ(n, report.updated_tablets().size());
    for (const auto& t : report.updated_tablets()) {
      LOG(INFO) << "Erasing " << t.tablet_id();
      ASSERT_EQ(1, tablet_ids.erase(t.tablet_id()));
    }
    CopyReportToUpdates(report, &updates);
    tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);
  }

  // Generate a Full Report and ensure that the same batching occurs.
  tablet_manager_->StartFullTabletReport(&report);
  for (int n = limit, left = total_tablets; left > 0; left -= n, n = std::min(limit, left)) {
    ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
    ASSERT_EQ(n, report.updated_tablets().size());
    for (const auto& t : report.updated_tablets()) {
      ASSERT_EQ(1, tablet_ids_full.erase(t.tablet_id()));
    }
    CopyReportToUpdates(report, &updates);
    tablet_manager_->MarkTabletReportAcknowledged(seqno, updates);
    // Fetch the next batch; after the final batch this yields the trailing report checked below.
    tablet_manager_->GenerateTabletReport(&report);
  }
  ASSERT_EQ(0, report.updated_tablets().size()); // Last incremental report is empty.
}
558
559
namespace {
560
561
2
// Stores the string form of `mode` into FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode
// using an annotated (unprotected) write.
void SetRateLimiterSharingMode(RateLimiterSharingMode mode) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode) = ToString(mode);
}
564
565
// Counts the distinct non-null rocksdb::RateLimiter instances referenced by the DB options of
// peers[start_idx..]. Peers whose DB options carry no rate limiter are ignored.
//
// Returns IllegalState when `start_idx` is not a valid index into `peers` (this includes an
// empty `peers`), and an error when a peer's tablet has no DB.
Result<size_t> CountUniqueLimiters(const TSTabletManager::TabletPeers& peers,
                                   const size_t start_idx = 0) {
  SCHECK_LT(start_idx, peers.size(), IllegalState,
            "Start index must be less than number of peers");
  std::unordered_set<rocksdb::RateLimiter*> limiters;
  for (size_t idx = start_idx; idx != peers.size(); ++idx) {
    auto db = peers[idx]->tablet()->TEST_db();
    SCHECK_NOTNULL(db);
    if (auto* limiter = db->GetDBOptions().rate_limiter.get()) {
      limiters.insert(limiter);
    }
  }
  return limiters.size();
}
580
581
} // namespace
582
583
1
TEST_F(TsTabletManagerTest, RateLimiterSharing) {
  // The test checks rocksdb::RateLimiter is correctly shared between RocksDB instances
  // depending on the flags `FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec` and
  // `FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode`, including the possible effect
  // of changing flags on-the-fly (emulating a forced change).

  // Restore the flags modified below when the test body finishes, as the other flag-mutating
  // test in this file does.
  FlagSaver flag_saver;

  // No tablets exist, reset flags and reload
  size_t peers_num = 0;
  constexpr auto kBPS = 128_MB;
  SetRateLimiterSharingMode(RateLimiterSharingMode::NONE);
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS;
  ASSERT_NO_FATAL_FAILURE(Reload());
  TSTabletManager::TabletPeers peers = ASSERT_RESULT(GetPeers(peers_num));

  // `NONE`: add tablets and make sure they have unique limiters
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `NONE`: emulating forced change for bps flag: make sure new unique limiters are created
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS / 2;
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `NONE`: emulating forced reset for bps flag: make sure new tablets are added with no limiters
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = 0;
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers, peers_num)));
  peers_num = peers.size();

  // `NONE` + no bps: reload the cluster with bps flag unset to make sure no limiters are created
  ASSERT_NO_FATAL_FAILURE(Reload());
  peers = ASSERT_RESULT(GetPeers(peers_num));
  ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers)));

  // `NONE` + no bps: add tablets and make sure limiters are not created
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `NONE`: reload the cluster with bps flag set and make sure all limiters are unique
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS;
  ASSERT_NO_FATAL_FAILURE(Reload());
  peers = ASSERT_RESULT(GetPeers(peers_num));
  ASSERT_EQ(peers_num, ASSERT_RESULT(CountUniqueLimiters(peers)));

  // `NONE`: emulating forced change for mode flag: should act as if `NONE` is still set
  SetRateLimiterSharingMode(RateLimiterSharingMode::TSERVER);
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(peers_num + 2, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `TSERVER`: reload the cluster to apply `TSERVER` sharing mode
  // and make sure all tablets share the same rate limiter
  ASSERT_NO_FATAL_FAILURE(Reload());
  peers = ASSERT_RESULT(GetPeers(peers_num));
  ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers)));

  // `TSERVER`: emulating forced change for bps flag: make sure this has no effect on sharing
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS / 2;
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `TSERVER`: emulating forced reset for bps flag: make sure this has no effect on sharing
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = 0;
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `TSERVER`: emulating forced change for mode flag:
  // should act as if `TSERVER` is still set
  SetRateLimiterSharingMode(RateLimiterSharingMode::NONE);
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(1, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `TSERVER` + no bps: reload the cluster
  //  with bps flag unset to make sure no limiters are created
  SetRateLimiterSharingMode(RateLimiterSharingMode::TSERVER);
  ASSERT_NO_FATAL_FAILURE(Reload());
  peers = ASSERT_RESULT(GetPeers(peers_num));
  ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers)));

  // `TSERVER` + no bps: add tablets and make sure no limiters are created
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(0, ASSERT_RESULT(CountUniqueLimiters(peers)));
  peers_num = peers.size();

  // `TSERVER` + no bps: emulating forced change for both flags:
  // should act as if `NONE` is applied with some bps set
  SetRateLimiterSharingMode(RateLimiterSharingMode::NONE);
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec) = kBPS;
  ASSERT_NO_FATAL_FAILURE(AddTablets(2, &peers));
  ASSERT_EQ(2, ASSERT_RESULT(CountUniqueLimiters(peers, peers_num)));
}
681
682
1
// Registers one tablet per drive and checks that the data and WAL directories returned for each
// tablet share the same leading drive-path-length prefix.
TEST_F(TsTabletManagerTest, DataAndWalFilesLocations) {
  // All drive paths differ only in their numeric suffix, so drive 0's length is used for all.
  const auto drive_path_len = GetDrivePath(0).size();
  std::string data;
  std::string wal;
  for (int drive = 0; drive < kDrivesNum; ++drive) {
    const auto tablet_id = Substitute("tablet-$0", drive + 1);
    tablet_manager_->GetAndRegisterDataAndWalDir(fs_manager_, kTableId, tablet_id, &data, &wal);
    ASSERT_EQ(data.substr(0, drive_path_len), wal.substr(0, drive_path_len));
  }
}
695
696
} // namespace tserver
697
} // namespace yb