YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/ql-tablet-test.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include <shared_mutex>
17
#include <thread>
18
19
#include <boost/optional/optional.hpp>
20
21
#include "yb/client/client-test-util.h"
22
#include "yb/client/error.h"
23
#include "yb/client/ql-dml-test-base.h"
24
#include "yb/client/schema.h"
25
#include "yb/client/session.h"
26
#include "yb/client/table_handle.h"
27
#include "yb/client/yb_op.h"
28
29
#include "yb/common/ql_type.h"
30
#include "yb/common/ql_value.h"
31
#include "yb/common/schema.h"
32
33
#include "yb/consensus/consensus.h"
34
#include "yb/consensus/consensus.pb.h"
35
#include "yb/consensus/log.h"
36
#include "yb/consensus/raft_consensus.h"
37
38
#include "yb/docdb/consensus_frontier.h"
39
#include "yb/docdb/doc_key.h"
40
41
#include "yb/gutil/casts.h"
42
43
#include "yb/integration-tests/test_workload.h"
44
45
#include "yb/master/catalog_entity_info.h"
46
#include "yb/master/catalog_manager_if.h"
47
#include "yb/master/master_defaults.h"
48
#include "yb/master/master_ddl.proxy.h"
49
50
#include "yb/rocksdb/db.h"
51
#include "yb/rocksdb/types.h"
52
53
#include "yb/rpc/rpc_controller.h"
54
55
#include "yb/server/skewed_clock.h"
56
57
#include "yb/tablet/tablet.h"
58
#include "yb/tablet/tablet_bootstrap_if.h"
59
#include "yb/tablet/tablet_metadata.h"
60
#include "yb/tablet/tablet_peer.h"
61
#include "yb/tablet/tablet_retention_policy.h"
62
63
#include "yb/tserver/mini_tablet_server.h"
64
#include "yb/tserver/tablet_server.h"
65
#include "yb/tserver/ts_tablet_manager.h"
66
#include "yb/tserver/tserver_service.proxy.h"
67
68
#include "yb/util/random_util.h"
69
#include "yb/util/shared_lock.h"
70
#include "yb/util/status_format.h"
71
#include "yb/util/stopwatch.h"
72
#include "yb/util/tsan_util.h"
73
74
#include "yb/yql/cql/ql/util/statement_result.h"
75
76
using namespace std::literals; // NOLINT
77
78
DECLARE_uint64(initial_seqno);
79
DECLARE_int32(leader_lease_duration_ms);
80
DECLARE_int64(db_write_buffer_size);
81
DECLARE_string(time_source);
82
DECLARE_int32(TEST_delay_execute_async_ms);
83
DECLARE_int32(retryable_request_timeout_secs);
84
DECLARE_bool(enable_lease_revocation);
85
DECLARE_bool(rocksdb_disable_compactions);
86
DECLARE_int32(rocksdb_level0_slowdown_writes_trigger);
87
DECLARE_int32(rocksdb_level0_stop_writes_trigger);
88
DECLARE_bool(flush_rocksdb_on_shutdown);
89
DECLARE_int32(memstore_size_mb);
90
DECLARE_int64(global_memstore_size_mb_max);
91
DECLARE_bool(TEST_allow_stop_writes);
92
DECLARE_int32(yb_num_shards_per_tserver);
93
DECLARE_int32(ysql_num_shards_per_tserver);
94
DECLARE_int32(transaction_table_num_tablets);
95
DECLARE_int32(transaction_table_num_tablets_per_tserver);
96
DECLARE_int32(TEST_tablet_inject_latency_on_apply_write_txn_ms);
97
DECLARE_bool(TEST_log_cache_skip_eviction);
98
DECLARE_uint64(sst_files_hard_limit);
99
DECLARE_uint64(sst_files_soft_limit);
100
DECLARE_int32(timestamp_history_retention_interval_sec);
101
DECLARE_int32(raft_heartbeat_interval_ms);
102
DECLARE_int32(history_cutoff_propagation_interval_ms);
103
DECLARE_int32(TEST_preparer_batch_inject_latency_ms);
104
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
105
DECLARE_int32(TEST_backfill_sabotage_frequency);
106
DECLARE_string(regular_tablets_data_block_key_value_encoding);
107
DECLARE_string(compression_type);
108
109
namespace yb {
110
namespace client {
111
112
using ql::RowsResult;
113
114
namespace {
115
116
const std::string kKeyColumn = "key"s;
117
const std::string kRangeKey1Column = "range_key1"s;
118
const std::string kRangeKey2Column = "range_key2"s;
119
const std::string kValueColumn = "int_val"s;
120
const YBTableName kTable1Name(YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table1");
121
const YBTableName kTable2Name(YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table2");
122
123
0
int32_t ValueForKey(int32_t key) {
124
0
  return key * 2;
125
0
}
126
127
const int kTotalKeys = 250;
128
const int kBigSeqNo = 100500;
129
130
} // namespace
131
132
class QLTabletTest : public QLDmlTestBase<MiniCluster> {
 protected:
  // Installs SkewedClock as the time source before the base fixture brings up the cluster.
  void SetUp() override {
    server::SkewedClock::Register();
    FLAGS_time_source = server::SkewedClock::kName;
    QLDmlTestBase::SetUp();
  }

  // Creates kTable1Name into table1_ and kTable2Name into table2_. FLAGS_initial_seqno is set to
  // initial_seqno1 / initial_seqno2 while the respective table is created. FlagSaver restores
  // the flag on return.
  void CreateTables(uint64_t initial_seqno1, uint64_t initial_seqno2) {
    google::FlagSaver saver;
    FLAGS_initial_seqno = initial_seqno1;
    CreateTable(kTable1Name, &table1_);
    FLAGS_initial_seqno = initial_seqno2;
    CreateTable(kTable2Name, &table2_);
  }

  // Builds, but does not apply, an INSERT of (key, value) into `table`.
  std::shared_ptr<YBqlWriteOp> CreateSetValueOp(
      int32_t key, int32_t value, const TableHandle& table) {
    const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
    auto* const req = op->mutable_request();
    QLAddInt32HashValue(req, key);
    table.AddInt32ColumnValue(req, kValueColumn, value);
    return op;
  }

  // Writes (key, value) synchronously and asserts that both the flush and the op succeeded.
  void SetValue(const YBSessionPtr& session, int32_t key, int32_t value, const TableHandle& table) {
    auto op = CreateSetValueOp(key, value, table);
    ASSERT_OK(session->ApplyAndFlush(op));
    ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status())
        << op->response().error_message();
  }

  // Reads the value column for `key`.
  // Returns boost::none when no row exists. A failed flush or a non-int32 value is reported as a
  // non-fatal (EXPECT_*) test failure.
  boost::optional<int32_t> GetValue(
      const YBSessionPtr& session, int32_t key, const TableHandle& table) {
    const auto op = CreateReadOp(key, table);
    EXPECT_OK(session->ApplyAndFlush(op));
    auto rowblock = RowsResult(op.get()).GetRowBlock();
    if (rowblock->row_count() == 0) {
      return boost::none;
    }
    EXPECT_EQ(1, rowblock->row_count());
    const auto& value = rowblock->row(0).column(0);
    EXPECT_TRUE(value.value().has_int32_value()) << "Value: " << value.value().ShortDebugString();
    return value.int32_value();
  }

  // Builds a read op that selects the value column for `key`.
  std::shared_ptr<YBqlReadOp> CreateReadOp(int32_t key, const TableHandle& table) {
    return client::CreateReadOp(key, table, kValueColumn);
  }

  // Creates a (key INT32 hash, int_val INT32) table.
  // num_tablets == 0 means CalcNumTablets(3). `transactional` enables transactional properties.
  void CreateTable(
      const YBTableName& table_name, TableHandle* table, int num_tablets = 0,
      bool transactional = false) {
    YBSchemaBuilder builder;
    builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
    builder.AddColumn(kValueColumn)->Type(INT32);

    if (num_tablets == 0) {
      num_tablets = CalcNumTablets(3);
    }
    if (transactional) {
      TableProperties table_properties;
      table_properties.SetTransactional(true);
      builder.SetTableProperties(table_properties);
    }
    ASSERT_OK(table->Create(table_name, num_tablets, client_.get(), &builder));
  }

  // Returns a new session with a 15 second timeout.
  YBSessionPtr CreateSession() {
    auto session = client_->NewSession();
    session->SetTimeout(15s);
    return session;
  }

  // Writes (i, ValueForKey(i)) for i in [begin, end) one op at a time.
  // It then verifies the rows through the client and waits until every replica serves them.
  void FillTable(int begin, int end, const TableHandle& table) {
    {
      auto session = CreateSession();
      for (int i = begin; i != end; ++i) {
        SetValue(session, i, ValueForKey(i), table);
      }
    }
    VerifyTable(begin, end, table);
    ASSERT_OK(WaitSync(begin, end, table));
  }

  // Writes (i, ValueForKey(i)) for i in [begin, end), flushing after every `batch_size` ops and
  // after the final op. It then waits until every replica serves the rows.
  CHECKED_STATUS BatchedFillTable(
      const int begin, const int end, const int batch_size, const TableHandle& table) {
    {
      auto session = CreateSession();
      for (int i = begin; i != end; ++i) {
        auto op = CreateSetValueOp(i, ValueForKey(i), table);
        // Inside this loop `i` never equals `end`; the final op is at `end - 1`. It must flush
        // the trailing partial batch, otherwise those ops are only Apply()'d and never flushed.
        const bool flush_now = (i - begin + 1) % batch_size == 0 || i == end - 1;
        if (flush_now) {
          RETURN_NOT_OK(session->ApplyAndFlush(op));
        } else {
          session->Apply(op);
        }
      }
    }
    return WaitSync(begin, end, table);
  }

  // Asserts that every key in [begin, end) reads back as ValueForKey(key).
  void VerifyTable(int begin, int end, const TableHandle& table) {
    auto session = CreateSession();
    for (int i = begin; i != end; ++i) {
      auto value = GetValue(session, i, table);
      ASSERT_TRUE(value.is_initialized()) << "i: " << i << ", table: " << table->name().ToString();
      ASSERT_EQ(ValueForKey(i), *value) << "i: " << i << ", table: " << table->name().ToString();
    }
  }

  // (tablet ids of a table, ids of every tablet server hosting a replica of any of them).
  using TabletIdsAndReplicas =
      std::pair<std::vector<std::string>, std::unordered_set<std::string>>;

  // Collects the tablet ids of `table` and the set of replica locations from the master.
  // Returns NotFound, including the underlying error, when the tablet infos cannot be fetched.
  Result<TabletIdsAndReplicas> GetTabletIdsAndReplicas(const TableHandle& table) {
    auto tablet_infos = GetTabletInfos(table.name());
    if (!tablet_infos.ok()) {
      return STATUS_FORMAT(NotFound,
                           "No tablet information found for $0: $1",
                           table->name(),
                           tablet_infos.status());
    }

    std::vector<std::string> tablet_ids;
    tablet_ids.reserve(tablet_infos->size());
    std::unordered_set<std::string> replicas;
    for (const auto& tablet_info : *tablet_infos) {
      tablet_ids.emplace_back(tablet_info->tablet_id());
      auto replica_map = tablet_info->GetReplicaLocations();
      for (const auto& replica_entry : *replica_map) {
        replicas.insert(replica_entry.first);
      }
    }
    return TabletIdsAndReplicas(std::move(tablet_ids), std::move(replicas));
  }

  // Waits, up to 30 seconds, until every replica of `table` serves keys [begin, end) with the
  // expected values.
  CHECKED_STATUS WaitSync(int begin, int end, const TableHandle& table) {
    auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(30);
    const TabletIdsAndReplicas info = VERIFY_RESULT(GetTabletIdsAndReplicas(table));
    for (const auto& replica : info.second) {
      RETURN_NOT_OK(DoWaitSync(deadline, info.first, replica, begin, end, table));
    }
    return Status::OK();
  }

  // Polls tablet server `replica` with CONSISTENT_PREFIX reads until each key in [begin, end)
  // is found in exactly one of `tablet_ids` with value ValueForKey(key).
  // Returns Corruption on a duplicate or wrong value, and RemoteError on a failed read.
  CHECKED_STATUS DoWaitSync(
      const MonoTime& deadline,
      const std::vector<std::string>& tablet_ids,
      const std::string& replica,
      int begin,
      int end,
      const TableHandle& table) {
    auto tserver = cluster_->find_tablet_server(replica);
    if (!tserver) {
      return STATUS_FORMAT(NotFound, "Tablet server for $0 not found", replica);
    }
    auto endpoint = tserver->server()->rpc_server()->GetBoundAddresses().front();
    auto proxy = std::make_unique<tserver::TabletServerServiceProxy>(
        &tserver->server()->proxy_cache(), HostPort::FromBoundEndpoint(endpoint));

    // The projection used to decode returned rows is identical for every request. Build it once
    // here instead of once per key and tablet on every poll.
    Schema projection;
    std::vector<ColumnId> column_refs;
    column_refs.emplace_back(table.ColumnId(kValueColumn));
    Schema total_schema = client::internal::GetSchema(table->schema());
    RETURN_NOT_OK(total_schema.CreateProjectionByIdsIgnoreMissing(column_refs, &projection));
    std::shared_ptr<std::vector<ColumnSchema>> columns =
        std::make_shared<std::vector<ColumnSchema>>(YBSchema(projection).columns());

    auto condition = [&]() -> Result<bool> {
      for (int i = begin; i != end; ++i) {
        bool found = false;
        for (const std::string& tablet_id : tablet_ids) {
          tserver::ReadRequestPB req;
          {
            std::string partition_key;
            auto op = CreateReadOp(i, table);
            RETURN_NOT_OK(op->GetPartitionKey(&partition_key));
            auto* ql_batch = req.add_ql_batch();
            *ql_batch = op->request();
            const auto hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
            ql_batch->set_hash_code(hash_code);
            ql_batch->set_max_hash_code(hash_code);
          }

          rpc::RpcController controller;
          controller.set_timeout(MonoDelta::FromSeconds(1));
          req.set_tablet_id(tablet_id);
          req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);

          tserver::ReadResponsePB resp;
          RETURN_NOT_OK(proxy->Read(req, &resp, &controller));

          // An errored response may carry no ql_batch entries, and ql_batch(0) requires at
          // least one, so check both before indexing.
          if (resp.has_error()) {
            return STATUS_FORMAT(RemoteError,
                                 "Read from tablet $0 failed: $1",
                                 tablet_id,
                                 resp.error().ShortDebugString());
          }
          if (resp.ql_batch_size() == 0) {
            return STATUS_FORMAT(RemoteError,
                                 "Empty ql_batch in read response from tablet $0",
                                 tablet_id);
          }

          const auto& ql_batch = resp.ql_batch(0);
          if (ql_batch.status() != QLResponsePB_QLStatus_YQL_STATUS_OK) {
            return STATUS_FORMAT(RemoteError,
                                 "Bad resp status: $0",
                                 QLResponsePB_QLStatus_Name(ql_batch.status()));
          }

          Slice data = VERIFY_RESULT(controller.GetSidecar(ql_batch.rows_data_sidecar()));
          yb::ql::RowsResult result(table->name(), columns, data.ToBuffer());
          auto row_block = result.GetRowBlock();
          if (row_block->row_count() == 1) {
            if (found) {
              return STATUS_FORMAT(Corruption, "Key found twice: $0", i);
            }
            auto value = row_block->row(0).column(0).int32_value();
            if (value != ValueForKey(i)) {
              return STATUS_FORMAT(Corruption,
                                   "Wrong value for key $0: $1, expected: $2",
                                   i,
                                   value,
                                   ValueForKey(i));
            }
            found = true;
          }
        }
        if (!found) {
          LOG(INFO) << "Key not found: " << i;
          return false;
        }
      }
      return true;
    };

    return Wait(condition, deadline, "Waiting for replication");
  }

