YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/alter_table-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <map>
34
#include <string>
35
#include <utility>
36
37
#include <gflags/gflags.h>
38
#include <gtest/gtest.h>
39
40
#include "yb/client/client-test-util.h"
41
#include "yb/client/client.h"
42
#include "yb/client/error.h"
43
#include "yb/client/schema.h"
44
#include "yb/client/session.h"
45
#include "yb/client/table.h"
46
#include "yb/client/table_alterer.h"
47
#include "yb/client/table_creator.h"
48
#include "yb/client/table_handle.h"
49
#include "yb/client/yb_op.h"
50
51
#include "yb/common/ql_value.h"
52
#include "yb/common/schema.h"
53
54
#include "yb/consensus/log.h"
55
56
#include "yb/gutil/stl_util.h"
57
#include "yb/gutil/strings/join.h"
58
#include "yb/gutil/strings/substitute.h"
59
60
#include "yb/integration-tests/mini_cluster.h"
61
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
62
63
#include "yb/master/mini_master.h"
64
#include "yb/master/sys_catalog.h"
65
66
#include "yb/tablet/tablet.h"
67
#include "yb/tablet/tablet_metadata.h"
68
#include "yb/tablet/tablet_peer.h"
69
70
#include "yb/tserver/mini_tablet_server.h"
71
#include "yb/tserver/tablet_server.h"
72
#include "yb/tserver/ts_tablet_manager.h"
73
74
#include "yb/util/atomic.h"
75
#include "yb/util/faststring.h"
76
#include "yb/util/metrics.h"
77
#include "yb/util/random.h"
78
#include "yb/util/random_util.h"
79
#include "yb/util/status_log.h"
80
#include "yb/util/test_util.h"
81
#include "yb/util/thread.h"
82
83
using namespace std::literals;
84
85
DECLARE_bool(enable_data_block_fsync);
86
DECLARE_bool(enable_maintenance_manager);
87
DECLARE_bool(enable_ysql);
88
DECLARE_bool(flush_rocksdb_on_shutdown);
89
DECLARE_int32(heartbeat_interval_ms);
90
DECLARE_bool(use_hybrid_clock);
91
DECLARE_int32(ht_lease_duration_ms);
92
DECLARE_int32(replication_factor);
93
DECLARE_int32(log_min_seconds_to_retain);
94
DECLARE_int32(catalog_manager_report_batch_size);
95
96
METRIC_DECLARE_counter(sys_catalog_peer_write_count);
97
98
namespace yb {
99
100
using client::YBClient;
101
using client::YBClientBuilder;
102
using client::YBColumnSchema;
103
using client::YBError;
104
using client::YBSchema;
105
using client::YBSchemaBuilder;
106
using client::YBSession;
107
using client::YBTable;
108
using client::YBTableAlterer;
109
using client::YBTableCreator;
110
using client::YBTableName;
111
using client::YBTableType;
112
using client::YBValue;
113
using std::shared_ptr;
114
using master::AlterTableRequestPB;
115
using master::AlterTableResponsePB;
116
using master::MiniMaster;
117
using std::map;
118
using std::pair;
119
using std::vector;
120
using tablet::TabletPeer;
121
using tserver::MiniTabletServer;
122
123
class AlterTableTest : public YBMiniClusterTestBase<MiniCluster>,
124
                       public ::testing::WithParamInterface<int> {
125
 public:
126
  AlterTableTest()
127
    : stop_threads_(false),
128
36
      inserted_idx_(0) {
129
130
36
    YBSchemaBuilder b;
131
36
    b.AddColumn("c0")->Type(INT32)->NotNull()->HashPrimaryKey();
132
36
    b.AddColumn("c1")->Type(INT32)->NotNull();
133
36
    CHECK_OK(b.Build(&schema_));
134
135
36
    FLAGS_enable_data_block_fsync = false; // Keep unit tests fast.
136
36
    FLAGS_use_hybrid_clock = false;
137
36
    FLAGS_ht_lease_duration_ms = 0;
138
36
    FLAGS_enable_ysql = false;
139
36
    ANNOTATE_BENIGN_RACE(&FLAGS_enable_maintenance_manager,
140
36
                         "safe to change at runtime");
141
36
  }
142
143
36
  void SetUp() override {
144
    // Make heartbeats faster to speed test runtime.
145
36
    FLAGS_heartbeat_interval_ms = 10;
146
147
36
    FLAGS_catalog_manager_report_batch_size = GetParam();
148
149
36
    YBMiniClusterTestBase::SetUp();
150
151
36
    MiniClusterOptions opts;
152
36
    opts.num_tablet_servers = num_replicas();
153
36
    FLAGS_replication_factor = num_replicas();
154
36
    cluster_.reset(new MiniCluster(opts));
155
36
    ASSERT_OK(cluster_->Start());
156
32
    ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas()));
157
158
32
    client_ = CHECK_RESULT(YBClientBuilder()
159
32
        .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
160
32
        .default_admin_operation_timeout(MonoDelta::FromSeconds(60))
161
32
        .Build());
162
163
32
    CHECK_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
164
32
                                                 kTableName.namespace_type()));
165
166
    // Add a table, make sure it reports itself.
