/Users/deen/code/yugabyte-db/src/yb/integration-tests/alter_table-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <map> |
34 | | #include <string> |
35 | | #include <utility> |
36 | | |
37 | | #include <gflags/gflags.h> |
38 | | #include <gtest/gtest.h> |
39 | | |
40 | | #include "yb/client/client-test-util.h" |
41 | | #include "yb/client/client.h" |
42 | | #include "yb/client/error.h" |
43 | | #include "yb/client/schema.h" |
44 | | #include "yb/client/session.h" |
45 | | #include "yb/client/table.h" |
46 | | #include "yb/client/table_alterer.h" |
47 | | #include "yb/client/table_creator.h" |
48 | | #include "yb/client/table_handle.h" |
49 | | #include "yb/client/yb_op.h" |
50 | | |
51 | | #include "yb/common/ql_value.h" |
52 | | #include "yb/common/schema.h" |
53 | | |
54 | | #include "yb/consensus/log.h" |
55 | | |
56 | | #include "yb/gutil/stl_util.h" |
57 | | #include "yb/gutil/strings/join.h" |
58 | | #include "yb/gutil/strings/substitute.h" |
59 | | |
60 | | #include "yb/integration-tests/mini_cluster.h" |
61 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
62 | | |
63 | | #include "yb/master/mini_master.h" |
64 | | #include "yb/master/sys_catalog.h" |
65 | | |
66 | | #include "yb/tablet/tablet.h" |
67 | | #include "yb/tablet/tablet_metadata.h" |
68 | | #include "yb/tablet/tablet_peer.h" |
69 | | |
70 | | #include "yb/tserver/mini_tablet_server.h" |
71 | | #include "yb/tserver/tablet_server.h" |
72 | | #include "yb/tserver/ts_tablet_manager.h" |
73 | | |
74 | | #include "yb/util/atomic.h" |
75 | | #include "yb/util/faststring.h" |
76 | | #include "yb/util/metrics.h" |
77 | | #include "yb/util/random.h" |
78 | | #include "yb/util/random_util.h" |
79 | | #include "yb/util/status_log.h" |
80 | | #include "yb/util/test_util.h" |
81 | | #include "yb/util/thread.h" |
82 | | |
83 | | using namespace std::literals; |
84 | | |
85 | | DECLARE_bool(enable_data_block_fsync); |
86 | | DECLARE_bool(enable_maintenance_manager); |
87 | | DECLARE_bool(enable_ysql); |
88 | | DECLARE_bool(flush_rocksdb_on_shutdown); |
89 | | DECLARE_int32(heartbeat_interval_ms); |
90 | | DECLARE_bool(use_hybrid_clock); |
91 | | DECLARE_int32(ht_lease_duration_ms); |
92 | | DECLARE_int32(replication_factor); |
93 | | DECLARE_int32(log_min_seconds_to_retain); |
94 | | DECLARE_int32(catalog_manager_report_batch_size); |
95 | | |
96 | | METRIC_DECLARE_counter(sys_catalog_peer_write_count); |
97 | | |
98 | | namespace yb { |
99 | | |
100 | | using client::YBClient; |
101 | | using client::YBClientBuilder; |
102 | | using client::YBColumnSchema; |
103 | | using client::YBError; |
104 | | using client::YBSchema; |
105 | | using client::YBSchemaBuilder; |
106 | | using client::YBSession; |
107 | | using client::YBTable; |
108 | | using client::YBTableAlterer; |
109 | | using client::YBTableCreator; |
110 | | using client::YBTableName; |
111 | | using client::YBTableType; |
112 | | using client::YBValue; |
113 | | using std::shared_ptr; |
114 | | using master::AlterTableRequestPB; |
115 | | using master::AlterTableResponsePB; |
116 | | using master::MiniMaster; |
117 | | using std::map; |
118 | | using std::pair; |
119 | | using std::vector; |
120 | | using tablet::TabletPeer; |
121 | | using tserver::MiniTabletServer; |
122 | | |
123 | | class AlterTableTest : public YBMiniClusterTestBase<MiniCluster>, |
124 | | public ::testing::WithParamInterface<int> { |
125 | | public: |
126 | | AlterTableTest() |
127 | | : stop_threads_(false), |
128 | 36 | inserted_idx_(0) { |
129 | | |
130 | 36 | YBSchemaBuilder b; |
131 | 36 | b.AddColumn("c0")->Type(INT32)->NotNull()->HashPrimaryKey(); |
132 | 36 | b.AddColumn("c1")->Type(INT32)->NotNull(); |
133 | 36 | CHECK_OK(b.Build(&schema_)); |
134 | | |
135 | 36 | FLAGS_enable_data_block_fsync = false; // Keep unit tests fast. |
136 | 36 | FLAGS_use_hybrid_clock = false; |
137 | 36 | FLAGS_ht_lease_duration_ms = 0; |
138 | 36 | FLAGS_enable_ysql = false; |
139 | 36 | ANNOTATE_BENIGN_RACE(&FLAGS_enable_maintenance_manager, |
140 | 36 | "safe to change at runtime"); |
141 | 36 | } |
142 | | |
143 | 36 | void SetUp() override { |
144 | | // Make heartbeats faster to speed test runtime. |
145 | 36 | FLAGS_heartbeat_interval_ms = 10; |
146 | | |
147 | 36 | FLAGS_catalog_manager_report_batch_size = GetParam(); |
148 | | |
149 | 36 | YBMiniClusterTestBase::SetUp(); |
150 | | |
151 | 36 | MiniClusterOptions opts; |
152 | 36 | opts.num_tablet_servers = num_replicas(); |
153 | 36 | FLAGS_replication_factor = num_replicas(); |
154 | 36 | cluster_.reset(new MiniCluster(opts)); |
155 | 36 | ASSERT_OK(cluster_->Start()); |
156 | 32 | ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas())); |
157 | | |
158 | 32 | client_ = CHECK_RESULT(YBClientBuilder() |
159 | 32 | .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str()) |
160 | 32 | .default_admin_operation_timeout(MonoDelta::FromSeconds(60)) |
161 | 32 | .Build()); |
162 | | |
163 | 32 | CHECK_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(), |
164 | 32 | kTableName.namespace_type())); |
165 | | |
166 | | // Add a table, make sure it reports itself. |
167 | 32 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
168 | 32 | CHECK_OK(table_creator->table_name(kTableName) |
169 | 32 | .schema(&schema_) |
170 | 32 | .table_type(YBTableType::YQL_TABLE_TYPE) |
171 | 32 | .num_tablets(1) |
172 | 32 | .Create()); |
173 | | |
174 | 32 | if (num_replicas() == 1) { |
175 | 0 | tablet_peer_ = LookupTabletPeer(); |
176 | 0 | } |
177 | 32 | LOG(INFO) << "Tablet successfully located"; |
178 | 32 | } |
179 | | |
180 | 16 | void DoTearDown() override { |
181 | 16 | client_.reset(); |
182 | 16 | tablet_peer_.reset(); |
183 | 16 | cluster_->Shutdown(); |
184 | 16 | } |
185 | | |
186 | 0 | std::shared_ptr<TabletPeer> LookupTabletPeer() { |
187 | 0 | auto peers = cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletPeers(); |
188 | 0 | return peers[0]; |
189 | 0 | } |
190 | | |
191 | 0 | void ShutdownTS(int idx = 0) { |
192 | | // Drop the tablet_peer_ reference since the tablet peer becomes invalid once |
193 | | // we shut down the server. Additionally, if we hold onto the reference, |
194 | | // we'll end up calling the destructor from the test code instead of the |
195 | | // normal location, which can cause crashes, etc. |
196 | 0 | tablet_peer_.reset(); |
197 | 0 | if (cluster_->mini_tablet_server(idx)->server() != nullptr) { |
198 | 0 | cluster_->mini_tablet_server(idx)->Shutdown(); |
199 | 0 | } |
200 | 0 | } |
201 | | |
202 | 0 | void RestartTabletServer(int idx = 0) { |
203 | 0 | tablet_peer_.reset(); |
204 | 0 | if (cluster_->mini_tablet_server(idx)->server()) { |
205 | 0 | ASSERT_OK(cluster_->mini_tablet_server(idx)->Restart()); |
206 | 0 | } else { |
207 | 0 | ASSERT_OK(cluster_->mini_tablet_server(idx)->Start()); |
208 | 0 | } |
209 | |
|
210 | 0 | ASSERT_OK(cluster_->mini_tablet_server(idx)->WaitStarted()); |
211 | 0 | if (idx == 0) { |
212 | 0 | tablet_peer_ = LookupTabletPeer(); |
213 | 0 | } |
214 | 0 | } |
215 | | |
216 | 0 | Status WaitAlterTableCompletion(const YBTableName& table_name, int attempts) { |
217 | 0 | int wait_time = 1000; |
218 | 0 | for (int i = 0; i < attempts; ++i) { |
219 | 0 | bool in_progress; |
220 | 0 | string table_id; |
221 | 0 | RETURN_NOT_OK(client_->IsAlterTableInProgress(table_name, table_id, &in_progress)); |
222 | 0 | if (!in_progress) { |
223 | 0 | return Status::OK(); |
224 | 0 | } |
225 | | |
226 | 0 | SleepFor(MonoDelta::FromMicroseconds(wait_time)); |
227 | 0 | wait_time = std::min(wait_time * 5 / 4, 1000000); |
228 | 0 | } |
229 | |
|
230 | 0 | return STATUS(TimedOut, "AlterTable not completed within the timeout"); |
231 | 0 | } |
232 | | |
233 | | Status AddNewI32Column(const YBTableName& table_name, |
234 | 0 | const string& column_name) { |
235 | 0 | return AddNewI32Column(table_name, column_name, MonoDelta::FromSeconds(60)); |
236 | 0 | } |
237 | | |
238 | | Status AddNewI32Column(const YBTableName& table_name, |
239 | | const string& column_name, |
240 | 0 | const MonoDelta& timeout) { |
241 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(table_name)); |
242 | 0 | table_alterer->AddColumn(column_name)->Type(INT32)->NotNull(); |
243 | 0 | return table_alterer->timeout(timeout)->Alter(); |
244 | 0 | } |
245 | | |
246 | | enum VerifyPattern { |
247 | | C1_MATCHES_INDEX, |
248 | | C1_IS_DEADBEEF, |
249 | | C1_DOESNT_EXIST |
250 | | }; |
251 | | |
252 | | void VerifyRows(int start_row, int num_rows, VerifyPattern pattern); |
253 | | |
254 | | void InsertRows(int start_row, int num_rows); |
255 | | |
256 | | void UpdateRow(int32_t row_key, const map<string, int32_t>& updates); |
257 | | |
258 | | std::vector<std::string> ScanToStrings(); |
259 | | |
260 | | void WriteThread(QLWriteRequestPB::QLStmtType type); |
261 | | void ScannerThread(); |
262 | | |
263 | 0 | Status CreateTable(const YBTableName& table_name) { |
264 | 0 | RETURN_NOT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name(), |
265 | 0 | table_name.namespace_type())); |
266 | |
|
267 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
268 | 0 | return table_creator->table_name(table_name) |
269 | 0 | .schema(&schema_) |
270 | 0 | .num_tablets(10) |
271 | 0 | .Create(); |
272 | 0 | } |
273 | | |
274 | 0 | int64_t GetSysCatalogWrites() { |
275 | 0 | auto GetSysCatalogMetric = [&](CounterPrototype& prototype) -> int64_t { |
276 | 0 | auto metrics = cluster_->mini_master()->sys_catalog().GetMetricEntity(); |
277 | 0 | return prototype.Instantiate(metrics)->value(); |
278 | 0 | }; |
279 | 0 | return GetSysCatalogMetric(METRIC_sys_catalog_peer_write_count); |
280 | 0 | } |
281 | | |
282 | | protected: |
283 | 96 | virtual int num_replicas() const { return 1; } |
284 | | |
285 | | static const YBTableName kTableName; |
286 | | |
287 | | std::unique_ptr<YBClient> client_; |
288 | | |
289 | | YBSchema schema_; |
290 | | |
291 | | std::shared_ptr<TabletPeer> tablet_peer_; |
292 | | |
293 | | AtomicBool stop_threads_; |
294 | | |
295 | | // The number of rows successfully inserted so far by a WriteThread running |
296 | | // QL_STMT_INSERT. A WriteThread running any other statement type uses this |
297 | | // to figure out which rows can be safely updated. |
298 | | AtomicInt<int32_t> inserted_idx_; |
299 | | }; |
300 | | |
301 | | // Subclass which creates three servers and a replicated cluster. |
302 | | class ReplicatedAlterTableTest : public AlterTableTest { |
303 | | protected: |
304 | 8 | virtual int num_replicas() const override { return 3; } |
305 | | }; |
306 | | |
307 | | const YBTableName AlterTableTest::kTableName(YQL_DATABASE_CQL, "my_keyspace", "fake-table"); |
308 | | |
309 | | INSTANTIATE_TEST_CASE_P(BatchSize, AlterTableTest, ::testing::Values(1, 10)); |
310 | | INSTANTIATE_TEST_CASE_P(BatchSize, ReplicatedAlterTableTest, ::testing::Values(1, 10)); |
311 | | |
312 | | // Simple test to verify that the "alter table" command is sent to and executed |
313 | | // on the TS handling the tablet of the altered table. |
314 | | // TODO: create and verify multiple tablets when the client supports that. |
315 | 0 | TEST_P(AlterTableTest, TestTabletReports) { |
316 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
317 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "new-i32")); |
318 | 0 | ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version()); |
319 | 0 | } |
320 | | |
321 | | // Verify that adding an existing column will return an "already present" error |
322 | 0 | TEST_P(AlterTableTest, TestAddExistingColumn) { |
323 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
324 | |
|
325 | 0 | { |
326 | 0 | Status s = AddNewI32Column(kTableName, "c1"); |
327 | 0 | ASSERT_TRUE(s.IsAlreadyPresent()); |
328 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "The column already exists: c1"); |
329 | 0 | } |
330 | |
|
331 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
332 | 0 | } |
333 | | |
334 | | // Adding a nullable column with no default value should be equivalent |
335 | | // to a NULL default. |
336 | 0 | TEST_P(AlterTableTest, TestAddNullableColumnWithoutDefault) { |
337 | 0 | InsertRows(0, 1); |
338 | 0 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
339 | |
|
340 | 0 | { |
341 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
342 | 0 | table_alterer->AddColumn("new")->Type(INT32); |
343 | 0 | ASSERT_OK(table_alterer->Alter()); |
344 | 0 | } |
345 | |
|
346 | 0 | InsertRows(1, 1); |
347 | |
|
348 | 0 | vector<string> rows = ScanToStrings(); |
349 | 0 | EXPECT_EQ(2, rows.size()); |
350 | 0 | EXPECT_EQ("{ int32:0, int32:0, null }", rows[0]); |
351 | 0 | EXPECT_EQ("{ int32:16777216, int32:1, null }", rows[1]); |
352 | 0 | } |
353 | | |
354 | | // Verify that, if a tablet server is down when an alter command is issued, |
355 | | // it will eventually receive the command when it restarts. |
356 | 0 | TEST_P(AlterTableTest, TestAlterOnTSRestart) { |
357 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
358 | |
|
359 | 0 | ShutdownTS(); |
360 | | |
361 | | // Send the Alter request |
362 | 0 | { |
363 | 0 | Status s = AddNewI32Column(kTableName, "new-32", MonoDelta::FromMilliseconds(500)); |
364 | 0 | ASSERT_TRUE(s.IsTimedOut()); |
365 | 0 | } |
366 | | |
367 | | // Verify that the Schema is the old one |
368 | 0 | YBSchema schema; |
369 | 0 | PartitionSchema partition_schema; |
370 | 0 | bool alter_in_progress = false; |
371 | 0 | string table_id; |
372 | 0 | ASSERT_OK(client_->GetTableSchema(kTableName, &schema, &partition_schema)); |
373 | 0 | ASSERT_TRUE(schema_.Equals(schema)); |
374 | 0 | ASSERT_OK(client_->IsAlterTableInProgress(kTableName, table_id, &alter_in_progress)); |
375 | 0 | ASSERT_TRUE(alter_in_progress); |
376 | | |
377 | | // Restart the TS and wait for the new schema |
378 | 0 | RestartTabletServer(); |
379 | 0 | ASSERT_OK(WaitAlterTableCompletion(kTableName, 50)); |
380 | 0 | ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version()); |
381 | 0 | } |
382 | | |
383 | | // Verify that nothing is left behind on cluster shutdown with pending async tasks |
384 | 0 | TEST_P(AlterTableTest, TestShutdownWithPendingTasks) { |
385 | 0 | DontVerifyClusterBeforeNextTearDown(); |
386 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
387 | |
|
388 | 0 | ShutdownTS(); |
389 | | |
390 | | // Send the Alter request |
391 | 0 | { |
392 | 0 | Status s = AddNewI32Column(kTableName, "new-i32", MonoDelta::FromMilliseconds(500)); |
393 | 0 | ASSERT_TRUE(s.IsTimedOut()); |
394 | 0 | } |
395 | 0 | } |
396 | | |
397 | | // Verify that the new schema is applied/reported even when |
398 | | // the TS is going down with the alter operation in progress. |
399 | | // On TS restart the master should either: |
400 | | // - get the new schema state, and mark the alter as complete, or |
401 | | // - get the old schema state, and ask the TS again to perform the alter. |
402 | 0 | TEST_P(AlterTableTest, TestRestartTSDuringAlter) { |
403 | 0 | if (!AllowSlowTests()) { |
404 | 0 | LOG(INFO) << "Skipping slow test"; |
405 | 0 | return; |
406 | 0 | } |
407 | | |
408 | 0 | ASSERT_EQ(0, tablet_peer_->tablet()->metadata()->schema_version()); |
409 | |
|
410 | 0 | Status s = AddNewI32Column(kTableName, "new-i32", MonoDelta::FromMilliseconds(1)); |
411 | 0 | ASSERT_TRUE(s.IsTimedOut()); |
412 | | |
413 | | // Restart the TS while alter is running |
414 | 0 | for (int i = 0; i < 3; i++) { |
415 | 0 | SleepFor(MonoDelta::FromMicroseconds(500)); |
416 | 0 | RestartTabletServer(); |
417 | 0 | } |
418 | | |
419 | | // Wait for the new schema |
420 | 0 | ASSERT_OK(WaitAlterTableCompletion(kTableName, 50)); |
421 | 0 | ASSERT_EQ(1, tablet_peer_->tablet()->metadata()->schema_version()); |
422 | 0 | } |
423 | | |
424 | 0 | TEST_P(AlterTableTest, TestGetSchemaAfterAlterTable) { |
425 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "new-i32")); |
426 | |
|
427 | 0 | YBSchema s; |
428 | 0 | PartitionSchema partition_schema; |
429 | 0 | ASSERT_OK(client_->GetTableSchema(kTableName, &s, &partition_schema)); |
430 | 0 | } |
431 | | |
432 | 0 | void AlterTableTest::InsertRows(int start_row, int num_rows) { |
433 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
434 | 0 | session->SetTimeout(15s); |
435 | 0 | client::TableHandle table; |
436 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
437 | 0 | std::vector<std::shared_ptr<client::YBqlOp>> ops; |
438 | | |
439 | | // Insert a bunch of rows with the current schema |
440 | 0 | for (int i = start_row; i < start_row + num_rows; i++) { |
441 | 0 | auto op = table.NewInsertOp(); |
442 | | // Endian-swap the key so that we spew inserts randomly |
443 | | // instead of just a sequential write pattern. This way |
444 | | // compactions may actually be triggered. |
445 | 0 | int32_t key = bswap_32(i); |
446 | 0 | auto req = op->mutable_request(); |
447 | 0 | QLAddInt32HashValue(req, key); |
448 | |
|
449 | 0 | if (table.schema().num_columns() > 1) { |
450 | 0 | table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i); |
451 | 0 | } |
452 | |
|
453 | 0 | ops.push_back(op); |
454 | 0 | session->Apply(op); |
455 | |
|
456 | 0 | if (ops.size() >= 50) { |
457 | 0 | FlushSessionOrDie(session, ops); |
458 | 0 | ops.clear(); |
459 | 0 | } |
460 | 0 | } |
461 | |
|
462 | 0 | FlushSessionOrDie(session, ops); |
463 | 0 | } |
464 | | |
465 | | void AlterTableTest::UpdateRow(int32_t row_key, |
466 | 0 | const map<string, int32_t>& updates) { |
467 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
468 | 0 | session->SetTimeout(15s); |
469 | |
|
470 | 0 | client::TableHandle table; |
471 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
472 | |
|
473 | 0 | auto update = table.NewUpdateOp(); |
474 | 0 | int32_t key = bswap_32(row_key); // endian swap to match 'InsertRows' |
475 | 0 | QLAddInt32HashValue(update->mutable_request(), key); |
476 | 0 | for (const auto& e : updates) { |
477 | 0 | table.AddInt32ColumnValue(update->mutable_request(), e.first, e.second); |
478 | 0 | } |
479 | 0 | session->Apply(update); |
480 | 0 | FlushSessionOrDie(session); |
481 | 0 | } |
482 | | |
483 | 0 | std::vector<string> AlterTableTest::ScanToStrings() { |
484 | 0 | client::TableHandle table; |
485 | 0 | EXPECT_OK(table.Open(kTableName, client_.get())); |
486 | 0 | auto result = ScanTableToStrings(table); |
487 | 0 | std::sort(result.begin(), result.end()); |
488 | 0 | return result; |
489 | 0 | } |
490 | | |
491 | | // Verify that the 'num_rows' starting with 'start_row' fit the given pattern. |
492 | | // Note that the 'start_row' here is not a row key, but the pre-transformation row |
493 | | // key (InsertRows swaps endianness so that we random-write instead of sequential-write) |
494 | 0 | void AlterTableTest::VerifyRows(int start_row, int num_rows, VerifyPattern pattern) { |
495 | 0 | client::TableHandle table; |
496 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
497 | |
|
498 | 0 | int verified = 0; |
499 | 0 | for (const auto& row : client::TableRange(table)) { |
500 | 0 | int32_t key = row.column(0).int32_value(); |
501 | 0 | int32_t row_idx = bswap_32(key); |
502 | 0 | if (row_idx < start_row || row_idx >= start_row + num_rows) { |
503 | | // Outside the range we're verifying |
504 | 0 | continue; |
505 | 0 | } |
506 | 0 | verified++; |
507 | |
|
508 | 0 | switch (pattern) { |
509 | 0 | case C1_MATCHES_INDEX: |
510 | 0 | ASSERT_EQ(row_idx, row.column(1).int32_value()); |
511 | 0 | break; |
512 | 0 | case C1_IS_DEADBEEF: |
513 | 0 | ASSERT_TRUE(row.column(1).IsNull()); |
514 | 0 | break; |
515 | 0 | case C1_DOESNT_EXIST: |
516 | 0 | continue; |
517 | 0 | default: |
518 | 0 | ASSERT_TRUE(false) << "Invalid pattern: " << pattern; |
519 | 0 | break; |
520 | 0 | } |
521 | 0 | } |
522 | 0 | ASSERT_EQ(verified, num_rows); |
523 | 0 | } |
524 | | |
525 | | // Test inserting/updating some data, dropping a column, and adding a new one |
526 | | // with the same name. Data should not "reappear" from the old column. |
527 | | // |
528 | | // This is a regression test for KUDU-461. |
529 | 0 | TEST_P(AlterTableTest, TestDropAndAddNewColumn) { |
530 | | // Reduce flush threshold so that we get both on-disk data |
531 | | // for the alter as well as in-MRS data. |
532 | | // This also increases chances of a race. |
533 | 0 | const int kNumRows = AllowSlowTests() ? 100000 : 1000; |
534 | 0 | InsertRows(0, kNumRows); |
535 | |
|
536 | 0 | LOG(INFO) << "Verifying initial pattern"; |
537 | 0 | VerifyRows(0, kNumRows, C1_MATCHES_INDEX); |
538 | |
|
539 | 0 | LOG(INFO) << "Dropping and adding back c1"; |
540 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
541 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
542 | |
|
543 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "c1")); |
544 | |
|
545 | 0 | LOG(INFO) << "Verifying that the new default shows up"; |
546 | 0 | VerifyRows(0, kNumRows, C1_IS_DEADBEEF); |
547 | 0 | } |
548 | | |
549 | 0 | TEST_P(AlterTableTest, DISABLED_TestCompactionAfterDrop) { |
550 | 0 | LOG(INFO) << "Inserting rows"; |
551 | 0 | InsertRows(0, 3); |
552 | |
|
553 | 0 | std::string docdb_dump = tablet_peer_->tablet()->TEST_DocDBDumpStr(); |
554 | | // DocDB should not be empty right now. |
555 | 0 | ASSERT_NE(0, docdb_dump.length()); |
556 | |
|
557 | 0 | LOG(INFO) << "Dropping c1"; |
558 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
559 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
560 | |
|
561 | 0 | LOG(INFO) << "Forcing compaction"; |
562 | 0 | tablet_peer_->tablet()->ForceRocksDBCompactInTest(); |
563 | |
|
564 | 0 | docdb_dump = tablet_peer_->tablet()->TEST_DocDBDumpStr(); |
565 | |
|
566 | 0 | LOG(INFO) << "Checking that docdb is empty"; |
567 | 0 | ASSERT_EQ("", docdb_dump); |
568 | |
|
569 | 0 | ASSERT_OK(cluster_->RestartSync()); |
570 | 0 | tablet_peer_ = LookupTabletPeer(); |
571 | 0 | } |
572 | | |
573 | | // This tests the scenario where the log entries immediately after the last RocksDB flush are |
574 | | // for a different schema than the one that was last flushed to the superblock. |
575 | 0 | TEST_P(AlterTableTest, TestLogSchemaReplay) { |
576 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "c2")); |
577 | 0 | InsertRows(0, 2); |
578 | 0 | UpdateRow(1, { {"c1", 0} }); |
579 | |
|
580 | 0 | LOG(INFO) << "Flushing RocksDB"; |
581 | 0 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
582 | |
|
583 | 0 | UpdateRow(0, { {"c1", 1}, {"c2", 10001} }); |
584 | |
|
585 | 0 | LOG(INFO) << "Dropping c1"; |
586 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
587 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
588 | |
|
589 | 0 | UpdateRow(1, { {"c2", 10002} }); |
590 | |
|
591 | 0 | auto rows = ScanToStrings(); |
592 | 0 | ASSERT_EQ(2, rows.size()); |
593 | 0 | ASSERT_EQ("{ int32:0, int32:10001 }", rows[0]); |
594 | 0 | ASSERT_EQ("{ int32:16777216, int32:10002 }", rows[1]); |
595 | |
|
596 | 0 | google::FlagSaver flag_saver; |
597 | | // Restart without flushing RocksDB |
598 | 0 | FLAGS_flush_rocksdb_on_shutdown = false; |
599 | 0 | LOG(INFO) << "Restarting tablet"; |
600 | 0 | ASSERT_NO_FATALS(RestartTabletServer()); |
601 | |
|
602 | 0 | rows = ScanToStrings(); |
603 | 0 | ASSERT_EQ(2, rows.size()); |
604 | 0 | ASSERT_EQ("{ int32:0, int32:10001 }", rows[0]); |
605 | 0 | ASSERT_EQ("{ int32:16777216, int32:10002 }", rows[1]); |
606 | 0 | } |
607 | | |
608 | | // Tests that a renamed table can still be altered. This is a regression test: we used to not |
609 | | // carry over column ids after a table rename. |
610 | 0 | TEST_P(AlterTableTest, TestRenameTableAndAdd) { |
611 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
612 | 0 | YBTableName new_name(kTableName.namespace_type(), kTableName.namespace_name(), "someothername"); |
613 | 0 | ASSERT_OK(table_alterer->RenameTo(new_name) |
614 | 0 | ->Alter()); |
615 | |
|
616 | 0 | ASSERT_OK(AddNewI32Column(new_name, "new")); |
617 | 0 | } |
618 | | |
619 | | // Test restarting a tablet server several times after various |
620 | | // schema changes. |
621 | | // This is a regression test for KUDU-462. |
622 | | TEST_P(AlterTableTest, TestBootstrapAfterAlters) { |
623 | | ASSERT_OK(AddNewI32Column(kTableName, "c2")); |
624 | | InsertRows(0, 1); |
625 | | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
626 | | InsertRows(1, 1); |
627 | | |
628 | | UpdateRow(0, { {"c1", 10001} }); |
629 | | UpdateRow(1, { {"c1", 10002} }); |
630 | | |
631 | | auto rows = ScanToStrings(); |
632 | | ASSERT_EQ(2, rows.size()); |
633 | | ASSERT_EQ("{ int32:0, int32:10001, null }", rows[0]); |
634 | | ASSERT_EQ("{ int32:16777216, int32:10002, null }", rows[1]); |
635 | | |
636 | | LOG(INFO) << "Dropping c1"; |
637 | | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
638 | | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
639 | | |
640 | | rows = ScanToStrings(); |
641 | | ASSERT_EQ(2, rows.size()); |
642 | | ASSERT_EQ("{ int32:0, null }", rows[0]); |
643 | | ASSERT_EQ("{ int32:16777216, null }", rows[1]); |
644 | | |
645 | | // Test that restart doesn't fail when trying to replay updates or inserts |
646 | | // with the dropped column. |
647 | | ASSERT_NO_FATALS(RestartTabletServer()); |
648 | | |
649 | | rows = ScanToStrings(); |
650 | | ASSERT_EQ(2, rows.size()); |
651 | | ASSERT_EQ("{ int32:0, null }", rows[0]); |
652 | | ASSERT_EQ("{ int32:16777216, null }", rows[1]); |
653 | | |
654 | | // Add back a column called 'c1'; it should not materialize old data. |
655 | | ASSERT_OK(AddNewI32Column(kTableName, "c1")); |
656 | | rows = ScanToStrings(); |
657 | | ASSERT_EQ(2, rows.size()); |
658 | | ASSERT_EQ("{ int32:0, null, null }", rows[0]); |
659 | | ASSERT_EQ("{ int32:16777216, null, null }", rows[1]); |
660 | | |
661 | | ASSERT_NO_FATALS(RestartTabletServer()); |
662 | | rows = ScanToStrings(); |
663 | | ASSERT_EQ(2, rows.size()); |
664 | | ASSERT_EQ("{ int32:0, null, null }", rows[0]); |
665 | | ASSERT_EQ("{ int32:16777216, null, null }", rows[1]); |
666 | | } |
667 | | |
668 | 0 | TEST_P(AlterTableTest, TestAlterWalRetentionSecs) { |
669 | 0 | InsertRows(1, 1000); |
670 | 0 | int kWalRetentionSecs = RandomUniformBool() |
671 | 0 | ? FLAGS_log_min_seconds_to_retain / 2 |
672 | 0 | : FLAGS_log_min_seconds_to_retain * 2; |
673 | |
|
674 | 0 | LOG(INFO) << "Modifying wal retention time to " << kWalRetentionSecs; |
675 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
676 | |
|
677 | 0 | ASSERT_OK(table_alterer->SetWalRetentionSecs(kWalRetentionSecs)->Alter()); |
678 | |
|
679 | 0 | int expected_wal_retention_secs = max(FLAGS_log_min_seconds_to_retain, kWalRetentionSecs); |
680 | |
|
681 | 0 | ASSERT_EQ(kWalRetentionSecs, tablet_peer_->tablet()->metadata()->wal_retention_secs()); |
682 | 0 | ASSERT_EQ(expected_wal_retention_secs, tablet_peer_->log()->wal_retention_secs()); |
683 | | |
684 | | // Test that the wal retention time gets set correctly in the metadata and in the log objects. |
685 | 0 | ASSERT_NO_FATALS(RestartTabletServer()); |
686 | |
|
687 | 0 | ASSERT_EQ(kWalRetentionSecs, tablet_peer_->tablet()->metadata()->wal_retention_secs()); |
688 | 0 | ASSERT_EQ(expected_wal_retention_secs, tablet_peer_->log()->wal_retention_secs()); |
689 | 0 | } |
690 | | |
691 | 0 | TEST_P(AlterTableTest, TestCompactAfterUpdatingRemovedColumn) { |
692 | | // Disable maintenance manager, since we manually flush/compact |
693 | | // in this test. |
694 | 0 | FLAGS_enable_maintenance_manager = false; |
695 | |
|
696 | 0 | vector<string> rows; |
697 | |
|
698 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "c2")); |
699 | 0 | InsertRows(0, 1); |
700 | 0 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
701 | 0 | InsertRows(1, 1); |
702 | 0 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
703 | | |
704 | |
|
705 | 0 | rows = ScanToStrings(); |
706 | 0 | ASSERT_EQ(2, rows.size()); |
707 | 0 | ASSERT_EQ("{ int32:0, int32:0, null }", rows[0]); |
708 | 0 | ASSERT_EQ("{ int32:16777216, int32:1, null }", rows[1]); |
709 | | |
710 | | // Add a delta for c1. |
711 | 0 | UpdateRow(0, { {"c1", 54321} }); |
712 | | |
713 | | // Drop c1. |
714 | 0 | LOG(INFO) << "Dropping c1"; |
715 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
716 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
717 | |
|
718 | 0 | rows = ScanToStrings(); |
719 | 0 | ASSERT_EQ(2, rows.size()); |
720 | 0 | ASSERT_EQ("{ int32:0, null }", rows[0]); |
721 | 0 | } |
722 | | |
723 | | typedef std::vector<std::shared_ptr<client::YBqlOp>> Ops; |
724 | | |
725 | 0 | std::pair<bool, int> AnalyzeResponse(const Ops& ops) { |
726 | 0 | std::pair<bool, int> result = { false, 0 }; |
727 | 0 | for (const auto& op : ops) { |
728 | 0 | if (op->response().status() == QLResponsePB::YQL_STATUS_OK) { |
729 | 0 | ++result.second; |
730 | 0 | } else { |
731 | 0 | if (QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH == op->response().status()) { |
732 | 0 | result.first = true; |
733 | 0 | } |
734 | 0 | } |
735 | 0 | } |
736 | 0 | return result; |
737 | 0 | } |
738 | | |
739 | | // Thread which writes rows into the table using statements of the given type. |
740 | | // After each batch is flushed by an insert thread, inserted_idx_ is updated |
741 | | // to communicate how much data has been written (and should now be |
742 | | // updateable). |
743 | 0 | void AlterTableTest::WriteThread(QLWriteRequestPB::QLStmtType type) { |
744 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
745 | 0 | session->SetTimeout(15s); |
746 | |
|
747 | 0 | client::TableHandle table; |
748 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
749 | 0 | Ops ops; |
750 | 0 | int32_t processed = 0; |
751 | 0 | int32_t i = 0; |
752 | 0 | Random rng(1); |
753 | 0 | for (;;) { |
754 | 0 | bool should_stop = stop_threads_.Load(); |
755 | 0 | if (!should_stop) { |
756 | 0 | auto op = table.NewWriteOp(type); |
757 | 0 | auto req = op->mutable_request(); |
758 | | // Endian-swap the key so that we spew inserts randomly |
759 | | // instead of just a sequential write pattern. This way |
760 | | // compactions may actually be triggered. |
761 | |
|
762 | 0 | if (type == QLWriteRequestPB::QL_STMT_INSERT) { |
763 | 0 | int32_t key = bswap_32(i++); |
764 | 0 | QLAddInt32HashValue(req, key); |
765 | 0 | table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i); |
766 | 0 | } else { |
767 | 0 | int32_t max = inserted_idx_.Load(); |
768 | 0 | if (max == 0) { |
769 | | // Inserter hasn't inserted anything yet, so we have nothing to update. |
770 | 0 | SleepFor(MonoDelta::FromMicroseconds(100)); |
771 | 0 | continue; |
772 | 0 | } |
773 | | // Endian-swap the key to match the way the insert generates keys. |
774 | 0 | int32_t key = bswap_32(rng.Uniform(max-1)); |
775 | 0 | QLAddInt32HashValue(req, key); |
776 | 0 | table.AddInt32ColumnValue(req, table.schema().columns()[1].name(), i); |
777 | 0 | } |
778 | |
|
779 | 0 | ops.push_back(op); |
780 | 0 | session->Apply(op); |
781 | 0 | } |
782 | |
|
783 | 0 | if (should_stop || ops.size() >= 10) { |
784 | 0 | Status s = session->Flush(); |
785 | 0 | ASSERT_TRUE(s.ok() || s.IsBusy() || s.IsIOError()); |
786 | 0 | auto result = AnalyzeResponse(ops); |
787 | 0 | ops.clear(); |
788 | 0 | processed += result.second; |
789 | 0 | if (type == QLWriteRequestPB::QL_STMT_INSERT) { |
790 | 0 | inserted_idx_.Store(processed); |
791 | 0 | i = processed; |
792 | 0 | } |
793 | 0 | if (result.first) { |
794 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
795 | 0 | } |
796 | 0 | } |
797 | |
|
798 | 0 | if (should_stop) { |
799 | 0 | break; |
800 | 0 | } |
801 | 0 | } |
802 | |
|
803 | 0 | LOG(INFO) << "Processed: " << processed << " of type " << QLWriteRequestPB::QLStmtType_Name(type); |
804 | 0 | ASSERT_GT(processed, 0); |
805 | 0 | } |
806 | | |
807 | | // Thread which loops reading data from the table. |
808 | | // No verification is performed. |
809 | 0 | void AlterTableTest::ScannerThread() { |
810 | 0 | client::TableHandle table; |
811 | 0 | ASSERT_OK(table.Open(kTableName, client_.get())); |
812 | 0 | while (!stop_threads_.Load()) { |
813 | 0 | int inserted_at_scanner_start = inserted_idx_.Load(); |
814 | 0 | client::TableIteratorOptions options; |
815 | 0 | bool failed = false; |
816 | 0 | options.error_handler = [&failed](const Status& status) { |
817 | 0 | LOG(WARNING) << "Scan failed: " << status; |
818 | 0 | failed = true; |
819 | 0 | }; |
820 | 0 | size_t count = boost::size(client::TableRange(table, options)); |
821 | 0 | if (failed) { |
822 | 0 | continue; |
823 | 0 | } |
824 | | |
825 | 0 | LOG(INFO) << "Scanner saw " << count << " rows"; |
826 | | // We may have gotten more rows than we expected, because inserts |
827 | | // kept going while we set up the scan. But, we should never get |
828 | | // fewer. |
829 | 0 | ASSERT_GE(count, inserted_at_scanner_start) |
830 | 0 | << "We didn't get as many rows as expected"; |
831 | 0 | } |
832 | 0 | } |
833 | | |
834 | | // Test altering a table while also sending a lot of writes, |
835 | | // checking for races between the two. |
836 | 0 | TEST_P(AlterTableTest, TestAlterUnderWriteLoad) { |
837 | 0 | scoped_refptr<Thread> writer; |
838 | 0 | CHECK_OK(Thread::Create( |
839 | 0 | "test", "inserter", |
840 | 0 | std::bind(&AlterTableTest::WriteThread, this, QLWriteRequestPB::QL_STMT_INSERT), &writer)); |
841 | |
|
842 | 0 | scoped_refptr<Thread> updater; |
843 | 0 | CHECK_OK(Thread::Create( |
844 | 0 | "test", "updater", |
845 | 0 | std::bind(&AlterTableTest::WriteThread, this, QLWriteRequestPB::QL_STMT_UPDATE), &updater)); |
846 | |
|
847 | 0 | scoped_refptr<Thread> scanner; |
848 | 0 | CHECK_OK( |
849 | 0 | Thread::Create("test", "scanner", std::bind(&AlterTableTest::ScannerThread, this), &scanner)); |
850 | | |
851 | | // Add columns until we reach 10. |
852 | 0 | for (int i = 2; i < 10; i++) { |
853 | 0 | MonoDelta delay; |
854 | 0 | if (AllowSlowTests()) { |
855 | | // In slow test mode, let more writes accumulate in between |
856 | | // alters, so that we get enough writes to cause flushes, |
857 | | // compactions, etc. |
858 | 0 | delay = MonoDelta::FromSeconds(3); |
859 | 0 | } else { |
860 | 0 | delay = MonoDelta::FromMilliseconds(100); |
861 | 0 | } |
862 | 0 | SleepFor(delay); |
863 | |
|
864 | 0 | ASSERT_OK(AddNewI32Column(kTableName, strings::Substitute("c$0", i))); |
865 | 0 | } |
866 | |
|
867 | 0 | stop_threads_.Store(true); |
868 | 0 | writer->Join(); |
869 | 0 | updater->Join(); |
870 | 0 | scanner->Join(); |
871 | 0 | } |
872 | | |
873 | 0 | TEST_P(AlterTableTest, TestInsertAfterAlterTable) { |
874 | 0 | YBTableName kSplitTableName(YQL_DATABASE_CQL, "my_keyspace", "split-table"); |
875 | | |
876 | | // Create a new table with 10 tablets. |
877 | | // |
878 | | // With more tablets, there's a greater chance that the TS will heartbeat |
879 | | // after some but not all tablets have finished altering. |
880 | 0 | ASSERT_OK(CreateTable(kSplitTableName)); |
881 | | |
882 | | // Add a column, and immediately try to insert a row including that |
883 | | // new column. |
884 | 0 | ASSERT_OK(AddNewI32Column(kSplitTableName, "new-i32")); |
885 | 0 | client::TableHandle table; |
886 | 0 | ASSERT_OK(table.Open(kSplitTableName, client_.get())); |
887 | 0 | auto insert = table.NewInsertOp(); |
888 | 0 | auto req = insert->mutable_request(); |
889 | 0 | QLAddInt32HashValue(req, 1); |
890 | 0 | table.AddInt32ColumnValue(req, "c1", 1); |
891 | 0 | table.AddInt32ColumnValue(req, "new-i32", 1); |
892 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
893 | 0 | session->SetTimeout(15s); |
894 | 0 | session->Apply(insert); |
895 | 0 | auto flush_status = session->FlushAndGetOpsErrors(); |
896 | 0 | const auto& s = flush_status.status; |
897 | 0 | if (!s.ok()) { |
898 | 0 | ASSERT_EQ(1, flush_status.errors.size()); |
899 | 0 | ASSERT_OK(flush_status.errors[0]->status()); // will fail |
900 | 0 | } |
901 | 0 | } |
902 | | |
903 | | // Issue a bunch of alter tables in quick succession. Regression for a bug |
904 | | // seen in an earlier implementation of "alter table" where these could |
905 | | // conflict with each other. |
906 | 0 | TEST_P(AlterTableTest, TestMultipleAlters) { |
907 | 0 | YBTableName kSplitTableName(YQL_DATABASE_CQL, "my_keyspace", "split-table"); |
908 | 0 | const size_t kNumNewCols = 10; |
909 | | |
910 | | // With more tablets, there's a greater chance that the TS will heartbeat |
911 | | // after some but not all tablets have finished altering. |
912 | 0 | ASSERT_OK(CreateTable(kSplitTableName)); |
913 | | |
914 | | // Issue a bunch of new alters without waiting for them to finish. |
915 | 0 | for (size_t i = 0; i < kNumNewCols; i++) { |
916 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kSplitTableName)); |
917 | 0 | table_alterer->AddColumn(strings::Substitute("new_col$0", i)) |
918 | 0 | ->Type(INT32)->NotNull(); |
919 | 0 | ASSERT_OK(table_alterer->wait(false)->Alter()); |
920 | 0 | } |
921 | | |
922 | | // Now wait. This should block on all of them. |
923 | 0 | ASSERT_OK(WaitAlterTableCompletion(kSplitTableName, 50)); |
924 | | |
925 | | // All new columns should be present. |
926 | 0 | YBSchema new_schema; |
927 | 0 | PartitionSchema partition_schema; |
928 | 0 | ASSERT_OK(client_->GetTableSchema(kSplitTableName, &new_schema, &partition_schema)); |
929 | 0 | ASSERT_EQ(kNumNewCols + schema_.num_columns(), new_schema.num_columns()); |
930 | 0 | } |
931 | | |
932 | 0 | TEST_P(ReplicatedAlterTableTest, TestReplicatedAlter) { |
933 | 0 | const int kNumRows = 100; |
934 | 0 | InsertRows(0, kNumRows); |
935 | |
|
936 | 0 | LOG(INFO) << "Verifying initial pattern"; |
937 | 0 | VerifyRows(0, kNumRows, C1_MATCHES_INDEX); |
938 | |
|
939 | 0 | LOG(INFO) << "Dropping and adding back c1"; |
940 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
941 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
942 | |
|
943 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "c1")); |
944 | |
|
945 | 0 | bool alter_in_progress; |
946 | 0 | string table_id; |
947 | 0 | ASSERT_OK(client_->IsAlterTableInProgress(kTableName, table_id, &alter_in_progress)); |
948 | 0 | ASSERT_FALSE(alter_in_progress); |
949 | |
|
950 | 0 | LOG(INFO) << "Verifying that the new default shows up"; |
951 | 0 | VerifyRows(0, kNumRows, C1_IS_DEADBEEF); |
952 | 0 | } |
953 | | |
954 | 0 | TEST_P(ReplicatedAlterTableTest, TestAlterOneTSDown) { |
955 | 0 | const int kNumRows = 100; |
956 | 0 | InsertRows(0, kNumRows); |
957 | |
|
958 | 0 | LOG(INFO) << "Verifying initial pattern"; |
959 | 0 | VerifyRows(0, kNumRows, C1_MATCHES_INDEX); |
960 | |
|
961 | 0 | ShutdownTS(0); |
962 | | // Now operating with 2 out of 3 servers. Alter should still work because it's quorum-based. |
963 | |
|
964 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
965 | 0 | ASSERT_OK(table_alterer->DropColumn("c1")->Alter()); |
966 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "c1")); |
967 | 0 | ASSERT_OK(AddNewI32Column(kTableName, "new_col")); |
968 | |
|
969 | 0 | bool alter_in_progress; |
970 | 0 | string table_id; |
971 | 0 | ASSERT_OK(client_->IsAlterTableInProgress(kTableName, table_id, &alter_in_progress)); |
972 | 0 | ASSERT_FALSE(alter_in_progress); |
973 | |
|
974 | 0 | LOG(INFO) << "Verifying that the new default shows up"; |
975 | 0 | VerifyRows(0, kNumRows, C1_IS_DEADBEEF); |
976 | |
|
977 | 0 | RestartTabletServer(0); |
978 | 0 | } |
979 | | |
980 | | } // namespace yb |