  // Runs DoVerify against every replica of `table`, with a shared 30 second deadline.
  CHECKED_STATUS VerifyConsistency(
      int begin, int end, const TableHandle& table, int expected_rows_mismatched = 0) {
    auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(30);
    const TabletIdsAndReplicas info = VERIFY_RESULT(GetTabletIdsAndReplicas(table));

    for (const auto& replica : info.second) {
      RETURN_NOT_OK(
          DoVerify(deadline, info.first, replica, begin, end, table, expected_rows_mismatched));
    }
    return Status::OK();
  }

  // Issues VerifyTableRowRange on `replica` for the single tablet in `tablet_ids`.
  // It checks that every per-index entry of consistency_stats equals expected_rows_mismatched.
  // `end` appears to be inclusive (num_rows = end - begin + 1); callers pass kTotalKeys - 1.
  CHECKED_STATUS DoVerify(
      const MonoTime& deadline,
      const std::vector<std::string>& tablet_ids,
      const std::string& replica,
      const int begin,
      const int end,
      const TableHandle& table,
      const size_t expected_rows_mismatched) {
    auto tserver = cluster_->find_tablet_server(replica);
    if (!tserver) {
      return STATUS_FORMAT(NotFound, "Tablet server for $0 not found", replica);
    }
    // Report a status rather than CHECK-crashing the whole test binary.
    if (tablet_ids.size() != 1) {
      return STATUS_FORMAT(IllegalState,
                           "Expected exactly one tablet for table $0, found $1",
                           table.table()->id(),
                           tablet_ids.size());
    }
    auto endpoint = tserver->server()->rpc_server()->GetBoundAddresses().front();
    auto proxy = std::make_unique<tserver::TabletServerServiceProxy>(
        &tserver->server()->proxy_cache(), HostPort::FromBoundEndpoint(endpoint));

    for (const std::string& tablet_id : tablet_ids) {
      tserver::VerifyTableRowRangeRequestPB req;
      tserver::VerifyTableRowRangeResponsePB resp;

      req.set_tablet_id(tablet_id);
      req.set_num_rows(end - begin + 1);
      req.clear_start_key();  // empty string indicates start scan from start
      // read_time: if left empty, the safe time retrieved will be used instead

      rpc::RpcController controller;
      controller.set_deadline(deadline);
      RETURN_NOT_OK(proxy->VerifyTableRowRange(req, &resp, &controller));

      if (resp.consistency_stats().empty()) {
        return STATUS_FORMAT(
            NotFound,
            "Individual index consistency state missing for table $0.",
            table.table()->id());
      }
      for (const auto& index_stats : resp.consistency_stats()) {
        if (index_stats.second != expected_rows_mismatched) {
          return STATUS_FORMAT(
              Corruption,
              "Incorrect number of rows reported to be dropped for index $0 built on table $1:"
              "found $2 rows reported when $3 rows mismatched was expected.",
              index_stats.first, table.table()->id(), index_stats.second,
              expected_rows_mismatched);
        }
      }
    }

    return Status::OK();
  }

  // Imports the RocksDB data of every kTable1Name tablet into the matching kTable2Name tablet on
  // every tablet server. A NotFound from ImportData is ignored; any other error is returned.
  CHECKED_STATUS Import() {
    std::this_thread::sleep_for(1s); // Wait until all tablets are synced and flushed.
    EXPECT_OK(cluster_->FlushTablets());

    auto source_infos = VERIFY_RESULT(GetTabletInfos(kTable1Name));
    auto dest_infos = VERIFY_RESULT(GetTabletInfos(kTable2Name));
    EXPECT_EQ(source_infos.size(), dest_infos.size());
    // Tablets are paired by position below; bail out instead of indexing past dest_infos.
    if (source_infos.size() != dest_infos.size()) {
      return STATUS_FORMAT(IllegalState,
                           "Tablet count mismatch: $0 source vs $1 destination tablets",
                           source_infos.size(), dest_infos.size());
    }

    // Reads a tablet's [partition_key_start, partition_key_end) under its metadata read lock.
    auto partition_bounds = [](const master::TabletInfos::value_type& tablet_info) {
      auto& metadata = tablet_info->metadata();
      SharedLock<std::remove_reference<decltype(metadata)>::type> lock(metadata);
      const auto& partition = metadata.state().pb.partition();
      return std::make_pair(partition.partition_key_start(), partition.partition_key_end());
    };

    for (size_t i = 0; i != source_infos.size(); ++i) {
      const auto source_bounds = partition_bounds(source_infos[i]);
      const auto dest_bounds = partition_bounds(dest_infos[i]);
      EXPECT_EQ(source_bounds.first, dest_bounds.first);
      EXPECT_EQ(source_bounds.second, dest_bounds.second);
    }

    for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
      auto* tablet_manager = cluster_->mini_tablet_server(i)->server()->tablet_manager();
      for (size_t j = 0; j != source_infos.size(); ++j) {
        tablet::TabletPeerPtr source_peer, dest_peer;
        tablet_manager->LookupTablet(source_infos[j]->id(), &source_peer);
        EXPECT_NE(nullptr, source_peer);
        tablet_manager->LookupTablet(dest_infos[j]->id(), &dest_peer);
        EXPECT_NE(nullptr, dest_peer);
        // The EXPECTs above do not stop execution; never dereference a missing peer.
        if (!source_peer || !dest_peer) {
          return STATUS_FORMAT(IllegalState,
                               "Tablet peer missing on tablet server $0 (source: $1, dest: $2)",
                               i, source_infos[j]->id(), dest_infos[j]->id());
        }
        auto source_dir = source_peer->tablet()->metadata()->rocksdb_dir();
        auto status = dest_peer->tablet()->ImportData(source_dir);
        if (!status.ok() && !status.IsNotFound()) {
          return status;
        }
      }
    }
    return Status::OK();
  }