167
32
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
168
32
    CHECK_OK(table_creator->table_name(kTableName)
169
32
             .schema(&schema_)
170
32
             .table_type(YBTableType::YQL_TABLE_TYPE)
171
32
             .num_tablets(1)
172
32
             .Create());
173
174
32
    if (num_replicas() == 1) {
175
0
      tablet_peer_ = LookupTabletPeer();
176
0
    }
177
32
    LOG(INFO) << "Tablet successfully located";
178
32
  }
179
180
16
  void DoTearDown() override {
181
16
    client_.reset();
182
16
    tablet_peer_.reset();
183
16
    cluster_->Shutdown();
184
16
  }
185
186
0
  std::shared_ptr<TabletPeer> LookupTabletPeer() {
187
0
    auto peers = cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletPeers();
188
0
    return peers[0];
189
0
  }
190
191
0
  void ShutdownTS(int idx = 0) {
192
    // Drop the tablet_peer_ reference since the tablet peer becomes invalid once
193
    // we shut down the server. Additionally, if we hold onto the reference,
194
    // we'll end up calling the destructor from the test code instead of the
195
    // normal location, which can cause crashes, etc.
196
0
    tablet_peer_.reset();
197
0
    if (cluster_->mini_tablet_server(idx)->server() != nullptr) {
198
0
      cluster_->mini_tablet_server(idx)->Shutdown();
199
0
    }
200
0
  }
201
202
0
  void RestartTabletServer(int idx = 0) {
203
0
    tablet_peer_.reset();
204
0
    if (cluster_->mini_tablet_server(idx)->server()) {
205
0
      ASSERT_OK(cluster_->mini_tablet_server(idx)->Restart());
206
0
    } else {
207
0
      ASSERT_OK(cluster_->mini_tablet_server(idx)->Start());
208
0
    }
209
210
0
    ASSERT_OK(cluster_->mini_tablet_server(idx)->WaitStarted());
211
0
    if (idx == 0) {
212
0
      tablet_peer_ = LookupTabletPeer();
213
0
    }
214
0
  }
215
216
0
  Status WaitAlterTableCompletion(const YBTableName& table_name, int attempts) {
217
0
    int wait_time = 1000;
218
0
    for (int i = 0; i < attempts; ++i) {
219
0
      bool in_progress;
220
0
      string table_id;
221
0
      RETURN_NOT_OK(client_->IsAlterTableInProgress(table_name, table_id, &in_progress));
222
0
      if (!in_progress) {
223
0
        return Status::OK();
224
0
      }
225
226
0
      SleepFor(MonoDelta::FromMicroseconds(wait_time));
227
0
      wait_time = std::min(wait_time * 5 / 4, 1000000);
228
0
    }
229
230
0
    return STATUS(TimedOut, "AlterTable not completed within the timeout");
231
0
  }
232
233
  Status AddNewI32Column(const YBTableName& table_name,
234
0
                         const string& column_name) {
235
0
    return AddNewI32Column(table_name, column_name, MonoDelta::FromSeconds(60));
236
0
  }
237
238
  Status AddNewI32Column(const YBTableName& table_name,
239
                         const string& column_name,
240
0
                         const MonoDelta& timeout) {
241
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(table_name));
242
0
    table_alterer->AddColumn(column_name)->Type(INT32)->NotNull();
243
0
    return table_alterer->timeout(timeout)->Alter();
244
0
  }
245
246
  enum VerifyPattern {
247
    C1_MATCHES_INDEX,
248
    C1_IS_DEADBEEF,
249
    C1_DOESNT_EXIST
250
  };
251
252
  void VerifyRows(int start_row, int num_rows, VerifyPattern pattern);
253
254
  void InsertRows(int start_row, int num_rows);
255
256
  void UpdateRow(int32_t row_key, const map<string, int32_t>& updates);
257
258
  std::vector<std::string> ScanToStrings();
259
260
  void WriteThread(QLWriteRequestPB::QLStmtType type);
261
  void ScannerThread();
262
263
0
  Status CreateTable(const YBTableName& table_name) {
264
0
    RETURN_NOT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name(),
265
0
                                                      table_name.namespace_type()));
266
267
0
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
268
0
    return table_creator->table_name(table_name)
269
0
        .schema(&schema_)
270
0
        .num_tablets(10)
271
0
        .Create();
272
0
  }
273
274
0
  int64_t GetSysCatalogWrites() {
275
0
    auto GetSysCatalogMetric = [&](CounterPrototype& prototype) -> int64_t {
276
0
      auto metrics = cluster_->mini_master()->sys_catalog().GetMetricEntity();
277
0
      return prototype.Instantiate(metrics)->value();
278
0
    };
279
0
    return GetSysCatalogMetric(METRIC_sys_catalog_peer_write_count);
280
0
  }
281
282
 protected:
283
96
  virtual int num_replicas() const { return 1; }
284
285
  static const YBTableName kTableName;
286
287
  std::unique_ptr<YBClient> client_;
288
289
  YBSchema schema_;
290
291
  std::shared_ptr<TabletPeer> tablet_peer_;
292
293
  AtomicBool stop_threads_;
294
295
  // The number of rows successfully inserted so far by the insert-mode
296
  // WriteThread. The update-mode WriteThread uses this to figure out which
297
  // rows can be safely updated.
298
  AtomicInt<int32_t> inserted_idx_;
299
};
300
301
// Subclass which creates three servers and a replicated cluster.
302
class ReplicatedAlterTableTest : public AlterTableTest {
303
 protected:
304
8
  virtual int num_replicas() const override { return 3; }
305
};
306
307
const YBTableName AlterTableTest::kTableName(YQL_DATABASE_CQL, "my_keyspace", "fake-table");
308
309
INSTANTIATE_TEST_CASE_P(BatchSize, AlterTableTest, ::testing::Values(1, 10));
310
INSTANTIATE_TEST_CASE_P(BatchSize, ReplicatedAlterTableTest, ::testing::Values(1, 10));
311
312
// Simple test to verify that the "alter table" command is sent and executed
313
// on the TS handling the tablet of the altered table.
314
// TODO: create and verify multiple tablets when the client will support that.
315
0
TEST_P(AlterTableTest, TestTabletReports) {
316
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
317
0
  ASSERT_OK(AddNewI32Column(kTableName, "new-i32"));
318
0
  ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version());
319
0
}
320
321
// Verify that adding an existing column will return an "already present" error
322
0
TEST_P(AlterTableTest, TestAddExistingColumn) {
323
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
324
325
0
  {
326
0
    Status s = AddNewI32Column(kTableName, "c1");
327
0
    ASSERT_TRUE(s.IsAlreadyPresent());
328
0
    ASSERT_STR_CONTAINS(s.ToString(), "The column already exists: c1");
329
0
  }
330
331
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
332
0
}
333
334
// Adding a nullable column with no default value should be equivalent
335
// to a NULL default.
336
0
TEST_P(AlterTableTest, TestAddNullableColumnWithoutDefault) {
337
0
  InsertRows(0, 1);
338
0
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
339
340
0
  {
341
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
342
0
    table_alterer->AddColumn("new")->Type(INT32);
343
0
    ASSERT_OK(table_alterer->Alter());
344
0
  }
345
346
0
  InsertRows(1, 1);
347
348
0
  vector<string> rows = ScanToStrings();
349
0
  EXPECT_EQ(2, rows.size());
350
0
  EXPECT_EQ("{ int32:0, int32:0, null }", rows[0]);
351
0
  EXPECT_EQ("{ int32:16777216, int32:1, null }", rows[1]);
352
0
}
353
354
// Verify that, if a tablet server is down when an alter command is issued,
355
// it will eventually receive the command when it restarts.
356
0
TEST_P(AlterTableTest, TestAlterOnTSRestart) {
357
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
358
359
0
  ShutdownTS();
360
361
  // Send the Alter request
362
0
  {
363
0
    Status s = AddNewI32Column(kTableName, "new-32", MonoDelta::FromMilliseconds(500));
364
0
    ASSERT_TRUE(s.IsTimedOut());
365
0
  }
366
367
  // Verify that the Schema is the old one
368
0
  YBSchema schema;
369
0
  PartitionSchema partition_schema;
370
0
  bool alter_in_progress = false;
371
0
  string table_id;
372
0
  ASSERT_OK(client_->GetTableSchema(kTableName, &schema, &partition_schema));
373
0
  ASSERT_TRUE(schema_.Equals(schema));
374
0
  ASSERT_OK(client_->IsAlterTableInProgress(kTableName, table_id, &alter_in_progress));
375
0
  ASSERT_TRUE(alter_in_progress);
376
377
  // Restart the TS and wait for the new schema
378
0
  RestartTabletServer();
379
0
  ASSERT_OK(WaitAlterTableCompletion(kTableName, 50));
380
0
  ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version());
381
0
}
382
383
// Verify that nothing is left behind on cluster shutdown with pending async tasks
384
0
TEST_P(AlterTableTest, TestShutdownWithPendingTasks) {
385
0
  DontVerifyClusterBeforeNextTearDown();
386
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
387
388
0
  ShutdownTS();
389
390
  // Send the Alter request
391
0
  {
392
0
    Status s = AddNewI32Column(kTableName, "new-i32", MonoDelta::FromMilliseconds(500));
393
0
    ASSERT_TRUE(s.IsTimedOut());
394
0
  }
395
0
}
396
397
// Verify that the new schema is applied/reported even when
398
// the TS is going down with the alter operation in progress.
399
// On TS restart the master should either:
400
//  - get the new schema state, and mark the alter as complete
401
//  - get the old schema state, and ask the TS again to perform the alter.
402
0
TEST_P(AlterTableTest, TestRestartTSDuringAlter) {
403
0
  if (!AllowSlowTests()) {
404
0
    LOG(INFO) << "Skipping slow test";
405
0
    return;
406
0
  }
407
408
0
  ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version());
409
410
0
  Status s = AddNewI32Column(kTableName, "new-i32", MonoDelta::FromMilliseconds(1));
411
0
  ASSERT_TRUE(s.IsTimedOut());
412
413
  // Restart the TS while alter is running
414
0
  for (int i = 0; i < 3; i++) {
415
0
    SleepFor(MonoDelta::FromMicroseconds(500));
416
0
    RestartTabletServer();
417
0
  }
418
419
  // Wait for the new schema
420
0
  ASSERT_OK(WaitAlterTableCompletion(kTableName, 50));
421
0
  ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version());