  // Looks up a table by (unqualified) table name in the leader master's catalog.
  Result<scoped_refptr<master::TableInfo>> GetTableInfo(const YBTableName& table_name) {
    auto& catalog_manager =
        VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
    auto all_tables = catalog_manager.GetTables(master::GetTablesMode::kAll);
    for (const auto& table : all_tables) {
      if (table->name() == table_name.table_name()) {
        return table;
      }
    }
    return STATUS_FORMAT(NotFound, "Table $0 not found", table_name);
  }

  // Returns the master's tablet infos for `table_name`.
  Result<master::TabletInfos> GetTabletInfos(const YBTableName& table_name) {
    return VERIFY_RESULT(GetTableInfo(table_name))->GetTablets();
  }

  // Polls IsCreateTableDone for up to 30 seconds until the master reports the table as done.
  // The last response is left in *resp.
  Status WaitForTableCreation(const YBTableName& table_name,
                              master::IsCreateTableDoneResponsePB *resp) {
    return LoggedWaitFor([this, &table_name, resp]() -> Result<bool> {
      master::IsCreateTableDoneRequestPB req;
      req.mutable_table()->set_table_name(table_name.table_name());
      req.mutable_table()->mutable_namespace_()->set_name(table_name.namespace_name());
      resp->Clear();

      master::MasterDdlProxy master_proxy(
          &client_->proxy_cache(), VERIFY_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));
      rpc::RpcController rpc;
      rpc.set_timeout(MonoDelta::FromSeconds(30));

      Status s = master_proxy.IsCreateTableDone(req, resp, &rpc);
      // Keep waiting until creation is actually done, not merely until the RPC succeeds.
      return s.ok() && !resp->has_error() && resp->done();
    }, MonoDelta::FromSeconds(30), "Table Creation");
  }

  void TestDeletePartialKey(int num_range_keys_in_delete);

  void CreateAndVerifyIndexConsistency(int expected_number_rows_mismatched);

  TableHandle table1_;
  TableHandle table2_;
};
505
506
0
TEST_F(QLTabletTest, ImportToEmpty) {
  // table1_ is created with FLAGS_initial_seqno = 0, table2_ with kBigSeqNo.
  CreateTables(/* initial_seqno1 = */ 0, /* initial_seqno2 = */ kBigSeqNo);

  // Only the source table gets data; the destination stays empty until the import.
  FillTable(0, kTotalKeys, table1_);
  ASSERT_OK(Import());

  // Both the source and the destination must now serve every key.
  for (const auto* handle : {&table1_, &table2_}) {
    VerifyTable(0, kTotalKeys, *handle);
  }
}
514
515
0
TEST_F(QLTabletTest, ImportToNonEmpty) {
  CreateTables(/* initial_seqno1 = */ 0, /* initial_seqno2 = */ kBigSeqNo);

  // Disjoint key ranges: table1_ holds [0, kTotalKeys), table2_ holds [kTotalKeys, merged_end).
  const int merged_end = 2 * kTotalKeys;
  FillTable(0, kTotalKeys, table1_);
  FillTable(kTotalKeys, merged_end, table2_);

  ASSERT_OK(Import());

  // After the import the destination holds the union of both ranges.
  VerifyTable(0, merged_end, table2_);
}
523
524
0
TEST_F(QLTabletTest, ImportToEmptyAndRestart) {
  CreateTables(/* initial_seqno1 = */ 0, /* initial_seqno2 = */ kBigSeqNo);
  FillTable(0, kTotalKeys, table1_);

  ASSERT_OK(Import());
  VerifyTable(0, kTotalKeys, table2_);

  // The imported data, and the source table, must both survive a full cluster restart.
  ASSERT_OK(cluster_->RestartSync());
  for (const auto* handle : {&table1_, &table2_}) {
    VerifyTable(0, kTotalKeys, *handle);
  }
}
535
536
0
TEST_F(QLTabletTest, ImportToNonEmptyAndRestart) {
  CreateTables(/* initial_seqno1 = */ 0, /* initial_seqno2 = */ kBigSeqNo);

  const int merged_end = 2 * kTotalKeys;
  FillTable(0, kTotalKeys, table1_);
  FillTable(kTotalKeys, merged_end, table2_);

  ASSERT_OK(Import());
  VerifyTable(0, merged_end, table2_);

  // After a restart the source keeps its own range and the destination keeps the merged range.
  ASSERT_OK(cluster_->RestartSync());
  VerifyTable(0, kTotalKeys, table1_);
  VerifyTable(0, merged_end, table2_);
}
549
550
0
TEST_F(QLTabletTest, LateImport) {
  // Seqnos are reversed relative to the successful import tests: table1_ (the import source)
  // gets FLAGS_initial_seqno = kBigSeqNo and table2_ gets 0, so Import() must fail.
  CreateTables(/* initial_seqno1 = */ kBigSeqNo, /* initial_seqno2 = */ 0);
  FillTable(0, kTotalKeys, table1_);

  const auto import_status = Import();
  ASSERT_NOK(import_status);
}
556
557
0
TEST_F(QLTabletTest, OverlappedImport) {
  // The source starts only two below the destination's initial seqno, and both tables receive
  // data, so Import() is expected to be rejected.
  CreateTables(/* initial_seqno1 = */ kBigSeqNo - 2, /* initial_seqno2 = */ kBigSeqNo);

  const int merged_end = 2 * kTotalKeys;
  FillTable(0, kTotalKeys, table1_);
  FillTable(kTotalKeys, merged_end, table2_);

  const auto import_status = Import();
  ASSERT_NOK(import_status);
}
564
565
0
void QLTabletTest::CreateAndVerifyIndexConsistency(const int expected_number_rows_mismatched) {
566
0
  CreateTable(kTable1Name, &table1_, 1, true);
567
0
  FillTable(0, kTotalKeys, table1_);
568
569
0
  TableHandle index_table;
570
0
  kv_table_test::CreateIndex(
571
0
      yb::client::Transactional::kTrue, 1, false, table1_, client_.get(), &index_table);
572
573
0
  ASSERT_OK(client_->WaitUntilIndexPermissionsAtLeast(
574
0
      kTable1Name, index_table.name(), INDEX_PERM_READ_WRITE_AND_DELETE, 100ms /* max_wait */));
575
0
  ASSERT_OK(VerifyConsistency(
576
0
      0, kTotalKeys - 1, table1_,
577
0
      expected_number_rows_mismatched));  // no missing indexed rows check
578
0
  ASSERT_OK(VerifyConsistency(
579
0
      0, kTotalKeys - 1, index_table, expected_number_rows_mismatched));  // no orphans check
580
0
}
581
582
0
TEST_F(QLTabletTest, VerifyIndexRange) {
  // A normally built index must match its indexed table exactly.
  constexpr int kExpectedMismatches = 0;
  CreateAndVerifyIndexConsistency(kExpectedMismatches);
}
583
584
0
TEST_F(QLTabletTest, VerifyIndexRangeWithInconsistentTable) {
  constexpr int kSabotageFrequency = 10;
  FLAGS_TEST_backfill_sabotage_frequency = kSabotageFrequency;
  // Presumably the backfill drops one row out of every kSabotageFrequency, so this many index
  // rows are expected to mismatch.
  const int kRowsDropped = kTotalKeys / kSabotageFrequency;
  CreateAndVerifyIndexConsistency(kRowsDropped);
}
589
590
// Test expected number of tablets for transactions table - added for #2293.
591
0
TEST_F(QLTabletTest, TransactionsTableTablets) {
  FLAGS_yb_num_shards_per_tserver = 1;
  FLAGS_ysql_num_shards_per_tserver = 2;
  // With the absolute count left at 0, the test expects the per-tserver count to size the
  // transactions table.
  FLAGS_transaction_table_num_tablets = 0;
  FLAGS_transaction_table_num_tablets_per_tserver = 4;

  // A transactional user table; the test then waits for the transactions table.
  TableProperties table_properties;
  table_properties.SetTransactional(true);

  YBSchemaBuilder builder;
  builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
  builder.AddColumn(kValueColumn)->Type(INT32);
  builder.SetTableProperties(table_properties);

  TableHandle table;
  ASSERT_OK(table.Create(kTable1Name, 8, client_.get(), &builder));

  const YBTableName txn_table_name(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, kGlobalTransactionsTableName);
  master::IsCreateTableDoneResponsePB resp;
  ASSERT_OK(WaitForTableCreation(txn_table_name, &resp));
  ASSERT_TRUE(resp.done());

  const auto tablets = ASSERT_RESULT(GetTabletInfos(txn_table_name));
  const auto expected_tablet_count =
      cluster_->num_tablet_servers() * FLAGS_transaction_table_num_tablets_per_tserver;
  ASSERT_EQ(tablets.size(), expected_tablet_count);
}
621
622
0
// Runs five rounds of StepDownAllTablets(cluster), pausing five seconds after each round.
void DoStepDowns(MiniCluster* cluster) {
  constexpr int kRounds = 5;
  for (int round = 0; round < kRounds; ++round) {
    StepDownAllTablets(cluster);
    std::this_thread::sleep_for(5s);
  }
}
628
629
0
// Asserts that, for every tablet peer on every tablet server, the earliest log index still
// needed equals the index of the last committed op.
void VerifyLogIndicies(MiniCluster* cluster) {
  const size_t num_tservers = cluster->num_tablet_servers();
  for (size_t ts_idx = 0; ts_idx < num_tservers; ++ts_idx) {
    const auto peers =
        cluster->mini_tablet_server(ts_idx)->server()->tablet_manager()->GetTabletPeers();

    for (const auto& peer : peers) {
      const int64_t earliest_needed_index = ASSERT_RESULT(peer->GetEarliestNeededLogIndex());
      const auto last_committed = peer->consensus()->GetLastCommittedOpId();
      ASSERT_EQ(last_committed.index, earliest_needed_index);
    }
  }
}
639
640
namespace {

// Value the GC-log tests assign to FLAGS_retryable_request_timeout_secs, in seconds.
constexpr int kRetryableRequestTimeoutSecs = 4;

} // namespace
645
646
0
TEST_F(QLTabletTest, GCLogWithoutWrites) {
  SetAtomicFlag(kRetryableRequestTimeoutSecs, &FLAGS_retryable_request_timeout_secs);

  TableHandle table;
  CreateTable(kTable1Name, &table);
  FillTable(0, kTotalKeys, table);

  // Sleep one second longer than the retryable request timeout, then flush, with no further
  // writes.
  const auto idle_time = std::chrono::seconds(kRetryableRequestTimeoutSecs + 1);
  std::this_thread::sleep_for(idle_time);
  ASSERT_OK(cluster_->FlushTablets());

  DoStepDowns(cluster_.get());
  VerifyLogIndicies(cluster_.get());
}
659
660
0
TEST_F(QLTabletTest, GCLogWithRestartWithoutWrites) {
  SetAtomicFlag(kRetryableRequestTimeoutSecs, &FLAGS_retryable_request_timeout_secs);

  TableHandle table;
  CreateTable(kTable1Name, &table);
  FillTable(0, kTotalKeys, table);

  // Same as GCLogWithoutWrites, but the cluster is restarted after the flush.
  const auto idle_time = std::chrono::seconds(kRetryableRequestTimeoutSecs + 1);
  std::this_thread::sleep_for(idle_time);
  ASSERT_OK(cluster_->FlushTablets());
  ASSERT_OK(cluster_->RestartSync());

  DoStepDowns(cluster_.get());
  VerifyLogIndicies(cluster_.get());
}
676
677
0
TEST_F(QLTabletTest, LeaderLease) {
  SetAtomicFlag(false, &FLAGS_enable_lease_revocation);

  TableHandle table;
  CreateTable(kTable1Name, &table);

  LOG(INFO) << "Filling table";
  FillTable(0, kTotalKeys, table);

  // Extend the lease to one minute, then wait twice the old lease duration so it takes effect.
  const auto old_lease_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms);
  SetAtomicFlag(60 * 1000, &FLAGS_leader_lease_duration_ms);
  std::this_thread::sleep_for(2ms * old_lease_ms);

  LOG(INFO) << "Step down";
  StepDownAllTablets(cluster_.get());

  // With lease revocation disabled, the write is expected to fail with an IOError.
  LOG(INFO) << "Write value";
  auto session = CreateSession();
  const auto op = CreateSetValueOp(/* key = */ 1, /* value = */ 1, table);
  const auto status = session->ApplyAndFlush(op);
  ASSERT_TRUE(status.IsIOError()) << "Status: " << status;
}
703
704
// This test tries to catch the situation where some entries were applied and flushed to RocksDB
705
// but are not present in the persistent logs.
706
//
707
// If that happened, then after a restart some node would have records in RocksDB without the
708
// corresponding log records. It would be unable to restore the last hybrid time, and it would
709
// also be unable to remotely bootstrap other nodes.
710
//
711
// So we delay the log of one of the followers while writing keys,
712
// and check that the flushed op id in RocksDB never exceeds the last op id in the logs.
713
0
TEST_F(QLTabletTest, WaitFlush) {
  google::FlagSaver saver;

  constexpr int kNumTablets = 1; // Use single tablet to increase chance of bad scenario.
  FLAGS_db_write_buffer_size = 10; // Use small memtable to induce background flush on each write.

  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTable1Name);
  workload.set_write_timeout_millis(30000);
  workload.set_num_tablets(kNumTablets);
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1);
  workload.set_payload_bytes(128);
  workload.Setup();
  workload.Start();

  // Exactly one peer per tablet server, since the table has a single tablet.
  std::vector<tablet::TabletPeerPtr> peers;

  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
    auto tserver_peers =
        cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
    ASSERT_EQ(tserver_peers.size(), 1);
    peers.push_back(tserver_peers.front());
  }

  // Once a leader is ready, slow down the log of the next peer in the list, a non-leader.
  // Poll with a short sleep and a bounded deadline. The test then neither monopolizes a CPU
  // that the in-process cluster needs nor hangs forever if no leader becomes ready.
  const auto leader_deadline = std::chrono::steady_clock::now() + 30s;
  bool leader_found = false;
  while (!leader_found) {
    for (size_t i = 0; i != peers.size(); ++i) {
      if (peers[i]->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
        peers[(i + 1) % peers.size()]->log()->TEST_SetSleepDuration(500ms);
        leader_found = true;
        break;
      }
    }
    if (!leader_found) {
      ASSERT_TRUE(std::chrono::steady_clock::now() < leader_deadline)
          << "No tablet peer became LEADER_AND_READY in time";
      std::this_thread::sleep_for(10ms);
    }
  }

  // For 20 seconds, keep checking that no peer has flushed an op beyond its log's latest entry.
  auto deadline = std::chrono::steady_clock::now() + 20s;
  while (std::chrono::steady_clock::now() <= deadline) {
    for (const auto& peer : peers) {
      auto flushed_op_id = ASSERT_RESULT(peer->tablet()->MaxPersistentOpId()).regular;
      auto latest_entry_op_id = peer->log()->GetLatestEntryOpId();
      ASSERT_LE(flushed_op_id.index, latest_entry_op_id.index);
    }
  }

  // Sanity check that the workload actually produced flushes on every peer.
  for (const auto& peer : peers) {
    auto flushed_op_id = ASSERT_RESULT(peer->tablet()->MaxPersistentOpId()).regular;
    ASSERT_GE(flushed_op_id.index, 100);
  }

  workload.StopAndJoin();
}
765
766
767
0
TEST_F(QLTabletTest, BoundaryValues) {
  constexpr size_t kTotalThreads = 8;
  constexpr int kTotalRows = 10000;

  TableHandle table;
  CreateTable(kTable1Name, &table, /* num_tablets = */ 1);

  // Writers share a counter; each repeatedly claims the next key and writes (key, -key).
  std::vector<std::thread> writers;
  std::atomic<int> next_key(0);
  while (writers.size() < kTotalThreads) {
    writers.emplace_back([this, &next_key, &table] {
      auto session = CreateSession();
      for (auto key = next_key++; key < kTotalRows; key = next_key++) {
        SetValue(session, key, -key, table);
      }
    });
  }

  // Flush and GC logs while the writers are still running.
  const auto kSleepTime = NonTsanVsTsan(5s, 1s);
  std::this_thread::sleep_for(kSleepTime);
  LOG(INFO) << "Flushing tablets";
  ASSERT_OK(cluster_->FlushTablets());
  std::this_thread::sleep_for(kSleepTime);
  LOG(INFO) << "GC logs";
  ASSERT_OK(cluster_->CleanTabletLogs());
  LOG(INFO) << "Wait for threads";
  for (auto& writer : writers) {
    writer.join();
  }
  std::this_thread::sleep_for(kSleepTime * 5);
  ASSERT_OK(cluster_->RestartSync());

  // Every written row must survive the restart with value == -key.
  size_t total_rows = 0;
  for (const auto& row : TableRange(table)) {
    EXPECT_EQ(row.column(0).int32_value(), -row.column(1).int32_value());
    ++total_rows;
  }
  ASSERT_EQ(kTotalRows, total_rows);

  ASSERT_OK(cluster_->FlushTablets());
  std::this_thread::sleep_for(kSleepTime);

  // Op index recorded in a user frontier of an SST file.
  auto frontier_op_index = [](const auto& user_frontier) {
    return down_cast<docdb::ConsensusFrontier&>(*user_frontier).op_id().index;
  };

  for (size_t ts_idx = 0; ts_idx != cluster_->num_tablet_servers(); ++ts_idx) {
    auto peers =
        cluster_->mini_tablet_server(ts_idx)->server()->tablet_manager()->GetTabletPeers();
    ASSERT_EQ(1, peers.size());
    auto& peer = *peers[0];
    auto op_id = peer.log()->GetLatestEntryOpId();
    auto* db = peer.tablet()->TEST_db();

    int64_t max_index = 0;
    int64_t min_index = std::numeric_limits<int64_t>::max();
    for (const auto& file : db->GetLiveFilesMetaData()) {
      LOG(INFO) << "File: " << yb::ToString(file);
      max_index = std::max(max_index, frontier_op_index(file.largest.user_frontier));
      min_index = std::min(min_index, frontier_op_index(file.smallest.user_frontier));
    }

    // Allow several entries for non write ops.
    ASSERT_GE(max_index, op_id.index - 5);
    ASSERT_LE(min_index, 5);
  }
}
836
837
// Regression test for a bug in MvccManager that showed up when clocks were skewed.
// The client reads from a follower and max safe time was requested without any limit,
// so new operations could be added with a hybrid time lower than the one returned.
TEST_F(QLTabletTest, SkewedClocks) {
  // Restores any gflags modified during this test when the scope exits.
  google::FlagSaver saver;

  // Apply a 50ms clock skew to the cluster. NOTE(review): the returned changers presumably
  // undo the skew when destroyed at the end of the test -- confirm in SkewClocks().
  auto delta_changers = SkewClocks(cluster_.get(), 50ms);

  // Background write load, so new operations keep arriving while the reads below run.
  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTable1Name);
  workload.set_write_timeout_millis(30000);
  workload.set_num_tablets(12);
  workload.set_num_write_threads(2);
  workload.set_write_batch_size(1);
  workload.set_payload_bytes(128);
  workload.Setup();
  workload.Start();

  // Let the workload make some progress before issuing reads.
  while (workload.rows_inserted() < 100) {
    std::this_thread::sleep_for(10ms);
  }

  TableHandle table;
  ASSERT_OK(table.Open(kTable1Name, client_.get()));
  auto session = CreateSession();

  // Issue 1000 single-key reads of the value column with CONSISTENT_PREFIX consistency
  // (the level that lets the read be served by a follower, per the header comment).
  // Only the response status is checked, not the returned rows.
  for (int i = 0; i != 1000; ++i) {
    auto op = table.NewReadOp();
    auto req = op->mutable_request();
    QLAddInt32HashValue(req, i);
    auto value_column_id = table.ColumnId(kValueColumn);
    req->add_selected_exprs()->set_column_id(value_column_id);
    req->mutable_column_refs()->add_ids(value_column_id);

    QLRSColDescPB *rscol_desc = req->mutable_rsrow_desc()->add_rscol_descs();
    rscol_desc->set_name(kValueColumn);
    table.ColumnType(kValueColumn)->ToQLTypePB(rscol_desc->mutable_ql_type());
    op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
    ASSERT_OK(session->ApplyAndFlush(op));
    ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
  }

  workload.StopAndJoin();

  cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back.
  cluster_.reset();
}
884
885
0
// Sends a conditional write (CAS: set kValue2 if value == kValue1) and delays it across a
// leader change. It then writes kValue3 and moves leadership back to the original leader.
// The delayed CAS must complete with YQL_STATUS_OK, while the final value must still be kValue3.
TEST_F(QLTabletTest, LeaderChange) {
  const int32_t kKey = 1;
  const int32_t kValue1 = 2;
  const int32_t kValue2 = 3;
  const int32_t kValue3 = 4;
  const int kNumTablets = 1;

  TableHandle table;
  CreateTable(kTable1Name, &table, kNumTablets);
  auto session = client_->NewSession();
  session->SetTimeout(60s);

  // Write kValue1
  SetValue(session, kKey, kValue1, table);

  // Remember which tablet server currently leads the (single) tablet. Any status other than
  // NOT_LEADER counts as leader. Note that `break` only leaves the inner loop; the remaining
  // servers are still scanned.
  std::string leader_id;
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
    auto server = cluster_->mini_tablet_server(i)->server();
    auto peers = server->tablet_manager()->GetTabletPeers();
    for (const auto& peer : peers) {
      if (peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) {
        leader_id = server->permanent_uuid();
        break;
      }
    }
  }

  LOG(INFO) << "Current leader: " << leader_id;

  ASSERT_NE(leader_id, "");

  // Build the CAS insert (value = kValue2, IF value == kValue1) and queue it on the session.
  LOG(INFO) << "CAS " << kValue1 << " => " << kValue2;
  const auto write_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
  auto* const req = write_op->mutable_request();
  QLAddInt32HashValue(req, kKey);
  table.AddInt32ColumnValue(req, kValueColumn, kValue2);

  table.SetColumn(req->add_column_values(), kValueColumn);
  table.SetInt32Condition(
    req->mutable_if_expr()->mutable_condition(), kValueColumn, QL_OP_EQUAL, kValue1);
  req->mutable_column_refs()->add_ids(table.ColumnId(kValueColumn));
  session->Apply(write_op);

  // Start flushing the CAS while FLAGS_TEST_delay_execute_async_ms is 30s (presumably this
  // holds the CAS back on the server), wait 2s, then clear the flag for later operations.
  SetAtomicFlag(30000, &FLAGS_TEST_delay_execute_async_ms);
  auto flush_future = session->FlushFuture();
  std::this_thread::sleep_for(2s);

  SetAtomicFlag(0, &FLAGS_TEST_delay_execute_async_ms);

  LOG(INFO) << "Step down old leader";
  StepDownAllTablets(cluster_.get());

  // Write other key to refresh leader cache.
  // Otherwise we would hang on locking the key.
  LOG(INFO) << "Write other key";
  SetValue(session, kKey + 1, kValue1, table);

  LOG(INFO) << "Write " << kValue3;
  SetValue(session, kKey, kValue3, table);

  ASSERT_EQ(GetValue(session, kKey, table), kValue3);

  // Ask whichever peer leads now to hand leadership back to the original leader (leader_id).
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
    auto server = cluster_->mini_tablet_server(i)->server();
    auto peers = server->tablet_manager()->GetTabletPeers();
    bool found = false;
    for (const auto& peer : peers) {
      if (peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) {
        LOG(INFO) << "Request step down: " << server->permanent_uuid() << " => " << leader_id;
        consensus::LeaderStepDownRequestPB req;
        req.set_tablet_id(peer->tablet_id());
        req.set_new_leader_uuid(leader_id);
        consensus::LeaderStepDownResponsePB resp;
        ASSERT_OK(peer->consensus()->StepDown(&req, &resp));
        found = true;
        break;
      }
    }
    if (found) {
      break;
    }
  }

  // The delayed CAS must finish successfully, and must not have replaced kValue3.
  ASSERT_OK(flush_future.get().status);
  ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, write_op->response().status());

  ASSERT_EQ(GetValue(session, kKey, table), kValue3);
}
973
974
0
void QLTabletTest::TestDeletePartialKey(int num_range_keys_in_delete) {
975
0
  YBSchemaBuilder builder;
976
0
  builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
977
0
  builder.AddColumn(kRangeKey1Column)->Type(INT32)->PrimaryKey()->NotNull();
978
0
  builder.AddColumn(kRangeKey2Column)->Type(INT32)->PrimaryKey()->NotNull();
979
0
  builder.AddColumn(kValueColumn)->Type(INT32);
980
981
0
  TableHandle table;
982
0
  ASSERT_OK(table.Create(kTable1Name, 1 /* num_tablets */, client_.get(), &builder));
983
984
0
  const auto kValue1 = 2;
985
0
  const auto kValue2 = 3;
986
0
  const auto kTotalKeys = 200;
987
988
0
  auto session1 = CreateSession();
989
0
  auto session2 = CreateSession();
990
0
  for (int key = 1; key != kTotalKeys; ++key) {
991
0
    {
992
0
      const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
993
0
      auto* const req = op->mutable_request();
994
0
      QLAddInt32HashValue(req, key);
995
0
      QLAddInt32RangeValue(req, key);
996
0
      QLAddInt32RangeValue(req, key);
997
0
      table.AddInt32ColumnValue(req, kValueColumn, kValue1);
998
0
      ASSERT_OK(session1->ApplyAndFlush(op));
999
0
      ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
1000
0
    }
1001
1002
0
    const auto op_del = table.NewWriteOp(QLWriteRequestPB::QL_STMT_DELETE);
1003
0
    {
1004
0
      auto* const req = op_del->mutable_request();
1005
0
      QLAddInt32HashValue(req, key);
1006
0
      for (int i = 0; i != num_range_keys_in_delete; ++i) {
1007
0
        QLAddInt32RangeValue(req, key);
1008
0
      }
1009
0
      session1->Apply(op_del);
1010
0
    }
1011
1012
0
    const auto op_update = table.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE);
1013
0
    {
1014
0
      auto* const req = op_update->mutable_request();
1015
0
      QLAddInt32HashValue(req, key);
1016
0
      QLAddInt32RangeValue(req, key);
1017
0
      QLAddInt32RangeValue(req, key);
1018
0
      table.AddInt32ColumnValue(req, kValueColumn, kValue2);
1019
0
      req->mutable_if_expr()->mutable_condition()->set_op(QL_OP_EXISTS);
1020
0
      session2->Apply(op_update);
1021
0
    }
1022
0
    auto future_del = session1->FlushFuture();
1023
0
    auto future_update = session2->FlushFuture();
1024
0
    ASSERT_OK(future_del.get().status);
1025
0
    ASSERT_OK(future_update.get().status);
1026
0
    ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op_del->response().status());