422
0
}
423
424
0
TEST_P(AlterTableTest, TestGetSchemaAfterAlterTable) {
425
0
  ASSERT_OK(AddNewI32Column(kTableName, "new-i32"));
426
427
0
  YBSchema s;
428
0
  PartitionSchema partition_schema;
429
0
  ASSERT_OK(client_->GetTableSchema(kTableName, &s, &partition_schema));
430
0
}
431
432
0
void AlterTableTest::InsertRows(int start_row, int num_rows) {
433
0
  shared_ptr<YBSession> session = client_->NewSession();
434
0
  session->SetTimeout(15s);
435
0
  client::TableHandle table;
436
0
  ASSERT_OK(table.Open(kTableName, client_.get()));
437
0
  std::vector<std::shared_ptr<client::YBqlOp>> ops;
438
439
  // Insert a bunch of rows with the current schema
440
0
  for (int i = start_row; i < start_row + num_rows; i++) {
441
0
    auto op = table.NewInsertOp();
442
    // Endian-swap the key so that we spew inserts randomly
443
    // instead of just a sequential write pattern. This way
444
    // compactions may actually be triggered.
445
0
    int32_t key = bswap_32(i);
446
0
    auto req = op->mutable_request();
447
0
    QLAddInt32HashValue(req, key);
448
449
0
    if (table.schema().num_columns() > 1) {
450
0
      table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i);
451
0
    }
452
453
0
    ops.push_back(op);
454
0
    session->Apply(op);
455
456
0
    if (ops.size() >= 50) {
457
0
      FlushSessionOrDie(session, ops);
458
0
      ops.clear();
459
0
    }
460
0
  }
461
462
0
  FlushSessionOrDie(session, ops);