1027
0
    ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op_update->response().status());
1028
1029
0
    auto stored_value = GetValue(session1, key, table);
1030
0
    ASSERT_TRUE(!stored_value) << "Key: " << key << ", value: " << *stored_value;
1031
0
  }
1032
0
}
1033
1034
0
TEST_F(QLTabletTest, DeleteByHashKey) {
  // The DELETE carries the hash key only: none of the two range key components are set.
  constexpr int kRangeKeysInDelete = 0;
  TestDeletePartialKey(kRangeKeysInDelete);
}
1037
1038
0
TEST_F(QLTabletTest, DeleteByHashAndPartialRangeKey) {
  // The DELETE carries the hash key plus only one of the two range key components.
  constexpr int kRangeKeysInDelete = 1;
  TestDeletePartialKey(kRangeKeysInDelete);
}
1041
1042
0
// With compactions disabled and the SST/level0 limits raised, performs a synchronous flush
// after each write until the leader has more than 48 live files. It then writes 10 more keys
// without flushing, and restarts the cluster with the limit flags restored. Finally it verifies
// that every written key is readable.
TEST_F(QLTabletTest, ManySstFilesBootstrap) {
  // Set outside the FlagSaver scope below, so it stays in effect for the Shutdown() further
  // down. NOTE(review): presumably this leaves the last writes unflushed, so they must be
  // recovered when the tablet bootstraps after the restart.
  FLAGS_flush_rocksdb_on_shutdown = false;

  // Last key written; VerifyTable() checks keys up to this value after the restart.
  int key = 0;
  {
    // Restores the flags changed below when this scope ends, i.e. before the restart.
    google::FlagSaver flag_saver;

    // Live-file threshold. NOTE(review): the name suggests this is the default value of
    // FLAGS_rocksdb_level0_stop_writes_trigger -- confirm.
    size_t original_rocksdb_level0_stop_writes_trigger = 48;
    // Raise the limits and disable compactions so flushed files accumulate.
    FLAGS_sst_files_hard_limit = std::numeric_limits<uint64_t>::max() / 4;
    FLAGS_sst_files_soft_limit = FLAGS_sst_files_hard_limit;
    FLAGS_rocksdb_level0_stop_writes_trigger = 10000;
    FLAGS_rocksdb_level0_slowdown_writes_trigger = 10000;
    FLAGS_rocksdb_disable_compactions = true;
    CreateTable(kTable1Name, &table1_, 1);

    auto session = CreateSession();
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    ASSERT_EQ(peers.size(), 1);
    LOG(INFO) << "Leader: " << peers[0]->permanent_uuid();
    int stop_key = 0;
    for (;;) {
      // The live-file count is sampled before the next key is written.
      auto meta = peers[0]->tablet()->TEST_db()->GetLiveFilesMetaData();
      LOG(INFO) << "Total files: " << meta.size();

      ++key;
      SetValue(session, key, ValueForKey(key), table1_);
      if (meta.size() <= original_rocksdb_level0_stop_writes_trigger) {
        // Still at or below the threshold: flush synchronously, and move the stop point to
        // 10 keys past the current one.
        ASSERT_OK(peers[0]->tablet()->Flush(tablet::FlushMode::kSync));
        stop_key = key + 10;
      } else if (key >= stop_key) {
        // Past the threshold: the last 10 keys were written without a flush.
        break;
      }
    }
  }

  cluster_->Shutdown();

  LOG(INFO) << "Starting cluster";

  ASSERT_OK(cluster_->StartSync());

  LOG(INFO) << "Verify table";

  VerifyTable(1, key, table1_);
}
1087
1088
class QLTabletTestSmallMemstore : public QLTabletTest {
1089
 public:
1090
1
  void SetUp() override {
1091
1
    FLAGS_memstore_size_mb = 1;
1092
1
    FLAGS_global_memstore_size_mb_max = 1;
1093
1
    QLTabletTest::SetUp();
1094
1
  }
1095
};
1096
1097
0
// Runs a heavy write workload (10 threads, 1KB payloads, single tablet) on the 1 MB memstore
// fixture while FLAGS_TEST_allow_stop_writes is false. NOTE(review): by its name the test
// presumably exercises back-to-back memtable flushes -- confirm.
TEST_F_EX(QLTabletTest, DoubleFlush, QLTabletTestSmallMemstore) {
  SetAtomicFlag(false, &FLAGS_TEST_allow_stop_writes);

  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTable1Name);
  workload.set_write_timeout_millis(30000);
  workload.set_num_tablets(1);
  workload.set_num_write_threads(10);
  workload.set_write_batch_size(1);
  workload.set_payload_bytes(1_KB);
  workload.Setup();
  workload.Start();

  // Fewer rows are required under sanitizer builds.
  while (workload.rows_inserted() < RegularBuildVsSanitizers(75000, 20000)) {
    std::this_thread::sleep_for(10ms);
  }

  workload.StopAndJoin();

  // Flush on rocksdb shutdown could produce second immutable memtable, that will stop writes.
  SetAtomicFlag(true, &FLAGS_TEST_allow_stop_writes);
  cluster_->Shutdown(); // Shut down here, while TEST_allow_stop_writes is still true.
  cluster_.reset();
}
1121
1122
0
// Writes one 64KB value and polls the "server 1" mem trackers while the write is in flight and
// for up to 3s after it completes. Checks that:
//  - the value is accounted by the tablets' "operation_tracker" children at some point;
//  - the value is accounted by "log_cache" at some point;
//  - it is never accounted by both at once (their sum stays <= 2 * kValueSize).
TEST_F(QLTabletTest, OperationMemTracking) {
  // NOTE(review): presumably keeps entries in the log cache so its consumption stays observable.
  FLAGS_TEST_log_cache_skip_eviction = true;

  constexpr ssize_t kValueSize = 64_KB;
  const auto kWaitInterval = 50ms;

  YBSchemaBuilder builder;
  builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
  builder.AddColumn(kValueColumn)->Type(STRING);

  TableHandle table;
  ASSERT_OK(table.Create(kTable1Name, CalcNumTablets(3), client_.get(), &builder));

  // NOTE(review): presumably slows down apply, so the operation stays in flight long enough
  // to be observed by the polling loop below.
  FLAGS_TEST_tablet_inject_latency_on_apply_write_txn_ms = 1000;

  const auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
  auto* const req = op->mutable_request();
  QLAddInt32HashValue(req, 42);
  auto session = CreateSession();
  table.AddStringColumnValue(req, kValueColumn, std::string(kValueSize, 'X'));
  session->Apply(op);
  auto future = session->FlushFuture();
  // Only the trackers registered under "server 1" are inspected.
  auto server_tracker = MemTracker::GetRootTracker()->FindChild("server 1");
  auto tablets_tracker = server_tracker->FindChild("Tablets");
  auto log_tracker = server_tracker->FindChild("log_cache");

  // A default-constructed deadline means "write not finished yet"; once the write completes
  // it is set to now + 3s.
  std::chrono::steady_clock::time_point deadline;
  bool tracked_by_tablets = false;
  bool tracked_by_log_cache = false;
  for (;;) {
    // The consumption get order is important, otherwise we could get into situation where
    // mem tracking changed between gets.
    auto log_cache_consumption = log_tracker->consumption();
    tracked_by_log_cache = tracked_by_log_cache || log_cache_consumption >= kValueSize;
    int64_t operation_tracker_consumption = 0;
    for (auto& child : tablets_tracker->ListChildren()) {
      operation_tracker_consumption += child->FindChild("operation_tracker")->consumption();
    }

    tracked_by_tablets = tracked_by_tablets || operation_tracker_consumption >= kValueSize;
    LOG(INFO) << "Operation tracker consumption: " << operation_tracker_consumption
              << ", log cache consumption: " << log_cache_consumption;
    // We have overhead in both log cache and tablets.
    // So if value is double tracked then sum consumption will be higher than double value size.
    ASSERT_LE(operation_tracker_consumption + log_cache_consumption, kValueSize * 2)
        << DumpMemoryUsage();
    if (std::chrono::steady_clock::time_point() == deadline) { // operation did not finish yet
      // Waiting on the future doubles as the poll interval while the write is in flight.
      if (future.wait_for(kWaitInterval) == std::future_status::ready) {
        LOG(INFO) << "Value written";
        deadline = std::chrono::steady_clock::now() + 3s;
        ASSERT_OK(future.get().status);
        ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
      }
    } else if (deadline < std::chrono::steady_clock::now() || tracked_by_log_cache) {
      // After the write: stop once the log cache has been seen tracking it, or after 3s.
      break;
    } else {
      std::this_thread::sleep_for(kWaitInterval);
    }
  }

  ASSERT_TRUE(tracked_by_tablets);
  ASSERT_TRUE(tracked_by_log_cache);
}
1185
1186
// Checks the existence of the BlockBasedTable memtracker under "server 1". After creating,
// filling and flushing a table, verifies that it has exactly one child memtracker with
// consumption greater than zero. Then deletes the table and verifies that the BlockBasedTable
// memtracker is removed.
TEST_F(QLTabletTest, BlockCacheMemTracking) {
  const auto kSleepTime = NonTsanVsTsan(5s, 1s);
  constexpr size_t kTotalRows = 10000;
  const string kBlockTrackerName = "BlockBasedTable";

  TableHandle table;
  CreateTable(kTable1Name, &table, 1);
  FillTable(0, kTotalRows, table);

  auto server_tracker = MemTracker::GetRootTracker()->FindChild("server 1");
  auto block_cache_tracker = server_tracker->FindChild(kBlockTrackerName);
  ASSERT_TRUE(block_cache_tracker);

  // Fixed sleeps around the flush give background activity time to settle.
  std::this_thread::sleep_for(kSleepTime);
  LOG(INFO) << "Flushing tablets";
  ASSERT_OK(cluster_->FlushTablets());
  std::this_thread::sleep_for(kSleepTime);

  auto block_cache_children = block_cache_tracker->ListChildren();
  // check that there is exactly one child memtracker
  ASSERT_EQ(block_cache_children.size(), 1);
  // check that the child memtracker has a consumption greater than zero
  ASSERT_GT(block_cache_children[0]->consumption(), 0);

  LOG(INFO) << "Deleting table";
  ASSERT_OK(client_->DeleteTable(kTable1Name, true));
  std::this_thread::sleep_for(kSleepTime);

  // after table deletion, assert that there is no longer a block cache memtracker
  block_cache_tracker = server_tracker->FindChild(kBlockTrackerName);
  ASSERT_FALSE(block_cache_tracker);
}
1221
1222
// Checks the cluster's history cutoff against the previously committed value.
//
// Polls every 100ms, for up to 5s * kTimeMultiplier, until all of the following hold:
//  - there is one peer per tablet server;
//  - every peer is RUNNING;
//  - every peer's history cutoff has caught up to
//    min(now - retention interval - 2 heartbeats - 200ms, last replicated hybrid time);
//  - at least one peer is LEADER_AND_READY.
// Then asserts that the maximum cutoff observed did not go backward relative to
// *prev_committed, and stores it into *prev_committed.
//
// ASSERT_* failures return from this function only; callers wrap it in ASSERT_NO_FATALS.
void VerifyHistoryCutoff(MiniCluster* cluster, HybridTime* prev_committed,
                         const std::string& trace) {
  SCOPED_TRACE(trace);
  // Negative offset: the retention interval expressed in microseconds.
  const auto base_delta_us =
      -FLAGS_timestamp_history_retention_interval_sec * MonoTime::kMicrosecondsPerSecond;
  constexpr auto kExtraDeltaMs = 200;
  // Allow 2 Raft heartbeat rounds plus a processing delta (scaled by kTimeMultiplier) to
  // replicate the operation, update the committed cutoff and propagate it.
  const auto committed_delta_us =
      base_delta_us -
      (FLAGS_raft_heartbeat_interval_ms * 2 + kExtraDeltaMs) * MonoTime::kMicrosecondsPerMillisecond
          * kTimeMultiplier;

  // Maximum history cutoff seen on any peer across all polls.
  HybridTime committed = HybridTime::kMin;
  auto deadline = CoarseMonoClock::now() + 5s * kTimeMultiplier;
  for (;;) {
    ASSERT_LE(CoarseMonoClock::now(), deadline);
    auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
    // Visit peers in a stable order (by permanent uuid).
    std::sort(peers.begin(), peers.end(), [](const auto& lhs, const auto& rhs) {
      return lhs->permanent_uuid() < rhs->permanent_uuid();
    });
    // NOTE(review): assumes a single-tablet table, i.e. one peer per tablet server.
    if (peers.size() != cluster->num_tablet_servers()) {
      std::this_thread::sleep_for(100ms);
      continue;
    }
    bool complete = false;
    for (size_t i = 0; i < peers.size(); ++i) {
      auto peer = peers[i];
      SCOPED_TRACE(Format("Peer: $0", peer->permanent_uuid()));
      if (peer->state() != tablet::RaftGroupStatePB::RUNNING) {
        complete = false;
        break;
      }
      auto peer_history_cutoff =
          peer->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff;
      committed = std::max(peer_history_cutoff, committed);
      auto min_allowed = std::min(peer->clock_ptr()->Now().AddMicroseconds(committed_delta_us),
                                  peer->tablet()->mvcc_manager()->LastReplicatedHybridTime());
      if (peer_history_cutoff < min_allowed) {
        LOG(INFO) << "Committed did not catch up for " << peer->permanent_uuid() << ": "
                  << peer_history_cutoff << " vs " << min_allowed;
        complete = false;
        break;
      }
      // Completion also requires that some peer is a ready leader.
      if (peer->consensus()->GetLeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
        complete = true;
      }
    }
    if (complete) {
      break;
    }
    std::this_thread::sleep_for(100ms);
  }
  ASSERT_GE(committed, *prev_committed);
  *prev_committed = committed;
}
1281
1282
// Basic check for history cutoff evolution
1283
0
TEST_F(QLTabletTest, HistoryCutoff) {
1284
0
  FLAGS_timestamp_history_retention_interval_sec = kTimeMultiplier;
1285
0
  FLAGS_history_cutoff_propagation_interval_ms = 100;
1286
1287
0
  CreateTable(kTable1Name, &table1_, /* num_tablets= */ 1);
1288
0
  HybridTime committed_history_cutoff = HybridTime::kMin;
1289
0
  FillTable(0, 10, table1_);
1290
0
  ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "After write"));