463
0
}
464
465
void AlterTableTest::UpdateRow(int32_t row_key,
466
0
                               const map<string, int32_t>& updates) {
467
0
  shared_ptr<YBSession> session = client_->NewSession();
468
0
  session->SetTimeout(15s);
469
470
0
  client::TableHandle table;
471
0
  ASSERT_OK(table.Open(kTableName, client_.get()));
472
473
0
  auto update = table.NewUpdateOp();
474
0
  int32_t key = bswap_32(row_key); // endian swap to match 'InsertRows'
475
0
  QLAddInt32HashValue(update->mutable_request(), key);
476
0
  for (const auto& e : updates) {
477
0
    table.AddInt32ColumnValue(update->mutable_request(), e.first, e.second);
478
0
  }
479
0
  session->Apply(update);
480
0
  FlushSessionOrDie(session);
481
0
}
482
483
0
std::vector<string> AlterTableTest::ScanToStrings() {
484
0
  client::TableHandle table;
485
0
  EXPECT_OK(table.Open(kTableName, client_.get()));
486
0
  auto result = ScanTableToStrings(table);
487
0
  std::sort(result.begin(), result.end());
488
0
  return result;
489
0
}
490
491
// Verify that the 'num_rows' starting with 'start_row' fit the given pattern.
492
// Note that the 'start_row' here is not a row key, but the pre-transformation row
493
// key (InsertRows swaps endianness so that we random-write instead of sequential-write)
494
0
void AlterTableTest::VerifyRows(int start_row, int num_rows, VerifyPattern pattern) {
495
0
  client::TableHandle table;
496
0
  ASSERT_OK(table.Open(kTableName, client_.get()));
497
498
0
  int verified = 0;
499
0
  for (const auto& row : client::TableRange(table)) {
500
0
    int32_t key = row.column(0).int32_value();
501
0
    int32_t row_idx = bswap_32(key);
502
0
    if (row_idx < start_row || row_idx >= start_row + num_rows) {
503
      // Outside the range we're verifying
504
0
      continue;
505
0
    }
506
0
    verified++;
507
508
0
    switch (pattern) {
509
0
      case C1_MATCHES_INDEX:
510
0
        ASSERT_EQ(row_idx, row.column(1).int32_value());
511
0
        break;
512
0
      case C1_IS_DEADBEEF:
513
0
        ASSERT_TRUE(row.column(1).IsNull());
514
0
        break;
515
0
      case C1_DOESNT_EXIST:
516
0
        continue;
517
0
      default:
518
0
        ASSERT_TRUE(false) << "Invalid pattern: " << pattern;
519
0
        break;
520
0
    }
521
0
  }
522
0
  ASSERT_EQ(verified, num_rows);
523
0
}
524
525
// Test inserting/updating some data, dropping a column, and adding a new one
526
// with the same name. Data should not "reappear" from the old column.
527
//
528
// This is a regression test for KUDU-461.
529
0
TEST_P(AlterTableTest, TestDropAndAddNewColumn) {
530
  // NOTE: no flush threshold is changed in this test; the only knob is the
531
  // row count below, which is larger in slow-test mode so that more data is
532
  // written before the column is dropped and re-added.
533
0
  const int kNumRows = AllowSlowTests() ? 100000 : 1000;
534
0
  InsertRows(0, kNumRows);
535
536
0
  LOG(INFO) << "Verifying initial pattern";
537
0
  VerifyRows(0, kNumRows, C1_MATCHES_INDEX);
538
539
0
  LOG(INFO) << "Dropping and adding back c1";
540
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
541
0
  ASSERT_OK(table_alterer->DropColumn("c1")->Alter());
542
543
0
  ASSERT_OK(AddNewI32Column(kTableName, "c1"));
544
545
0
  LOG(INFO) << "Verifying that the new default shows up";
546
0
  VerifyRows(0, kNumRows, C1_IS_DEADBEEF);
547
0
}
548
549
0
TEST_P(AlterTableTest, DISABLED_TestCompactionAfterDrop) {
550
0
  LOG(INFO) << "Inserting rows";
551
0
  InsertRows(0, 3);
552
553
0
  std::string docdb_dump = tablet_peer_->tablet()->TEST_DocDBDumpStr();
554
  // DocDB should not be empty right now.
555
0
  ASSERT_NE(0, docdb_dump.length());
556
557
0
  LOG(INFO) << "Dropping c1";
558
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
559
0
  ASSERT_OK(table_alterer->DropColumn("c1")->Alter());
560
561
0
  LOG(INFO) << "Forcing compaction";
562
0
  tablet_peer_->tablet()->ForceRocksDBCompactInTest();
563
564
0
  docdb_dump = tablet_peer_->tablet()->TEST_DocDBDumpStr();
565
566
0
  LOG(INFO) << "Checking that docdb is empty";
567
0
  ASSERT_EQ("", docdb_dump);
568
569
0
  ASSERT_OK(cluster_->RestartSync());
570
0
  tablet_peer_ = LookupTabletPeer();
571
0
}
572
573
// This tests the scenario where the log entries immediately after last RocksDB flush are for a
574
// different schema than the one that was last flushed to the superblock.
575
0
TEST_P(AlterTableTest, TestLogSchemaReplay) {
576
0
  ASSERT_OK(AddNewI32Column(kTableName, "c2"));
577
0
  InsertRows(0, 2);
578
0
  UpdateRow(1, { {"c1", 0} });
579
580
0
  LOG(INFO) << "Flushing RocksDB";
581
0
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
582
583
0
  UpdateRow(0, { {"c1", 1}, {"c2", 10001} });
584
585
0
  LOG(INFO) << "Dropping c1";
586
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
587
0
  ASSERT_OK(table_alterer->DropColumn("c1")->Alter());
588
589
0
  UpdateRow(1, { {"c2", 10002} });
590
591
0
  auto rows = ScanToStrings();
592
0
  ASSERT_EQ(2, rows.size());
593
0
  ASSERT_EQ("{ int32:0, int32:10001 }", rows[0]);
594
0
  ASSERT_EQ("{ int32:16777216, int32:10002 }", rows[1]);
595
596
0
  google::FlagSaver flag_saver;
597
  // Restart without flushing RocksDB
598
0
  FLAGS_flush_rocksdb_on_shutdown = false;
599
0
  LOG(INFO) << "Restarting tablet";
600
0
  ASSERT_NO_FATALS(RestartTabletServer());
601
602
0
  rows = ScanToStrings();
603
0
  ASSERT_EQ(2, rows.size());
604
0
  ASSERT_EQ("{ int32:0, int32:10001 }", rows[0]);
605
0
  ASSERT_EQ("{ int32:16777216, int32:10002 }", rows[1]);
606
0
}
607
608
// Tests that a renamed table can still be altered. This is a regression test, we used to not carry
609
// over column ids after a table rename.
610
0
TEST_P(AlterTableTest, TestRenameTableAndAdd) {
611
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
612
0
  YBTableName new_name(kTableName.namespace_type(), kTableName.namespace_name(), "someothername");
613
0
  ASSERT_OK(table_alterer->RenameTo(new_name)
614
0
            ->Alter());
615
616
0
  ASSERT_OK(AddNewI32Column(new_name, "new"));
617
0
}
618
619
// Test restarting a tablet server several times after various
620
// schema changes.
621
// This is a regression test for KUDU-462.
622
TEST_P(AlterTableTest, TestBootstrapAfterAlters) {
623
  ASSERT_OK(AddNewI32Column(kTableName, "c2"));
624
  InsertRows(0, 1);
625
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
626
  InsertRows(1, 1);
627
628
  UpdateRow(0, { {"c1", 10001} });
629
  UpdateRow(1, { {"c1", 10002} });
630
631
  auto rows = ScanToStrings();
632
  ASSERT_EQ(2, rows.size());
633
  ASSERT_EQ("{ int32:0, int32:10001, null }", rows[0]);
634
  ASSERT_EQ("{ int32:16777216, int32:10002, null }", rows[1]);
635
636
  LOG(INFO) << "Dropping c1";
637
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
638
  ASSERT_OK(table_alterer->DropColumn("c1")->Alter());
639
640
  rows = ScanToStrings();
641
  ASSERT_EQ(2, rows.size());
642
  ASSERT_EQ("{ int32:0, null }", rows[0]);
643
  ASSERT_EQ("{ int32:16777216, null }", rows[1]);
644
645
  // Test that restart doesn't fail when trying to replay updates or inserts
646
  // with the dropped column.
647
  ASSERT_NO_FATALS(RestartTabletServer());
648
649
  rows = ScanToStrings();
650
  ASSERT_EQ(2, rows.size());
651
  ASSERT_EQ("{ int32:0, null }", rows[0]);
652
  ASSERT_EQ("{ int32:16777216, null }", rows[1]);
653
654
  // Add back a column called 'c1', but should not materialize old data.
655
  ASSERT_OK(AddNewI32Column(kTableName, "c1"));
656
  rows = ScanToStrings();
657
  ASSERT_EQ(2, rows.size());
658
  ASSERT_EQ("{ int32:0, null, null }", rows[0]);
659
  ASSERT_EQ("{ int32:16777216, null, null }", rows[1]);
660
661
  ASSERT_NO_FATALS(RestartTabletServer());
662
  rows = ScanToStrings();
663
  ASSERT_EQ(2, rows.size());
664
  ASSERT_EQ("{ int32:0, null, null }", rows[0]);
665
  ASSERT_EQ("{ int32:16777216, null, null }", rows[1]);
666
}
667
668
0
TEST_P(AlterTableTest, TestAlterWalRetentionSecs) {
669
0
  InsertRows(1, 1000);
670
0
  int kWalRetentionSecs = RandomUniformBool()
671
0
      ? FLAGS_log_min_seconds_to_retain / 2
672
0
      : FLAGS_log_min_seconds_to_retain * 2;
673
674
0
  LOG(INFO) << "Modifying wal retention time to " << kWalRetentionSecs;
675
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
676
677
0
  ASSERT_OK(table_alterer->SetWalRetentionSecs(kWalRetentionSecs)->Alter());
678
679
0
  int expected_wal_retention_secs = max(FLAGS_log_min_seconds_to_retain, kWalRetentionSecs);
680
681
0
  ASSERT_EQ(kWalRetentionSecs, tablet_peer_->tablet()->metadata()->wal_retention_secs());
682
0
  ASSERT_EQ(expected_wal_retention_secs, tablet_peer_->log()->wal_retention_secs());
683
684
  // Test that the wal retention time gets set correctly in the metadata and in the log objects.
685
0
  ASSERT_NO_FATALS(RestartTabletServer());
686
687
0
  ASSERT_EQ(kWalRetentionSecs, tablet_peer_->tablet()->metadata()->wal_retention_secs());
688
0
  ASSERT_EQ(expected_wal_retention_secs, tablet_peer_->log()->wal_retention_secs());
689
0
}
690
691
0
TEST_P(AlterTableTest, TestCompactAfterUpdatingRemovedColumn) {
692
  // Disable maintenance manager, since we manually flush/compact
693
  // in this test.
694
0
  FLAGS_enable_maintenance_manager = false;
695
696
0
  vector<string> rows;
697
698
0
  ASSERT_OK(AddNewI32Column(kTableName, "c2"));
699
0
  InsertRows(0, 1);
700
0
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
701
0
  InsertRows(1, 1);
702
0
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
703
704
705
0
  rows = ScanToStrings();
706
0
  ASSERT_EQ(2, rows.size());
707
0
  ASSERT_EQ("{ int32:0, int32:0, null }", rows[0]);
708
0
  ASSERT_EQ("{ int32:16777216, int32:1, null }", rows[1]);
709
710
  // Add a delta for c1.
711
0
  UpdateRow(0, { {"c1", 54321} });
712
713
  // Drop c1.
714
0
  LOG(INFO) << "Dropping c1";
715
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
716
0
  ASSERT_OK(table_alterer->DropColumn("c1")->Alter());
717
718
0
  rows = ScanToStrings();
719
0
  ASSERT_EQ(2, rows.size());
720
0
  ASSERT_EQ("{ int32:0, null }", rows[0]);
721
0
}
722
723
typedef std::vector<std::shared_ptr<client::YBqlOp>> Ops;
724
725
0
// Summarizes the per-op response statuses in 'ops'.
// Returns a pair whose 'first' is true if at least one op reported
// YQL_STATUS_SCHEMA_VERSION_MISMATCH, and whose 'second' is the number of ops
// that reported YQL_STATUS_OK. Ops with any other status are not counted.
std::pair<bool, int> AnalyzeResponse(const Ops& ops) {
726
0
  std::pair<bool, int> result = { false, 0 };
727
0
  for (const auto& op : ops) {
728
0
    if (op->response().status() == QLResponsePB::YQL_STATUS_OK) {
729
0
      ++result.second;
730
0
    } else {
731
0
      if (QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH == op->response().status()) {
732
0
        result.first = true;
733
0
      }
734
0
    }
735
0
  }
736
0
  return result;
737
0
}
738
739
// Thread body which inserts or updates rows in the table, depending on 'type'.
740
// After each batch is flushed in insert mode, inserted_idx_ is updated
741
// to communicate how many rows have been written (and should now be
742
// updateable) to the thread running in update mode.
743
0
void AlterTableTest::WriteThread(QLWriteRequestPB::QLStmtType type) {
744
0
  shared_ptr<YBSession> session = client_->NewSession();
745
0
  session->SetTimeout(15s);
746
747
0
  client::TableHandle table;
748
0
  ASSERT_OK(table.Open(kTableName, client_.get()));
749
0
  Ops ops;
750
0
  int32_t processed = 0;
751
0
  int32_t i = 0;
752
0
  Random rng(1);
753
0
  for (;;) {
754
0
    bool should_stop = stop_threads_.Load();
755
0
    if (!should_stop) {
756
0
      auto op = table.NewWriteOp(type);
757
0
      auto req = op->mutable_request();
758
      // Endian-swap the key so that we spew inserts randomly
759
      // instead of just a sequential write pattern. This way
760
      // compactions may actually be triggered.
761
762
0
      if (type == QLWriteRequestPB::QL_STMT_INSERT) {
763
0
        int32_t key = bswap_32(i++);
764
0
        QLAddInt32HashValue(req, key);
765
0
        table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i);
766
0
      } else {
767
0
        int32_t max = inserted_idx_.Load();
768
0
        if (max == 0) {
769
          // Inserter hasn't inserted anything yet, so we have nothing to update.
770
0
          SleepFor(MonoDelta::FromMicroseconds(100));
771
0
          continue;
772
0
        }
773
        // Uniform(n) needs n > 0 and yields [0, n); endian-swap to match the insert's keys.
774
0
        int32_t key = bswap_32(rng.Uniform(max));
775
0
        QLAddInt32HashValue(req, key);
776
0
        table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i);
777
0
      }
778
779
0
      ops.push_back(op);
780
0
      session->Apply(op);
781
0
    }
782
783
0
    if (should_stop || ops.size() >= 10) {
784
0
      Status s = session->Flush();
785
0
      ASSERT_TRUE(s.ok() || s.IsBusy() || s.IsIOError());
786
0
      auto result = AnalyzeResponse(ops);
787
0
      ops.clear();
788
0
      processed += result.second;
789
0
      if (type == QLWriteRequestPB::QL_STMT_INSERT) {
790
0
        inserted_idx_.Store(processed);
791
0
        i = processed;
792
0
      }
793
0
      if (result.first) {
794
0
        ASSERT_OK(table.Open(kTableName, client_.get()));
795
0
      }
796
0
    }
797
798
0
    if (should_stop) {
799
0
      break;
800
0
    }
801
0
  }
802
803
0
  LOG(INFO) << "Processed: " << processed << " of type " << QLWriteRequestPB::QLStmtType_Name(type);
804
0
  ASSERT_GT(processed, 0);
805
0
}
806
807
// Thread which loops reading data from the table.
808
// Each scan is only checked to return at least as many rows as had been inserted when it began.
809
0
void AlterTableTest::ScannerThread() {
810
0
  client::TableHandle table;
811
0
  ASSERT_OK(table.Open(kTableName, client_.get()));
812
0
  while (!stop_threads_.Load()) {
813
0
    int inserted_at_scanner_start = inserted_idx_.Load();
814
0
    client::TableIteratorOptions options;
815
0
    bool failed = false;
816
0
    options.error_handler = [&failed](const Status& status) {
817
0
      LOG(WARNING) << "Scan failed: " << status;
818
0
      failed = true;
819
0
    };
820
0
    size_t count = boost::size(client::TableRange(table, options));
821
0
    if (failed) {
822
0
      continue;
823
0
    }
824
825
0
    LOG(INFO) << "Scanner saw " << count << " rows";
826
    // We may have gotten more rows than we expected, because inserts
827
    // kept going while we set up the scan. But, we should never get
828
    // fewer.
829
0
    ASSERT_GE(count, inserted_at_scanner_start)
830
0
      << "We didn't get as many rows as expected";
831
0
  }
832
0
}
833
834
// Test altering a table while also sending a lot of writes,
835
// checking for races between the two.
836
0
TEST_P(AlterTableTest, TestAlterUnderWriteLoad) {
837
0
  scoped_refptr<Thread> writer;
838
0
  CHECK_OK(Thread::Create(
839
0
      "test", "inserter",
840
0
      std::bind(&AlterTableTest::WriteThread, this, QLWriteRequestPB::QL_STMT_INSERT), &writer));
841
842
0
  scoped_refptr<Thread> updater;
843
0
  CHECK_OK(Thread::Create(
844
0
      "test", "updater",
845
0
      std::bind(&AlterTableTest::WriteThread, this, QLWriteRequestPB::QL_STMT_UPDATE), &updater));
846
847
0
  scoped_refptr<Thread> scanner;
848
0
  CHECK_OK(
849
0
      Thread::Create("test", "scanner", std::bind(&AlterTableTest::ScannerThread, this), &scanner));
850
851
  // Add columns until we reach 10.
852
0
  for (int i = 2; i < 10; i++) {
853
0
    MonoDelta delay;
854
0
    if (AllowSlowTests()) {
855
      // In slow test mode, let more writes accumulate in between
856
      // alters, so that we get enough writes to cause flushes,
857
      // compactions, etc.
858
0
      delay = MonoDelta::FromSeconds(3);
859
0
    } else {
860
0
      delay = MonoDelta::FromMilliseconds(100);
861
0
    }
862
0
    SleepFor(delay);
863
864
0
    ASSERT_OK(AddNewI32Column(kTableName, strings::Substitute("c$0", i)));
865
0
  }
866
867
0
  stop_threads_.Store(true);
868
0
  writer->Join();
869
0
  updater->Join();
870
0
  scanner->Join();
871
0
}
872
873
0
// Adds a column to a freshly created table and immediately inserts a row that
// sets the new column; the insert must succeed.
TEST_P(AlterTableTest, TestInsertAfterAlterTable) {
  YBTableName kSplitTableName(YQL_DATABASE_CQL, "my_keyspace", "split-table");

  // Create a new table with 10 tablets.
  //
  // With more tablets, there's a greater chance that the TS will heartbeat
  // after some but not all tablets have finished altering.
  ASSERT_OK(CreateTable(kSplitTableName));

  // Add a column, and immediately try to insert a row including that
  // new column.
  ASSERT_OK(AddNewI32Column(kSplitTableName, "new-i32"));
  client::TableHandle table;
  ASSERT_OK(table.Open(kSplitTableName, client_.get()));
  auto insert = table.NewInsertOp();
  auto req = insert->mutable_request();
  QLAddInt32HashValue(req, 1);
  table.AddInt32ColumnValue(req, "c1", 1);
  table.AddInt32ColumnValue(req, "new-i32", 1);
  shared_ptr<YBSession> session = client_->NewSession();
  session->SetTimeout(15s);
  session->Apply(insert);
  auto flush_status = session->FlushAndGetOpsErrors();
  const auto& s = flush_status.status;
  if (!s.ok()) {
    // Prefer reporting the per-operation error, which is expected to carry the
    // specific reason the insert was rejected.
    ASSERT_EQ(1, flush_status.errors.size());
    ASSERT_OK(flush_status.errors[0]->status()); // will fail
  }
  // Backstop: a failed flush must fail the test even if the collected
  // per-operation error unexpectedly reports OK.
  ASSERT_OK(s);
}
902
903
// Issue a bunch of alter tables in quick succession. Regression for a bug
904
// seen in an earlier implementation of "alter table" where these could
905
// conflict with each other.
906
0
TEST_P(AlterTableTest, TestMultipleAlters) {
907
0
  YBTableName kSplitTableName(YQL_DATABASE_CQL, "my_keyspace", "split-table");
908
0
  const size_t kNumNewCols = 10;
909
910
  // With more tablets, there's a greater chance that the TS will heartbeat
911
  // after some but not all tablets have finished altering.
912
0
  ASSERT_OK(CreateTable(kSplitTableName));
913
914
  // Issue a bunch of new alters without waiting for them to finish.
915
0
  for (size_t i = 0; i < kNumNewCols; i++) {
916
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kSplitTableName));
917
0
    table_alterer->AddColumn(strings::Substitute("new_col$0", i))