1291
1292
  // Check that we restore committed state after restart.
1293
0
  std::array<HybridTime, 3> peer_committed;
1294
0
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
1295
0
    auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
1296
0
    ASSERT_EQ(peers.size(), 1);
1297
0
    peer_committed[i] =
1298
0
        peers[0]->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff;
1299
0
    LOG(INFO) << "Peer: " << peers[0]->permanent_uuid() << ", index: " << i
1300
0
              << ", committed: " << peer_committed[i];
1301
0
    cluster_->mini_tablet_server(i)->Shutdown();
1302
0
  }
1303
1304
0
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
1305
0
    ASSERT_OK(cluster_->mini_tablet_server(i)->Start());
1306
0
    for (;;) {
1307
0
      auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
1308
0
      ASSERT_LE(peers.size(), 1);
1309
0
      if (peers.empty() || peers[0]->state() != tablet::RaftGroupStatePB::RUNNING) {
1310
0
        std::this_thread::sleep_for(100ms);
1311
0
        continue;
1312
0
      }
1313
0
      SCOPED_TRACE(Format("Peer: $0, index: $1", peers[0]->permanent_uuid(), i));
1314
0
      ASSERT_GE(peers[0]->tablet()->RetentionPolicy()->GetRetentionDirective().history_cutoff,
1315
0
                peer_committed[i]);
1316
0
      break;
1317
0
    }