918
0
                 ->Type(INT32)->NotNull();
919
0
    ASSERT_OK(table_alterer->wait(false)->Alter());
920
0
  }
921
922
  // Now wait. This should block on all of them.
923
0
  ASSERT_OK(WaitAlterTableCompletion(kSplitTableName, 50));
924
925
  // All new columns should be present.
926
0
  YBSchema new_schema;
927
0
  PartitionSchema partition_schema;
928
0
  ASSERT_OK(client_->GetTableSchema(kSplitTableName, &new_schema, &partition_schema));
929
0
  ASSERT_EQ(kNumNewCols + schema_.num_columns(), new_schema.num_columns());
930
0
}
931
932
0
// Inserts rows, drops column c1 and adds it back, then checks that no alter is
// still in progress and that the rows verify against the C1_IS_DEADBEEF pattern.
TEST_P(ReplicatedAlterTableTest, TestReplicatedAlter) {
  const int kRowCount = 100;
  InsertRows(0, kRowCount);

  LOG(INFO) << "Verifying initial pattern";
  VerifyRows(0, kRowCount, C1_MATCHES_INDEX);

  LOG(INFO) << "Dropping and adding back c1";
  std::unique_ptr<YBTableAlterer> alterer(client_->NewTableAlterer(kTableName));
  alterer->DropColumn("c1");
  ASSERT_OK(alterer->Alter());

  ASSERT_OK(AddNewI32Column(kTableName, "c1"));

  // Both alters above should have fully completed by now.
  string queried_table_id;
  bool still_altering;
  ASSERT_OK(client_->IsAlterTableInProgress(kTableName, queried_table_id, &still_altering));
  ASSERT_FALSE(still_altering);

  LOG(INFO) << "Verifying that the new default shows up";
  VerifyRows(0, kRowCount, C1_IS_DEADBEEF);
}
953
954
0
// Same drop/re-add scenario as the replicated alter test, but run with tablet
// server 0 shut down for the duration of the alters; the server is restarted at
// the end of the test.
TEST_P(ReplicatedAlterTableTest, TestAlterOneTSDown) {
  const int kRowCount = 100;
  InsertRows(0, kRowCount);

  LOG(INFO) << "Verifying initial pattern";
  VerifyRows(0, kRowCount, C1_MATCHES_INDEX);

  // Now operating with 2 out of 3 servers.  Alter should still work because it's quorum-based.
  ShutdownTS(0);

  std::unique_ptr<YBTableAlterer> alterer(client_->NewTableAlterer(kTableName));
  alterer->DropColumn("c1");
  ASSERT_OK(alterer->Alter());
  ASSERT_OK(AddNewI32Column(kTableName, "c1"));
  ASSERT_OK(AddNewI32Column(kTableName, "new_col"));

  // None of the alters above should still be pending.
  string queried_table_id;
  bool still_altering;
  ASSERT_OK(client_->IsAlterTableInProgress(kTableName, queried_table_id, &still_altering));
  ASSERT_FALSE(still_altering);

  LOG(INFO) << "Verifying that the new default shows up";
  VerifyRows(0, kRowCount, C1_IS_DEADBEEF);

  // Bring the stopped tablet server back.
  RestartTabletServer(0);
}
979
980
}  // namespace yb