1318
0
    cluster_->mini_tablet_server(i)->Shutdown();
1319
0
  }
1320
1321
0
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
1322
0
    ASSERT_OK(cluster_->mini_tablet_server(i)->Start());
1323
0
  }
1324
0
  ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "After restart"));
1325
1326
  // Wait to check history cutoff advance w/o operations.
1327
0
  std::this_thread::sleep_for(
1328
0
      FLAGS_timestamp_history_retention_interval_sec * 1s +
1329
0
      FLAGS_history_cutoff_propagation_interval_ms * 3ms);
1330
0
  ASSERT_NO_FATALS(VerifyHistoryCutoff(cluster_.get(), &committed_history_cutoff, "Final"));
1331
0
}
1332
1333
class QLTabletRf1Test : public QLTabletTest {
1334
 public:
1335
2
  void SetUp() override {
1336
2
    mini_cluster_opt_.num_masters = 1;
1337
2
    mini_cluster_opt_.num_tablet_servers = 1;
1338
2
    QLTabletTest::SetUp();
1339
2
  }
1340
};
1341
1342
// For this test we don't need actually RF3 setup which also makes test flaky because of
1343
// https://github.com/yugabyte/yugabyte-db/issues/4663.
1344
0
TEST_F_EX(QLTabletTest, GetMiddleKey, QLTabletRf1Test) {
1345
0
  FLAGS_db_write_buffer_size = 20_KB;
1346
1347
0
  TestWorkload workload(cluster_.get());
1348
0
  workload.set_table_name(kTable1Name);
1349
0
  workload.set_write_timeout_millis(30000);
1350
0
  workload.set_num_tablets(1);
1351
0
  workload.set_num_write_threads(2);
1352
0
  workload.set_write_batch_size(1);
1353
0
  workload.set_payload_bytes(16);
1354
0
  workload.Setup();
1355
1356
0
  LOG(INFO) << "Starting workload ...";
1357
0
  Stopwatch s(Stopwatch::ALL_THREADS);
1358
0
  s.start();
1359
0
  workload.Start();
1360
1361
0
  const auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
1362
0
  ASSERT_EQ(peers.size(), 1);
1363
0
  const auto& tablet = *ASSERT_NOTNULL(peers[0]->tablet());
1364
1365
  // We want some compactions to happen, so largest SST file will become large enough for its
1366
  // approximate middle key to roughly split the whole tablet into two parts that are close in size.
1367
0
  while (tablet.TEST_db()->GetCurrentVersionDataSstFilesSize() <
1368
0
         implicit_cast<size_t>(20 * FLAGS_db_write_buffer_size)) {
1369
0
    std::this_thread::sleep_for(100ms);
1370
0
  }
1371
1372
0
  workload.StopAndJoin();
1373
0
  s.stop();
1374
0
  LOG(INFO) << "Workload stopped, it took: " << AsString(s.elapsed());
1375
1376
0
  LOG(INFO) << "Rows inserted: " << workload.rows_inserted();
1377
0
  LOG(INFO) << "Number of SST files: " << tablet.TEST_db()->GetCurrentVersionNumSSTFiles();
1378
1379
0
  ASSERT_OK(cluster_->FlushTablets());
1380
1381
0
  const auto encoded_split_key = ASSERT_RESULT(tablet.GetEncodedMiddleSplitKey());
1382
0
  LOG(INFO) << "Encoded split key: " << Slice(encoded_split_key).ToDebugString();
1383
1384
0
  if (tablet.metadata()->partition_schema()->IsHashPartitioning()) {
1385
0
    docdb::DocKey split_key;
1386
0
    Slice key_slice = encoded_split_key;
1387
0
    ASSERT_OK(split_key.DecodeFrom(&key_slice, docdb::DocKeyPart::kUpToHashCode));
1388
0
    ASSERT_TRUE(key_slice.empty()) << "Extra bytes after decoding: " << key_slice.ToDebugString();
1389
0
    ASSERT_EQ(split_key.hashed_group().size() + split_key.range_group().size(), 0)
1390
0
        << "Hash-based partition: middle key should only have encoded hash code";
1391
0
    LOG(INFO) << "Split key: " << AsString(split_key);
1392
0
  } else {
1393
0
    docdb::SubDocKey split_key;
1394
0
    ASSERT_OK(split_key.FullyDecodeFrom(encoded_split_key, docdb::HybridTimeRequired::kFalse));
1395
0
    ASSERT_EQ(split_key.num_subkeys(), 0)
1396
0
        << "Range-based partition: middle doc key should not have sub doc key components";
1397
0
    LOG(INFO) << "Split key: " << AsString(split_key);
1398
0
  }
1399
1400
  // Checking number of keys less/bigger than the approximate middle key.
1401
0
  size_t total_keys = 0;
1402
0
  size_t num_keys_less = 0;
1403
1404
0
  rocksdb::ReadOptions read_opts;
1405
0
  read_opts.query_id = rocksdb::kDefaultQueryId;
1406
0
  std::unique_ptr<rocksdb::Iterator> iter(tablet.TEST_db()->NewIterator(read_opts));
1407
1408
0
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1409
0
    Slice key = iter->key();
1410
0
    if (key.Less(encoded_split_key)) {
1411
0
      ++num_keys_less;
1412
0
    }
1413
0
    ++total_keys;
1414
0
  }
1415
1416
0
  LOG(INFO) << "Total keys: " << total_keys;
1417
0
  LOG(INFO) << "Number of keys less than approximate middle key: " << num_keys_less;
1418
0
  const auto num_keys_less_percent = 100 * num_keys_less / total_keys;
1419
1420
0
  LOG(INFO) << Format(
1421
0
      "Number of keys less than approximate middle key: $0 ($1%)", num_keys_less,
1422
0
      num_keys_less_percent);
1423
1424
0
  ASSERT_GE(num_keys_less_percent, 40);
1425
0
  ASSERT_LE(num_keys_less_percent, 60);
1426
0
}
1427
1428
namespace {
1429
1430
0
std::vector<OpId> GetLastAppliedOpIds(const std::vector<tablet::TabletPeerPtr>& peers) {
1431
0
  std::vector<OpId> last_applied_op_ids;
1432
0
  for (auto& peer : peers) {
1433
0
    const auto last_applied_op_id = peer->consensus()->GetLastAppliedOpId();
1434
0
    VLOG(1) << "Peer: " << AsString(peer->permanent_uuid())
1435
0
            << ", last applied op ID: " << AsString(last_applied_op_id);
1436
0
    last_applied_op_ids.push_back(last_applied_op_id);
1437
0
  }
1438
0
  return last_applied_op_ids;
1439
0
}
1440
1441
0
Result<OpId> GetAllAppliedOpId(const std::vector<tablet::TabletPeerPtr>& peers) {
1442
0
  for (auto& peer : peers) {
1443
0
    if (peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
1444
0
      return peer->raft_consensus()->GetAllAppliedOpId();
1445
0
    }
1446
0
  }
1447
0
  return STATUS(NotFound, "No leader found");
1448
0
}
1449
1450
CHECKED_STATUS WaitForAppliedOpIdsStabilized(
1451
0
    const std::vector<tablet::TabletPeerPtr>& peers, const MonoDelta& timeout) {
1452
0
  std::vector<OpId> prev_last_applied_op_ids;
1453
0
  return WaitFor(
1454
0
      [&]() {
1455
0
        std::vector<OpId> last_applied_op_ids = GetLastAppliedOpIds(peers);
1456
0
        LOG(INFO) << "last_applied_op_ids: " << AsString(last_applied_op_ids);
1457
0
        if (last_applied_op_ids == prev_last_applied_op_ids) {
1458
0
          return true;
1459
0
        }
1460
0
        prev_last_applied_op_ids = last_applied_op_ids;
1461
0
        return false;
1462
0
      },
1463
0
      timeout, "Waiting for applied op IDs to stabilize", 2000ms * kTimeMultiplier, 1);
1464
0
}
1465
1466
} // namespace
1467
1468
0
// Checks that the per-peer last applied op IDs and the leader's "all applied" op ID agree when
// all servers are up. With TS-0 down, the "all applied" op ID must not advance, and after TS-0
// restarts everything converges to the maximum applied op ID.
TEST_F(QLTabletTest, LastAppliedOpIdTracking) {
  constexpr auto kAppliesTimeout = 10s * kTimeMultiplier;

  TableHandle table;
  CreateTable(kTable1Name, &table, /* num_tablets =*/1);
  auto session = client_->NewSession();
  session->SetTimeout(60s);

  LOG(INFO) << "Writing data...";
  int key = 0;
  for (; key < 10; ++key) {
    SetValue(session, key, key, table);
  }
  LOG(INFO) << "Writing completed";

  auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);

  // With every server running, each peer's last applied op ID should equal the leader's
  // "all applied" op ID once applies have stopped changing.
  ASSERT_OK(WaitForAppliedOpIdsStabilized(peers, kAppliesTimeout));
  auto last_applied_op_ids = GetLastAppliedOpIds(peers);
  LOG(INFO) << "last_applied_op_ids: " << AsString(last_applied_op_ids);
  auto all_applied_op_id = ASSERT_RESULT(GetAllAppliedOpId(peers));
  LOG(INFO) << "all_applied_op_id: " << AsString(all_applied_op_id);
  for (const auto& last_applied_op_id : last_applied_op_ids) {
    ASSERT_EQ(last_applied_op_id, all_applied_op_id);
  }

  LOG(INFO) << "Shutting down TS-0";
  cluster_->mini_tablet_server(0)->Shutdown();

  // Re-list peers after the shutdown. Presumably this drops TS-0's peer; confirm against
  // ListTabletPeers.
  peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);

  LOG(INFO) << "Writing more data...";
  for (; key < 20; ++key) {
    SetValue(session, key, key, table);
  }
  LOG(INFO) << "Writing completed";

  ASSERT_OK(WaitForAppliedOpIdsStabilized(peers, kAppliesTimeout));
  auto new_all_applied_op_id = ASSERT_RESULT(GetAllAppliedOpId(peers));
  // We expect turned off TS to lag behind and not let all applied OP ids to advance.
  // In case TS-0 was leader, all_applied_op_id will be 0 on a new leader until it hears from TS-0.
  // Stream both values so a failure of this compound condition is diagnosable.
  ASSERT_TRUE(new_all_applied_op_id == all_applied_op_id || new_all_applied_op_id.empty())
      << "new_all_applied_op_id: " << AsString(new_all_applied_op_id)
      << ", all_applied_op_id: " << AsString(all_applied_op_id);

  // Save max applied op ID.
  last_applied_op_ids = GetLastAppliedOpIds(peers);
  auto max_applied_op_id = OpId::Min();
  for (const auto& last_applied_op_id : last_applied_op_ids) {
    max_applied_op_id = std::max(max_applied_op_id, last_applied_op_id);
  }
  ASSERT_GT(max_applied_op_id, all_applied_op_id);

  LOG(INFO) << "Restarting TS-0";
  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());

  // TS-0 should catch up on applied ops.
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        return VERIFY_RESULT(GetAllAppliedOpId(peers)) == max_applied_op_id;
      },
      kAppliesTimeout, "Waiting for all ops to apply"));
  last_applied_op_ids = GetLastAppliedOpIds(peers);
  for (const auto& last_applied_op_id : last_applied_op_ids) {
    ASSERT_EQ(last_applied_op_id, max_applied_op_id);
  }
}
1533
1534
0
// Runs a write workload with injected preparer latency and steps down all tablet leaders while
// writes are in progress.
TEST_F(QLTabletTest, SlowPrepare) {
  // The cluster's threads are already running, so annotate this write to the flag (same pattern
  // this file uses for other runtime flag writes).
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_preparer_batch_inject_latency_ms) = 100;

  const int kNumTablets = 1;

  TestWorkload workload(cluster_.get());
  workload.set_table_name(kTable1Name);
  workload.set_write_timeout_millis(30000 * kTimeMultiplier);
  workload.set_num_tablets(kNumTablets);
  workload.set_num_write_threads(2);
  workload.set_write_batch_size(1);
  workload.Setup();
  workload.Start();

  // Let the workload run for a while before forcing leadership changes.
  std::this_thread::sleep_for(2s);
  StepDownAllTablets(cluster_.get());

  workload.StopAndJoin();
}
1556
1557
0
// Forces leadership onto a follower (TS-0) that missed a committed write, then checks that writes
// still succeed after TS-0 comes back.
TEST_F(QLTabletTest, ElectUnsynchronizedFollower) {
  TableHandle table;
  CreateTable(kTable1Name, &table, 1);

  // TS-0 is shut down before the first write, so it does not receive that write.
  auto unsynchronized_follower = cluster_->mini_tablet_server(0)->server()->permanent_uuid();
  LOG(INFO) << "Unsynchronized follower: " << unsynchronized_follower;
  cluster_->mini_tablet_server(0)->Shutdown();

  auto session = CreateSession();
  SetValue(session, 1, -1, table);

  auto leader_idx = ASSERT_RESULT(ServerWithLeaders(cluster_.get()));
  LOG(INFO) << "Leader: " << cluster_->mini_tablet_server(leader_idx)->server()->permanent_uuid();
  // With TS-0 down, the leader is expected at index 1 or 2 (assumes a 3-server cluster), and
  // XOR with 1 ^ 2 picks the other one.
  auto follower_idx = 1 ^ 2 ^ leader_idx;
  LOG(INFO) << "Turning off follower: "
            << cluster_->mini_tablet_server(follower_idx)->server()->permanent_uuid();
  cluster_->mini_tablet_server(follower_idx)->Shutdown();
  auto peers =
      cluster_->mini_tablet_server(leader_idx)->server()->tablet_manager()->GetTabletPeers();
  ASSERT_EQ(peers.size(), 1);
  {
    // Restores FLAGS_leader_failure_max_missed_heartbeat_periods when this scope ends.
    google::FlagSaver flag_saver;
    consensus::LeaderStepDownRequestPB req;
    req.set_tablet_id(peers.front()->tablet_id());
    req.set_force_step_down(true);
    req.set_new_leader_uuid(unsynchronized_follower);
    consensus::LeaderStepDownResponsePB resp;

    // Raft threads read this flag concurrently, so annotate the write, as this file does for
    // other runtime flag writes.
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_leader_failure_max_missed_heartbeat_periods) = 10000;
    ASSERT_OK(peers.front()->raft_consensus()->StepDown(&req, &resp));
    ASSERT_FALSE(resp.has_error()) << resp.error().ShortDebugString();
  }

  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());

  ASSERT_NO_FATALS(SetValue(session, 2, -2, table));

  ASSERT_OK(cluster_->mini_tablet_server(follower_idx)->Start());
}
1596
1597
0
// Repeatedly delays and then restarts a follower around writes, so that the leader has switched
// to sending empty operations by the time the follower comes back (regression test for GH #7145).
TEST_F(QLTabletTest, FollowerRestartDuringWrite) {
  constexpr int kIterations = 6;

  TableHandle table;
  CreateTable(kTable1Name, &table, 1);

  // Durations derived from the raft heartbeat interval flag, which this test does not modify.
  const auto half_heartbeat = FLAGS_raft_heartbeat_interval_ms / 2 * 1ms;
  const auto three_heartbeats = FLAGS_raft_heartbeat_interval_ms * 3ms;

  for (int iteration = 0; iteration < kIterations; ++iteration) {
    auto write_session = CreateSession();
    SetValue(write_session, 1, -1, table);

    const auto leader_index = ASSERT_RESULT(ServerWithLeaders(cluster_.get()));
    LOG(INFO) << "Leader: "
              << cluster_->mini_tablet_server(leader_index)->server()->permanent_uuid();
    const auto follower_index = (leader_index + 1) % cluster_->num_tablet_servers();
    auto follower_server = cluster_->mini_tablet_server(follower_index)->server();
    LOG(INFO) << "Follower: "  << follower_server->permanent_uuid();

    // Slow down every consensus update handled by the follower.
    const auto peers_on_follower = follower_server->tablet_manager()->GetTabletPeers();
    for (const auto& tablet_peer : peers_on_follower) {
      tablet_peer->raft_consensus()->TEST_DelayUpdate(half_heartbeat);
    }

    SetValue(write_session, 2, -2, table);
    std::this_thread::sleep_for(half_heartbeat);
    SetValue(write_session, 3, -3, table);

    // Shutdown follower, so it would not accept updates and exponential backoff will turn to send
    // empty operations.
    cluster_->mini_tablet_server(follower_index)->Shutdown();

    // Wait exponential backoff goes to empty operations.
    std::this_thread::sleep_for(three_heartbeats);

    SetValue(write_session, 4, -4, table);

    ASSERT_OK(cluster_->mini_tablet_server(follower_index)->Start());

    // Wait until newly started follower receive a new operation.
    // Without fix for GH #7145 it would crash in this case.
    std::this_thread::sleep_for(three_heartbeats);

    ASSERT_OK(cluster_->RestartSync());
  }
}
1637
1638
0
// Compares compacted SST sizes of a table and its index under the two data-block key/value
// delta-encoding formats. ThreeSharedParts must shrink the regular table by a factor above 1.3.
// The index table size must be identical under both formats.
TEST_F_EX(QLTabletTest, DataBlockKeyValueEncoding, QLTabletRf1Test) {
  constexpr auto kNumRows = 4000;
  constexpr auto kNumRowsPerBatch = 100;
  // Scaled by kTimeMultiplier, consistent with other timeouts in this test file.
  constexpr auto kIndexPermissionsTimeout = 10s * kTimeMultiplier;
  // We are testing delta encoding, but not compression.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_compression_type) = "NoCompression";

  struct SstSizes {
    size_t regular_table = 0;
    size_t index_table = 0;
  };
  std::unordered_map<rocksdb::KeyValueEncodingFormat, SstSizes, EnumHash> sst_sizes;

  constexpr auto kEncodingSharedPrefix =
      rocksdb::KeyValueEncodingFormat::kKeyDeltaEncodingSharedPrefix;
  constexpr auto kEncodingThreeSharedParts =
      rocksdb::KeyValueEncodingFormat::kKeyDeltaEncodingThreeSharedParts;

  // Flushes and fully compacts the tablet, then returns the size of its current-version SST files.
  // Loop-invariant, so it is defined once outside the loop below.
  auto get_tablet_size = [](tablet::Tablet* tablet) -> Result<size_t> {
    RETURN_NOT_OK(tablet->Flush(tablet::FlushMode::kSync));
    RETURN_NOT_OK(tablet->ForceFullRocksDBCompact());
    return tablet->GetCurrentVersionSstFilesSize();
  };

  for (auto encoding : {kEncodingSharedPrefix, kEncodingThreeSharedParts}) {
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_regular_tablets_data_block_key_value_encoding) =
        KeyValueEncodingFormatToString(encoding);
    const YBTableName table_name(
        YQL_DATABASE_CQL, "my_keyspace", Format("ql_client_test_table_$0", encoding));
    TableHandle table;
    CreateTable(table_name, &table, /* num_tablets = */ 1);
    ASSERT_OK(BatchedFillTable(/* begin = */ 0, /* end = */ kNumRows, kNumRowsPerBatch, table));

    TableHandle index_table;
    kv_table_test::CreateIndex(
        yb::client::Transactional::kTrue, /* indexed_column_index = */ 1,
        /* use_mangled_names = */ false, table, client_.get(), &index_table);
    ASSERT_OK(client_->WaitUntilIndexPermissionsAtLeast(
        table_name, index_table.name(), INDEX_PERM_READ_WRITE_AND_DELETE,
        /* max_wait = */ kIndexPermissionsTimeout));

    ASSERT_OK(cluster_->FlushTablets());

    auto& sizes = sst_sizes[encoding];
    for (const auto& tablet_peer : ListTableTabletPeers(cluster_.get(), table->id())) {
      sizes.regular_table += ASSERT_RESULT(get_tablet_size(tablet_peer->tablet()));
    }
    for (const auto& tablet_peer : ListTableTabletPeers(cluster_.get(), index_table->id())) {
      sizes.index_table += ASSERT_RESULT(get_tablet_size(tablet_peer->tablet()));
    }
  }
  ASSERT_GT(
      1.0 * sst_sizes[kEncodingSharedPrefix].regular_table /
          sst_sizes[kEncodingThreeSharedParts].regular_table,
      1.3);
  // The encoding flag is named for regular tablets. The assertion below expects index tablets
  // to be unaffected by it.
  ASSERT_EQ(sst_sizes[kEncodingSharedPrefix].index_table,
            sst_sizes[kEncodingThreeSharedParts].index_table);
}
1693
1694
} // namespace client
1695
} // namespace yb