/Users/deen/code/yugabyte-db/src/yb/client/client-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <algorithm> |
34 | | #include <functional> |
35 | | #include <regex> |
36 | | #include <set> |
37 | | #include <thread> |
38 | | #include <vector> |
39 | | |
40 | | #include <gflags/gflags.h> |
41 | | #include <gtest/gtest.h> |
42 | | |
43 | | #include "yb/client/async_initializer.h" |
44 | | #include "yb/client/client-internal.h" |
45 | | #include "yb/client/client-test-util.h" |
46 | | #include "yb/client/client.h" |
47 | | #include "yb/client/client_utils.h" |
48 | | #include "yb/client/error.h" |
49 | | #include "yb/client/meta_cache.h" |
50 | | #include "yb/client/schema.h" |
51 | | #include "yb/client/session.h" |
52 | | #include "yb/client/table.h" |
53 | | #include "yb/client/table_alterer.h" |
54 | | #include "yb/client/table_creator.h" |
55 | | #include "yb/client/table_handle.h" |
56 | | #include "yb/client/table_info.h" |
57 | | #include "yb/client/tablet_server.h" |
58 | | #include "yb/client/value.h" |
59 | | #include "yb/client/yb_op.h" |
60 | | |
61 | | #include "yb/common/partial_row.h" |
62 | | #include "yb/common/ql_type.h" |
63 | | #include "yb/common/ql_value.h" |
64 | | #include "yb/common/schema.h" |
65 | | #include "yb/common/wire_protocol.h" |
66 | | |
67 | | #include "yb/consensus/consensus.proxy.h" |
68 | | |
69 | | #include "yb/gutil/algorithm.h" |
70 | | #include "yb/gutil/atomicops.h" |
71 | | #include "yb/gutil/stl_util.h" |
72 | | #include "yb/gutil/strings/substitute.h" |
73 | | |
74 | | #include "yb/integration-tests/mini_cluster.h" |
75 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
76 | | |
77 | | #include "yb/master/catalog_manager_if.h" |
78 | | #include "yb/master/master.h" |
79 | | #include "yb/master/master_client.pb.h" |
80 | | #include "yb/master/master_ddl.pb.h" |
81 | | #include "yb/master/master_error.h" |
82 | | #include "yb/master/mini_master.h" |
83 | | |
84 | | #include "yb/rpc/messenger.h" |
85 | | #include "yb/rpc/proxy.h" |
86 | | #include "yb/rpc/rpc_controller.h" |
87 | | #include "yb/rpc/rpc_test_util.h" |
88 | | |
89 | | #include "yb/tablet/tablet.h" |
90 | | #include "yb/tablet/tablet_metadata.h" |
91 | | #include "yb/tablet/tablet_peer.h" |
92 | | |
93 | | #include "yb/tserver/mini_tablet_server.h" |
94 | | #include "yb/tserver/tablet_server.h" |
95 | | #include "yb/tserver/ts_tablet_manager.h" |
96 | | #include "yb/tserver/tserver_service.proxy.h" |
97 | | |
98 | | #include "yb/util/capabilities.h" |
99 | | #include "yb/util/metrics.h" |
100 | | #include "yb/util/net/dns_resolver.h" |
101 | | #include "yb/util/net/sockaddr.h" |
102 | | #include "yb/util/random_util.h" |
103 | | #include "yb/util/status.h" |
104 | | #include "yb/util/status_log.h" |
105 | | #include "yb/util/stopwatch.h" |
106 | | #include "yb/util/test_thread_holder.h" |
107 | | #include "yb/util/test_util.h" |
108 | | #include "yb/util/thread.h" |
109 | | #include "yb/util/tostring.h" |
110 | | #include "yb/util/tsan_util.h" |
111 | | |
112 | | #include "yb/yql/cql/ql/util/statement_result.h" |
113 | | |
114 | | DECLARE_bool(enable_data_block_fsync); |
115 | | DECLARE_bool(log_inject_latency); |
116 | | DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
117 | | DECLARE_int32(heartbeat_interval_ms); |
118 | | DECLARE_int32(log_inject_latency_ms_mean); |
119 | | DECLARE_int32(log_inject_latency_ms_stddev); |
120 | | DECLARE_int32(master_inject_latency_on_tablet_lookups_ms); |
121 | | DECLARE_int32(max_create_tablets_per_ts); |
122 | | DECLARE_int32(TEST_scanner_inject_latency_on_each_batch_ms); |
123 | | DECLARE_int32(scanner_max_batch_size_bytes); |
124 | | DECLARE_int32(scanner_ttl_ms); |
125 | | DECLARE_int32(tablet_server_svc_queue_length); |
126 | | DECLARE_int32(replication_factor); |
127 | | |
128 | | DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan"); |
129 | | DECLARE_int32(min_backoff_ms_exponent); |
130 | | DECLARE_int32(max_backoff_ms_exponent); |
131 | | DECLARE_bool(TEST_force_master_lookup_all_tablets); |
132 | | DECLARE_double(TEST_simulate_lookup_timeout_probability); |
133 | | DECLARE_string(TEST_fail_to_fast_resolve_address); |
134 | | |
135 | | METRIC_DECLARE_counter(rpcs_queue_overflow); |
136 | | |
137 | | DEFINE_CAPABILITY(ClientTest, 0x1523c5ae); |
138 | | DECLARE_CAPABILITY(TabletReportLimit); |
139 | | |
140 | | using namespace std::literals; // NOLINT |
141 | | using namespace std::placeholders; |
142 | | |
143 | | namespace yb { |
144 | | namespace client { |
145 | | |
146 | | using std::string; |
147 | | using std::set; |
148 | | using std::vector; |
149 | | |
150 | | using base::subtle::Atomic32; |
151 | | using base::subtle::NoBarrier_AtomicIncrement; |
152 | | using base::subtle::NoBarrier_Load; |
153 | | using base::subtle::NoBarrier_Store; |
154 | | using master::CatalogManager; |
155 | | using master::GetNamespaceInfoResponsePB; |
156 | | using master::GetTableLocationsRequestPB; |
157 | | using master::GetTableLocationsResponsePB; |
158 | | using master::TabletLocationsPB; |
159 | | using master::TSInfoPB; |
160 | | using std::shared_ptr; |
161 | | using tablet::TabletPeer; |
162 | | using tserver::MiniTabletServer; |
163 | | |
164 | | namespace { |
165 | | |
166 | | constexpr int32_t kNoBound = kint32max; |
167 | | constexpr int kNumTablets = 2; |
168 | | |
169 | | const std::string kKeyspaceName = "my_keyspace"; |
170 | | const std::string kPgsqlKeyspaceID = "1234"; |
171 | | const std::string kPgsqlKeyspaceName = "psql" + kKeyspaceName; |
172 | | |
173 | | } // namespace |
174 | | |
175 | | class ClientTest: public YBMiniClusterTestBase<MiniCluster> { |
176 | | public: |
177 | 61 | ClientTest() { |
178 | 61 | YBSchemaBuilder b; |
179 | 61 | b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey(); |
180 | 61 | b.AddColumn("int_val")->Type(INT32)->NotNull(); |
181 | 61 | b.AddColumn("string_val")->Type(STRING)->Nullable(); |
182 | 61 | b.AddColumn("non_null_with_default")->Type(INT32)->NotNull(); |
183 | 61 | CHECK_OK(b.Build(&schema_)); |
184 | | |
185 | 61 | FLAGS_enable_data_block_fsync = false; // Keep unit tests fast. |
186 | 61 | } |
187 | | |
188 | 61 | void SetUp() override { |
189 | 61 | YBMiniClusterTestBase::SetUp(); |
190 | | |
191 | | // Reduce the TS<->Master heartbeat interval |
192 | 61 | FLAGS_heartbeat_interval_ms = 10; |
193 | | |
194 | | // Start minicluster and wait for tablet servers to connect to master. |
195 | 61 | auto opts = MiniClusterOptions(); |
196 | 61 | opts.num_tablet_servers = 3; |
197 | 61 | opts.num_masters = NumMasters(); |
198 | 61 | cluster_.reset(new MiniCluster(opts)); |
199 | 61 | ASSERT_OK(cluster_->Start()); |
200 | | |
201 | | // Connect to the cluster. |
202 | 0 | ASSERT_OK(InitClient()); |
203 | | |
204 | | // Create a keyspace; |
205 | 0 | ASSERT_OK(client_->CreateNamespace(kKeyspaceName)); |
206 | |
|
207 | 0 | ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_)); |
208 | 0 | ASSERT_NO_FATALS(CreateTable(kTable2Name, 1, &client_table2_)); |
209 | 0 | } |
210 | | |
211 | 5 | void DoTearDown() override { |
212 | 5 | client_.reset(); |
213 | 5 | if (cluster_) { |
214 | 5 | cluster_->Shutdown(); |
215 | 5 | cluster_.reset(); |
216 | 5 | } |
217 | 5 | YBMiniClusterTestBase::DoTearDown(); |
218 | 5 | } |
219 | | |
220 | | protected: |
221 | | static const YBTableName kTableName; |
222 | | static const YBTableName kTable2Name; |
223 | | static const YBTableName kTable3Name; |
224 | | |
225 | 60 | virtual int NumMasters() { |
226 | 60 | return 1; |
227 | 60 | } |
228 | | |
229 | 0 | virtual Status InitClient() { |
230 | 0 | client_ = VERIFY_RESULT(YBClientBuilder() |
231 | 0 | .add_master_server_addr(yb::ToString(cluster_->mini_master()->bound_rpc_addr())) |
232 | 0 | .Build()); |
233 | 0 | return Status::OK(); |
234 | 0 | } |
235 | | |
236 | 0 | string GetFirstTabletId(YBTable* table) { |
237 | 0 | GetTableLocationsRequestPB req; |
238 | 0 | GetTableLocationsResponsePB resp; |
239 | 0 | table->name().SetIntoTableIdentifierPB(req.mutable_table()); |
240 | 0 | CHECK_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
241 | 0 | CHECK_GT(resp.tablet_locations_size(), 0); |
242 | 0 | return resp.tablet_locations(0).tablet_id(); |
243 | 0 | } |
244 | | |
245 | 0 | void CheckNoRpcOverflow() { |
246 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
247 | 0 | MiniTabletServer* server = cluster_->mini_tablet_server(i); |
248 | 0 | if (server->is_started()) { |
249 | 0 | ASSERT_EQ(0, server->server()->rpc_server()-> |
250 | 0 | TEST_service_pool("yb.tserver.TabletServerService")-> |
251 | 0 | RpcsQueueOverflowMetric()->value()); |
252 | 0 | } |
253 | 0 | } |
254 | 0 | } |
255 | | |
256 | 0 | YBSessionPtr CreateSession(YBClient* client = nullptr) { |
257 | 0 | if (client == nullptr) { |
258 | 0 | client = client_.get(); |
259 | 0 | } |
260 | 0 | std::shared_ptr<YBSession> session = client->NewSession(); |
261 | 0 | session->SetTimeout(10s); |
262 | 0 | return session; |
263 | 0 | } |
264 | | |
265 | | // Inserts 'num_rows' test rows with keys [first_row, first_row + num_rows) using 'client'.
266 | 0 | void InsertTestRows(YBClient* client, const TableHandle& table, int num_rows, int first_row = 0) { |
267 | 0 | auto session = CreateSession(client); |
268 | 0 | for (int i = first_row; i < num_rows + first_row; i++) { |
269 | 0 | session->Apply(BuildTestRow(table, i)); |
270 | 0 | } |
271 | 0 | FlushSessionOrDie(session); |
272 | 0 | ASSERT_NO_FATALS(CheckNoRpcOverflow()); |
273 | 0 | } |
274 | | |
275 | | // Inserts 'num_rows' using the default client. |
276 | 0 | void InsertTestRows(const TableHandle& table, int num_rows, int first_row = 0) { |
277 | 0 | InsertTestRows(client_.get(), table, num_rows, first_row); |
278 | 0 | } |
279 | | |
280 | 0 | void UpdateTestRows(const TableHandle& table, int lo, int hi) { |
281 | 0 | auto session = CreateSession(); |
282 | 0 | for (int i = lo; i < hi; i++) { |
283 | 0 | session->Apply(UpdateTestRow(table, i)); |
284 | 0 | } |
285 | 0 | FlushSessionOrDie(session); |
286 | 0 | ASSERT_NO_FATALS(CheckNoRpcOverflow()); |
287 | 0 | } |
288 | | |
289 | 0 | void DeleteTestRows(const TableHandle& table, int lo, int hi) { |
290 | 0 | auto session = CreateSession(); |
291 | 0 | for (int i = lo; i < hi; i++) { |
292 | 0 | session->Apply(DeleteTestRow(table, i)); |
293 | 0 | } |
294 | 0 | FlushSessionOrDie(session); |
295 | 0 | ASSERT_NO_FATALS(CheckNoRpcOverflow()); |
296 | 0 | } |
297 | | |
298 | 0 | shared_ptr<YBqlWriteOp> BuildTestRow(const TableHandle& table, int index) { |
299 | 0 | auto insert = table.NewInsertOp(); |
300 | 0 | auto req = insert->mutable_request(); |
301 | 0 | QLAddInt32HashValue(req, index); |
302 | 0 | const auto& columns = table.schema().columns(); |
303 | 0 | table.AddInt32ColumnValue(req, columns[1].name(), index * 2); |
304 | 0 | table.AddStringColumnValue(req, columns[2].name(), StringPrintf("hello %d", index)); |
305 | 0 | table.AddInt32ColumnValue(req, columns[3].name(), index * 3); |
306 | 0 | return insert; |
307 | 0 | } |
308 | | |
309 | 0 | shared_ptr<YBqlWriteOp> UpdateTestRow(const TableHandle& table, int index) { |
310 | 0 | auto update = table.NewUpdateOp(); |
311 | 0 | auto req = update->mutable_request(); |
312 | 0 | QLAddInt32HashValue(req, index); |
313 | 0 | const auto& columns = table.schema().columns(); |
314 | 0 | table.AddInt32ColumnValue(req, columns[1].name(), index * 2 + 1); |
315 | 0 | table.AddStringColumnValue(req, columns[2].name(), StringPrintf("hello again %d", index)); |
316 | 0 | return update; |
317 | 0 | } |
318 | | |
319 | 0 | shared_ptr<YBqlWriteOp> DeleteTestRow(const TableHandle& table, int index) { |
320 | 0 | auto del = table.NewDeleteOp(); |
321 | 0 | QLAddInt32HashValue(del->mutable_request(), index); |
322 | 0 | return del; |
323 | 0 | } |
324 | | |
325 | 0 | void DoTestScanWithoutPredicates() {
326 | 0 | client::TableIteratorOptions options;
327 | 0 | options.columns = std::vector<std::string>{"key"};
328 | 0 | LOG_TIMING(INFO, "Scanning with no predicates") {
329 | 0 | uint64_t sum = 0;
330 | 0 | for (const auto& row : client::TableRange(client_table_, options)) {
331 | 0 | sum += row.column(0).int32_value();
332 | 0 | }
333 | | // The sum should be the sum of the arithmetic series 0..FLAGS_test_scan_num_rows-1.
334 | | // Widen to 64 bits before multiplying so a large row count cannot overflow int32.
335 | 0 | const uint64_t num_rows = FLAGS_test_scan_num_rows;
336 | 0 | const uint64_t expected = num_rows * (num_rows - 1) / 2;
337 | 0 | ASSERT_EQ(expected, sum);
338 | 0 | }
339 | 0 | }
340 | | |
341 | 0 | void DoTestScanWithStringPredicate() { |
342 | 0 | TableIteratorOptions options; |
343 | 0 | options.filter = FilterBetween("hello 2"s, Inclusive::kFalse, |
344 | 0 | "hello 3"s, Inclusive::kFalse, |
345 | 0 | "string_val"); |
346 | |
|
347 | 0 | bool found = false; |
348 | 0 | LOG_TIMING(INFO, "Scanning with string predicate") { |
349 | 0 | for (const auto& row : TableRange(client_table_, options)) { |
350 | 0 | found = true; |
351 | 0 | Slice slice(row.column(2).string_value()); |
352 | 0 | if (!slice.starts_with("hello 2") && !slice.starts_with("hello 3")) { |
353 | 0 | FAIL() << row.ToString(); |
354 | 0 | } |
355 | 0 | } |
356 | 0 | } |
357 | 0 | ASSERT_TRUE(found); |
358 | 0 | } |
359 | | |
360 | 0 | void DoTestScanWithKeyPredicate() { |
361 | 0 | auto op = client_table_.NewReadOp(); |
362 | 0 | auto req = op->mutable_request(); |
363 | |
|
364 | 0 | auto* const condition = req->mutable_where_expr()->mutable_condition(); |
365 | 0 | condition->set_op(QL_OP_AND); |
366 | 0 | client_table_.AddInt32Condition(condition, "key", QL_OP_GREATER_THAN_EQUAL, 5); |
367 | 0 | client_table_.AddInt32Condition(condition, "key", QL_OP_LESS_THAN_EQUAL, 10); |
368 | 0 | client_table_.AddColumns({"key"}, req); |
369 | 0 | auto session = client_->NewSession(); |
370 | 0 | session->SetTimeout(60s); |
371 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
372 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status()); |
373 | 0 | auto rowblock = ql::RowsResult(op.get()).GetRowBlock(); |
374 | 0 | for (const auto& row : rowblock->rows()) { |
375 | 0 | int32_t key = row.column(0).int32_value(); |
376 | 0 | ASSERT_GE(key, 5); |
377 | 0 | ASSERT_LE(key, 10); |
378 | 0 | } |
379 | 0 | } |
380 | | |
381 | | // Creates a table with 'num_tablets' tablets, defaulting the keyspace to kKeyspaceName. Tablet
382 | | // servers are added first until the cluster has at least FLAGS_replication_factor of them.
383 | | void CreateTable(const YBTableName& table_name_orig, |
384 | | int num_tablets, |
385 | 0 | TableHandle* table) { |
386 | 0 | size_t num_replicas = FLAGS_replication_factor; |
387 | | // The implementation allows table name without a keyspace. |
388 | 0 | YBTableName table_name(table_name_orig.namespace_type(), table_name_orig.has_namespace() ? |
389 | 0 | table_name_orig.namespace_name() : kKeyspaceName, table_name_orig.table_name()); |
390 | |
|
391 | 0 | bool added_replicas = false; |
392 | | // Add more tablet servers to satisfy all replicas, if necessary. |
393 | 0 | while (cluster_->num_tablet_servers() < num_replicas) { |
394 | 0 | ASSERT_OK(cluster_->AddTabletServer()); |
395 | 0 | added_replicas = true; |
396 | 0 | } |
397 | |
|
398 | 0 | if (added_replicas) { |
399 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas)); |
400 | 0 | } |
401 | |
|
402 | 0 | ASSERT_OK(table->Create(table_name, num_tablets, schema_, client_.get())); |
403 | 0 | } |
404 | | |
405 | | // Shuts down or restarts the tablet server whose permanent UUID matches 'uuid'.
406 | | // If 'restart' is set, 'wait_started' selects whether to wait for bootstrapping to finish.
407 | | // Returns InvalidArgument if no tablet server in the cluster has that UUID.
408 | 0 | Status KillTServerImpl(const string& uuid, const bool restart, const bool wait_started) {
409 | 0 | bool ts_found = false;
410 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
411 | 0 | MiniTabletServer* ts = cluster_->mini_tablet_server(i);
412 | 0 | if (ts->server()->instance_pb().permanent_uuid() == uuid) {
413 | 0 | if (restart) {
414 | 0 | LOG(INFO) << "Restarting TS at " << ts->bound_rpc_addr();
415 | 0 | RETURN_NOT_OK(ts->Restart());
416 | 0 | if (wait_started) {
417 | 0 | LOG(INFO) << "Waiting for TS " << ts->bound_rpc_addr() << " to finish bootstrapping";
418 | 0 | RETURN_NOT_OK(ts->WaitStarted());
419 | 0 | }
420 | 0 | } else {
421 | 0 | LOG(INFO) << "Killing TS " << uuid << " at " << ts->bound_rpc_addr();
422 | 0 | ts->Shutdown();
423 | 0 | }
424 | 0 | ts_found = true;
425 | 0 | break;
426 | 0 | }
427 | 0 | }
428 | 0 | if (!ts_found) {
429 | 0 | return STATUS(InvalidArgument, strings::Substitute("Could not find tablet server $0", uuid));
430 | 0 | }
431 | |
432 | 0 | return Status::OK();
433 | 0 | }
434 | | |
435 | 0 | Status RestartTServerAndWait(const string& uuid) { |
436 | 0 | return KillTServerImpl(uuid, true, true); |
437 | 0 | } |
438 | | |
439 | 0 | Status RestartTServerAsync(const string& uuid) { |
440 | 0 | return KillTServerImpl(uuid, true, false); |
441 | 0 | } |
442 | | |
443 | 0 | Status KillTServer(const string& uuid) { |
444 | 0 | return KillTServerImpl(uuid, false, false); |
445 | 0 | } |
446 | | |
447 | | void DoApplyWithoutFlushTest(int sleep_micros); |
448 | | |
449 | 0 | Result<std::unique_ptr<rpc::Messenger>> CreateMessenger(const std::string& name) { |
450 | 0 | return rpc::MessengerBuilder(name).Build(); |
451 | 0 | } |
452 | | |
453 | | void VerifyKeyRangeFiltering(const std::vector<string>& sorted_partitions,
454 | | const std::vector<internal::RemoteTabletPtr>& tablets,
455 | 0 | const string& start_key, const string& end_key) {
456 | 0 | auto start_idx = FindPartitionStartIndex(sorted_partitions, start_key);
457 | 0 | auto end_idx = FindPartitionStartIndexExclusiveBound(sorted_partitions, end_key);
458 | 0 | auto filtered_tablets =
459 | 0 | ASSERT_RESULT(FilterTabletsByHashPartitionKeyRange(tablets, start_key, end_key));
460 | 0 | std::vector<string> filtered_partitions;
461 | 0 | std::transform(filtered_tablets.begin(), filtered_tablets.end(),
462 | 0 | std::back_inserter(filtered_partitions),
463 | 0 | [](const auto& tablet) { return tablet->partition().partition_key_start(); });
464 | 0 | std::sort(filtered_partitions.begin(), filtered_partitions.end());
465 | | // Build the expected slice from iterators: indexing [end_idx + 1] may be one past the end.
466 | 0 | ASSERT_EQ(filtered_partitions, std::vector<string>(
467 | 0 | sorted_partitions.begin() + start_idx, sorted_partitions.begin() + end_idx + 1));
468 | 0 | }
469 | | |
470 | | size_t FindPartitionStartIndexExclusiveBound( |
471 | | const std::vector<std::string>& partitions, |
472 | 0 | const std::string& partition_key) { |
473 | 0 | if (partition_key.empty()) { |
474 | 0 | return partitions.size() - 1; |
475 | 0 | } |
476 | | |
477 | 0 | auto it = std::lower_bound(partitions.begin(), partitions.end(), partition_key); |
478 | 0 | if (it == partitions.end() || *it >= partition_key) { |
479 | 0 | if (it == partitions.begin()) { |
480 | 0 | return 0; |
481 | 0 | } |
482 | 0 | --it; |
483 | 0 | } |
484 | 0 | return it - partitions.begin(); |
485 | 0 | } |
486 | | |
487 | | enum WhichServerToKill { |
488 | | DEAD_MASTER, |
489 | | DEAD_TSERVER |
490 | | }; |
491 | | void DoTestWriteWithDeadServer(WhichServerToKill which); |
492 | | |
493 | | YBSchema schema_; |
494 | | |
495 | | std::unique_ptr<MiniCluster> cluster_; |
496 | | std::unique_ptr<YBClient> client_; |
497 | | TableHandle client_table_; |
498 | | TableHandle client_table2_; |
499 | | TableHandle client_table3_; |
500 | | }; |
501 | | |
502 | | |
503 | | const YBTableName ClientTest::kTableName(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb"); |
504 | | const YBTableName ClientTest::kTable2Name(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb2"); |
505 | | const YBTableName ClientTest::kTable3Name(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb3"); |
506 | | |
507 | | namespace { |
508 | | |
509 | 0 | TableFilter MakeFilter(int32_t lower_bound, int32_t upper_bound, std::string column = "key") { |
510 | 0 | if (lower_bound != kNoBound) { |
511 | 0 | if (upper_bound != kNoBound) { |
512 | 0 | return FilterBetween(lower_bound, Inclusive::kTrue, upper_bound, Inclusive::kTrue, |
513 | 0 | std::move(column)); |
514 | 0 | } else { |
515 | 0 | return FilterGreater(lower_bound, Inclusive::kTrue, std::move(column)); |
516 | 0 | } |
517 | 0 | } |
518 | 0 | if (upper_bound != kNoBound) { |
519 | 0 | return FilterLess(upper_bound, Inclusive::kTrue, std::move(column)); |
520 | 0 | } |
521 | 0 | return TableFilter(); |
522 | 0 | } |
523 | | |
524 | | size_t CountRowsFromClient(const TableHandle& table, YBConsistencyLevel consistency, |
525 | 0 | int32_t lower_bound, int32_t upper_bound) { |
526 | 0 | TableIteratorOptions options; |
527 | 0 | options.consistency = consistency; |
528 | 0 | options.columns = std::vector<std::string>{"key"}; |
529 | 0 | options.filter = MakeFilter(lower_bound, upper_bound); |
530 | 0 | return boost::size(TableRange(table, options)); |
531 | 0 | } |
532 | | |
533 | 0 | size_t CountRowsFromClient(const TableHandle& table, int32_t lower_bound, int32_t upper_bound) { |
534 | 0 | return CountRowsFromClient(table, YBConsistencyLevel::STRONG, lower_bound, upper_bound); |
535 | 0 | } |
536 | | |
537 | 0 | size_t CountRowsFromClient(const TableHandle& table) { |
538 | 0 | return CountRowsFromClient(table, kNoBound, kNoBound); |
539 | 0 | } |
540 | | |
541 | | // Counts the rows of 'table' with no key bounds and discards the resulting count.
542 | | //
543 | | // A free function so that it can be used directly as a thread closure.
544 | 0 | void CheckRowCount(const TableHandle& table) { |
545 | 0 | CountRowsFromClient(table); |
546 | 0 | } |
547 | | |
548 | | } // namespace |
549 | | |
550 | | constexpr int kLookupWaitTimeSecs = 30; |
551 | | constexpr int kNumTabletsPerTable = 8; |
552 | | constexpr int kNumIterations = 1000; |
553 | | |
554 | | class ClientTestForceMasterLookup : |
555 | | public ClientTest, public ::testing::WithParamInterface<bool /* force_master_lookup */> { |
556 | | public: |
557 | 2 | void SetUp() override { |
558 | 2 | ClientTest::SetUp(); |
559 | | // Do we want to force going to the master instead of using cache. |
560 | 2 | SetAtomicFlag(GetParam(), &FLAGS_TEST_force_master_lookup_all_tablets); |
561 | 2 | SetAtomicFlag(0.5, &FLAGS_TEST_simulate_lookup_timeout_probability); |
562 | 2 | } |
563 | | |
564 | | |
565 | 0 | void PerformManyLookups(const std::shared_ptr<YBTable>& table, bool point_lookup) { |
566 | 0 | for (int i = 0; i < kNumIterations; i++) { |
567 | 0 | if (point_lookup) { |
568 | 0 | auto key_rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table).get()); |
569 | 0 | ASSERT_NOTNULL(key_rt); |
570 | 0 | } else { |
571 | 0 | auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture( |
572 | 0 | table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get()); |
573 | 0 | ASSERT_EQ(tablets.size(), kNumTabletsPerTable); |
574 | 0 | } |
575 | 0 | } |
576 | 0 | } |
577 | | }; |
578 | | |
579 | | INSTANTIATE_TEST_CASE_P(ForceMasterLookup, ClientTestForceMasterLookup, ::testing::Bool()); |
580 | | |
581 | 0 | TEST_P(ClientTestForceMasterLookup, TestConcurrentLookups) { |
582 | 0 | ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_)); |
583 | |
|
584 | 0 | std::shared_ptr<YBTable> table; |
585 | 0 | ASSERT_OK(client_->OpenTable(kTable3Name, &table)); |
586 | |
|
587 | 0 | ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()-> |
588 | 0 | WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
589 | |
|
590 | 0 | auto t1 = std::thread([&]() { ASSERT_NO_FATALS( |
591 | 0 | PerformManyLookups(table, true /* point_lookup */)); }); |
592 | 0 | auto t2 = std::thread([&]() { ASSERT_NO_FATALS( |
593 | 0 | PerformManyLookups(table, false /* point_lookup */)); }); |
594 | |
|
595 | 0 | t1.join(); |
596 | 0 | t2.join(); |
597 | 0 | } |
598 | | |
599 | 0 | TEST_F(ClientTest, TestLookupAllTablets) { |
600 | 0 | ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_)); |
601 | |
|
602 | 0 | std::shared_ptr<YBTable> table; |
603 | 0 | ASSERT_OK(client_->OpenTable(kTable3Name, &table)); |
604 | |
|
605 | 0 | ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()-> |
606 | 0 | WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
607 | |
|
608 | 0 | auto future = client_->LookupAllTabletsFuture( |
609 | 0 | table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)); |
610 | |
|
611 | 0 | auto tablets = ASSERT_RESULT(future.get()); |
612 | 0 | ASSERT_EQ(tablets.size(), 8); |
613 | 0 | } |
614 | | |
615 | 0 | TEST_F(ClientTest, TestPointThenRangeLookup) { |
616 | 0 | ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_)); |
617 | |
|
618 | 0 | std::shared_ptr<YBTable> table; |
619 | 0 | ASSERT_OK(client_->OpenTable(kTable3Name, &table)); |
620 | |
|
621 | 0 | ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()-> |
622 | 0 | WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
623 | |
|
624 | 0 | auto key_rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table).get()); |
625 | 0 | ASSERT_NOTNULL(key_rt); |
626 | |
|
627 | 0 | auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture( |
628 | 0 | table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get()); |
629 | |
|
630 | 0 | ASSERT_EQ(tablets.size(), kNumTabletsPerTable); |
631 | 0 | } |
632 | | |
633 | 0 | TEST_F(ClientTest, TestKeyRangeFiltering) { |
634 | 0 | ASSERT_NO_FATALS(CreateTable(kTable3Name, 8, &client_table3_)); |
635 | |
|
636 | 0 | std::shared_ptr<YBTable> table; |
637 | 0 | ASSERT_OK(client_->OpenTable(kTable3Name, &table)); |
638 | |
|
639 | 0 | ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()-> |
640 | 0 | WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
641 | |
|
642 | 0 | auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture( |
643 | 0 | table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get()); |
644 | | // First, verify, that using empty bounds on both sides returns all tablets. |
645 | 0 | auto filtered_tablets = |
646 | 0 | ASSERT_RESULT(FilterTabletsByHashPartitionKeyRange(tablets, std::string(), std::string())); |
647 | 0 | ASSERT_EQ(kNumTabletsPerTable, filtered_tablets.size()); |
648 | |
|
649 | 0 | std::vector<std::string> partition_starts; |
650 | 0 | for (const auto& tablet : tablets) { |
651 | 0 | partition_starts.push_back(tablet->partition().partition_key_start()); |
652 | 0 | } |
653 | 0 | std::sort(partition_starts.begin(), partition_starts.end()); |
654 | |
|
655 | 0 | auto start_key = partition_starts[0]; |
656 | 0 | auto end_key = partition_starts[2]; |
657 | 0 | ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets, start_key, end_key)); |
658 | |
|
659 | 0 | start_key = partition_starts[5]; |
660 | 0 | end_key = partition_starts[7]; |
661 | 0 | ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets, start_key, end_key)); |
662 | |
|
663 | 0 | auto fixed_key = PartitionSchema::EncodeMultiColumnHashValue(10); |
664 | 0 | ASSERT_NOK(FilterTabletsByHashPartitionKeyRange(tablets, fixed_key, fixed_key)); |
665 | |
|
666 | 0 | for (int i = 0; i < kNumIterations; i++) { |
667 | 0 | auto start_idx = RandomUniformInt(0, PartitionSchema::kMaxPartitionKey - 1); |
668 | 0 | auto end_idx = RandomUniformInt(start_idx + 1, PartitionSchema::kMaxPartitionKey); |
669 | 0 | ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets, |
670 | 0 | PartitionSchema::EncodeMultiColumnHashValue(start_idx), |
671 | 0 | PartitionSchema::EncodeMultiColumnHashValue(end_idx))); |
672 | 0 | } |
673 | 0 | } |
674 | | |
675 | 0 | TEST_F(ClientTest, TestListTables) { |
676 | 0 | auto tables = ASSERT_RESULT(client_->ListTables()); |
677 | 0 | std::sort(tables.begin(), tables.end(), [](const YBTableName& n1, const YBTableName& n2) { |
678 | 0 | return n1.ToString() < n2.ToString(); |
679 | 0 | }); |
680 | 0 | ASSERT_EQ(2 + master::kNumSystemTablesWithTxn, tables.size()); |
681 | 0 | ASSERT_EQ(kTableName, tables[0]) << "Tables:" << AsString(tables); |
682 | 0 | ASSERT_EQ(kTable2Name, tables[1]) << "Tables:" << AsString(tables); |
683 | 0 | tables.clear(); |
684 | 0 | tables = ASSERT_RESULT(client_->ListTables("testtb2")); |
685 | 0 | ASSERT_EQ(1, tables.size()); |
686 | 0 | ASSERT_EQ(kTable2Name, tables[0]) << "Tables:" << AsString(tables); |
687 | 0 | } |
688 | | |
689 | 0 | TEST_F(ClientTest, TestListTabletServers) { |
690 | 0 | auto tss = ASSERT_RESULT(client_->ListTabletServers()); |
691 | 0 | ASSERT_EQ(3, tss.size()); |
692 | 0 | set<string> actual_ts_uuids; |
693 | 0 | set<string> actual_ts_hostnames; |
694 | 0 | set<string> expected_ts_uuids; |
695 | 0 | set<string> expected_ts_hostnames; |
696 | 0 | for (size_t i = 0; i < tss.size(); ++i) { |
697 | 0 | auto server = cluster_->mini_tablet_server(i)->server(); |
698 | 0 | expected_ts_uuids.insert(server->instance_pb().permanent_uuid()); |
699 | 0 | actual_ts_uuids.insert(tss[i].uuid); |
700 | 0 | expected_ts_hostnames.insert(server->options().broadcast_addresses[0].host()); |
701 | 0 | actual_ts_hostnames.insert(tss[i].hostname); |
702 | 0 | } |
703 | 0 | ASSERT_EQ(expected_ts_uuids, actual_ts_uuids); |
704 | 0 | ASSERT_EQ(expected_ts_hostnames, actual_ts_hostnames); |
705 | 0 | } |
706 | | |
707 | 0 | bool TableNotFound(const Status& status) { |
708 | 0 | return status.IsNotFound() |
709 | 0 | && (master::MasterError(status) == master::MasterErrorPB::OBJECT_NOT_FOUND); |
710 | 0 | } |
711 | | |
712 | | TEST_F(ClientTest, TestBadTable) { |
713 | | shared_ptr<YBTable> t; |
714 | | Status s = client_->OpenTable( |
715 | | YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "xxx-does-not-exist"), &t); |
716 | | ASSERT_TRUE(TableNotFound(s)) << s; |
717 | | } |
718 | | |
719 | | // Test that, if the master is down, OpenTable() fails with a timeout
720 | | // (no "find the new leader master" since there's only one master).
721 | 0 | TEST_F(ClientTest, TestMasterDown) { |
722 | 0 | DontVerifyClusterBeforeNextTearDown(); |
723 | 0 | cluster_->mini_master()->Shutdown(); |
724 | 0 | shared_ptr<YBTable> t; |
725 | 0 | client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1); |
726 | 0 | Status s = client_->OpenTable(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "other-tablet"), &t); |
727 | 0 | ASSERT_TRUE(s.IsTimedOut()); |
728 | 0 | } |
729 | | |
730 | | // TODO scan with predicates is not supported. |
731 | 0 | TEST_F(ClientTest, TestScan) { |
732 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
733 | |
|
734 | 0 | ASSERT_EQ(FLAGS_test_scan_num_rows, CountRowsFromClient(client_table_)); |
735 | | |
736 | | // Scan after insert |
737 | 0 | DoTestScanWithoutPredicates(); |
738 | 0 | DoTestScanWithStringPredicate(); |
739 | 0 | DoTestScanWithKeyPredicate(); |
740 | | |
741 | | // Scan after update |
742 | 0 | UpdateTestRows(client_table_, 0, FLAGS_test_scan_num_rows); |
743 | 0 | DoTestScanWithKeyPredicate(); |
744 | | |
745 | | // Scan after delete half |
746 | 0 | DeleteTestRows(client_table_, 0, FLAGS_test_scan_num_rows / 2); |
747 | 0 | DoTestScanWithKeyPredicate(); |
748 | | |
749 | | // Scan after delete all |
750 | 0 | DeleteTestRows(client_table_, FLAGS_test_scan_num_rows / 2 + 1, FLAGS_test_scan_num_rows); |
751 | 0 | DoTestScanWithKeyPredicate(); |
752 | | |
753 | | // Scan after re-insert |
754 | 0 | InsertTestRows(client_table_, 1); |
755 | 0 | DoTestScanWithKeyPredicate(); |
756 | 0 | } |
757 | | |
758 | 0 | void CheckCounts(const TableHandle& table, const std::vector<int>& expected) { |
759 | 0 | std::vector<std::pair<int, int>> bounds = { |
760 | 0 | { kNoBound, kNoBound }, |
761 | 0 | { kNoBound, 15 }, |
762 | 0 | { 27, kNoBound }, |
763 | 0 | { 0, 15 }, |
764 | 0 | { 0, 10 }, |
765 | 0 | { 0, 20 }, |
766 | 0 | { 0, 30 }, |
767 | 0 | { 14, 30 }, |
768 | 0 | { 30, 30 }, |
769 | 0 | { 50, kNoBound }, |
770 | 0 | }; |
771 | 0 | ASSERT_EQ(bounds.size(), expected.size()); |
772 | 0 | for (size_t i = 0; i != bounds.size(); ++i) { |
773 | 0 | ASSERT_EQ(expected[i], CountRowsFromClient(table, bounds[i].first, bounds[i].second)); |
774 | 0 | } |
775 | | // Run through various scans. |
776 | 0 | } |
777 | | |
778 | 0 | TEST_F(ClientTest, TestScanMultiTablet) { |
779 | | // 5 tablets, each with 10 rows worth of space. |
780 | 0 | TableHandle table; |
781 | 0 | ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "TestScanMultiTablet"), 5, &table)); |
782 | | |
783 | | // Insert rows with keys 12, 13, 15, 17, 22, 23, 25, 27...47 into each |
784 | | // tablet, except the first which is empty. |
785 | 0 | auto session = CreateSession(); |
786 | 0 | for (int i = 1; i < 5; i++) { |
787 | 0 | session->Apply(BuildTestRow(table, 2 + (i * 10))); |
788 | 0 | session->Apply(BuildTestRow(table, 3 + (i * 10))); |
789 | 0 | session->Apply(BuildTestRow(table, 5 + (i * 10))); |
790 | 0 | session->Apply(BuildTestRow(table, 7 + (i * 10))); |
791 | 0 | } |
792 | 0 | FlushSessionOrDie(session); |
793 | | |
794 | | // Run through various scans. |
795 | 0 | CheckCounts(table, { 16, 3, 9, 3, 0, 4, 8, 6, 0, 0 }); |
796 | | |
797 | | // Update every other row |
798 | 0 | for (int i = 1; i < 5; ++i) { |
799 | 0 | session->Apply(UpdateTestRow(table, 2 + i * 10)); |
800 | 0 | session->Apply(UpdateTestRow(table, 5 + i * 10)); |
801 | 0 | } |
802 | 0 | FlushSessionOrDie(session); |
803 | | |
804 | | // Check all counts the same (make sure updates don't change # of rows) |
805 | 0 | CheckCounts(table, { 16, 3, 9, 3, 0, 4, 8, 6, 0, 0 }); |
806 | | |
807 | | // Delete half the rows |
808 | 0 | for (int i = 1; i < 5; ++i) { |
809 | 0 | session->Apply(DeleteTestRow(table, 5 + i*10)); |
810 | 0 | session->Apply(DeleteTestRow(table, 7 + i*10)); |
811 | 0 | } |
812 | 0 | FlushSessionOrDie(session); |
813 | | |
814 | | // Check counts changed accordingly |
815 | 0 | CheckCounts(table, { 8, 2, 4, 2, 0, 2, 4, 2, 0, 0 }); |
816 | | |
817 | | // Delete rest of rows |
818 | 0 | for (int i = 1; i < 5; ++i) { |
819 | 0 | session->Apply(DeleteTestRow(table, 2 + i*10)); |
820 | 0 | session->Apply(DeleteTestRow(table, 3 + i*10)); |
821 | 0 | } |
822 | 0 | FlushSessionOrDie(session); |
823 | | |
824 | | // Check counts changed accordingly |
825 | 0 | CheckCounts(table, { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }); |
826 | 0 | } |
827 | | |
828 | 0 | TEST_F(ClientTest, TestScanEmptyTable) { |
829 | 0 | TableIteratorOptions options; |
830 | 0 | options.columns = std::vector<std::string>(); |
831 | 0 | ASSERT_EQ(boost::size(TableRange(client_table_, options)), 0); |
832 | 0 | } |
833 | | |
834 | | // Test scanning with an empty projection. This should yield an empty |
835 | | // row block with the proper number of rows filled in. Impala issues |
836 | | // scans like this in order to implement COUNT(*). |
837 | 0 | TEST_F(ClientTest, TestScanEmptyProjection) { |
838 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
839 | 0 | TableIteratorOptions options; |
840 | 0 | options.columns = std::vector<std::string>(); |
841 | 0 | ASSERT_EQ(boost::size(TableRange(client_table_, options)), FLAGS_test_scan_num_rows); |
842 | 0 | } |
843 | | |
844 | | // Test a scan where we have a predicate on a key column that is not |
845 | | // in the projection. |
846 | 0 | TEST_F(ClientTest, TestScanPredicateKeyColNotProjected) { |
847 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
848 | |
|
849 | 0 | size_t nrows = 0; |
850 | 0 | TableIteratorOptions options; |
851 | 0 | options.columns = std::vector<std::string>{"key", "int_val"}; |
852 | 0 | options.filter = MakeFilter(5, 10); |
853 | 0 | for (const auto& row : TableRange(client_table_, options)) { |
854 | 0 | int32_t key = row.column(0).int32_value(); |
855 | 0 | int32_t val = row.column(1).int32_value(); |
856 | 0 | ASSERT_EQ(key * 2, val); |
857 | |
|
858 | 0 | ++nrows; |
859 | 0 | } |
860 | |
|
861 | 0 | ASSERT_EQ(6, nrows); |
862 | 0 | } |
863 | | |
864 | | // Test a scan where we have a predicate on a non-key column that is |
865 | | // not in the projection. |
866 | 0 | TEST_F(ClientTest, TestScanPredicateNonKeyColNotProjected) { |
867 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
868 | |
|
869 | 0 | size_t nrows = 0; |
870 | 0 | TableIteratorOptions options; |
871 | 0 | options.columns = std::vector<std::string>{"key", "int_val"}; |
872 | 0 | options.filter = MakeFilter(10, 20, "int_val"); |
873 | 0 | TableRange range(client_table_, options); |
874 | 0 | for (const auto& row : range) { |
875 | 0 | int32_t key = row.column(0).int32_value(); |
876 | 0 | int32_t val = row.column(1).int32_value(); |
877 | 0 | ASSERT_EQ(key * 2, val); |
878 | |
|
879 | 0 | ++nrows; |
880 | 0 | } |
881 | |
|
882 | 0 | ASSERT_EQ(nrows, 6); |
883 | 0 | } |
884 | | |
885 | 0 | TEST_F(ClientTest, TestGetTabletServerBlacklist) { |
886 | 0 | TableHandle table; |
887 | 0 | ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "blacklist"), kNumTablets, &table)); |
888 | 0 | InsertTestRows(table, 1, 0); |
889 | | |
890 | | // Look up the tablet and its replicas into the metadata cache. |
891 | | // We have to loop since some replicas may have been created slowly. |
892 | 0 | scoped_refptr<internal::RemoteTablet> rt; |
893 | 0 | while (true) { |
894 | 0 | rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get()); |
895 | 0 | ASSERT_TRUE(rt.get() != nullptr); |
896 | 0 | vector<internal::RemoteTabletServer*> tservers; |
897 | 0 | rt->GetRemoteTabletServers(&tservers); |
898 | 0 | if (tservers.size() == 3) { |
899 | 0 | break; |
900 | 0 | } |
901 | 0 | rt->MarkStale(); |
902 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
903 | 0 | } |
904 | | |
905 | | // Get the Leader. |
906 | 0 | internal::RemoteTabletServer *rts; |
907 | 0 | set<string> blacklist; |
908 | 0 | vector<internal::RemoteTabletServer*> candidates; |
909 | 0 | vector<internal::RemoteTabletServer*> tservers; |
910 | 0 | ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt, |
911 | 0 | YBClient::LEADER_ONLY, |
912 | 0 | blacklist, &candidates, &rts)); |
913 | 0 | tservers.push_back(rts); |
914 | | // Blacklist the leader, should not work. |
915 | 0 | blacklist.insert(rts->permanent_uuid()); |
916 | 0 | { |
917 | 0 | Status s = client_->data_->GetTabletServer(client_.get(), rt, |
918 | 0 | YBClient::LEADER_ONLY, |
919 | 0 | blacklist, &candidates, &rts); |
920 | 0 | ASSERT_TRUE(s.IsServiceUnavailable()); |
921 | 0 | } |
922 | | // Keep blacklisting replicas until we run out. |
923 | 0 | ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt, |
924 | 0 | YBClient::CLOSEST_REPLICA, |
925 | 0 | blacklist, &candidates, &rts)); |
926 | 0 | tservers.push_back(rts); |
927 | 0 | blacklist.insert(rts->permanent_uuid()); |
928 | 0 | ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt, |
929 | 0 | YBClient::FIRST_REPLICA, |
930 | 0 | blacklist, &candidates, &rts)); |
931 | 0 | tservers.push_back(rts); |
932 | 0 | blacklist.insert(rts->permanent_uuid()); |
933 | | |
934 | | // Make sure none of the three modes work when all nodes are blacklisted. |
935 | 0 | vector<YBClient::ReplicaSelection> selections; |
936 | 0 | selections.push_back(YBClient::LEADER_ONLY); |
937 | 0 | selections.push_back(YBClient::CLOSEST_REPLICA); |
938 | 0 | selections.push_back(YBClient::FIRST_REPLICA); |
939 | 0 | for (YBClient::ReplicaSelection selection : selections) { |
940 | 0 | Status s = client_->data_->GetTabletServer(client_.get(), rt, selection, |
941 | 0 | blacklist, &candidates, &rts); |
942 | 0 | ASSERT_TRUE(s.IsServiceUnavailable()); |
943 | 0 | } |
944 | | |
945 | | // Make sure none of the modes work when all nodes are dead. |
946 | 0 | for (internal::RemoteTabletServer* rt : tservers) { |
947 | 0 | client_->data_->meta_cache_->MarkTSFailed(rt, STATUS(NetworkError, "test")); |
948 | 0 | } |
949 | 0 | blacklist.clear(); |
950 | 0 | for (YBClient::ReplicaSelection selection : selections) { |
951 | 0 | Status s = client_->data_->GetTabletServer(client_.get(), rt, |
952 | 0 | selection, |
953 | 0 | blacklist, &candidates, &rts); |
954 | 0 | ASSERT_TRUE(s.IsServiceUnavailable()); |
955 | 0 | } |
956 | 0 | } |
957 | | |
958 | | TEST_F(ClientTest, TestScanWithEncodedRangePredicate) { |
959 | | TableHandle table; |
960 | | ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "split-table"), |
961 | | kNumTablets, |
962 | | &table)); |
963 | | |
964 | | ASSERT_NO_FATALS(InsertTestRows(table, 100)); |
965 | | |
966 | | TableRange all_range(table, {}); |
967 | | auto all_rows = ScanToStrings(all_range); |
968 | | ASSERT_EQ(100, all_rows.size()); |
969 | | |
970 | | // Test a double-sided range within first tablet |
971 | | { |
972 | | TableIteratorOptions options; |
973 | | options.filter = FilterBetween(5, Inclusive::kTrue, 8, Inclusive::kFalse); |
974 | | auto rows = ScanToStrings(TableRange(table, options)); |
975 | | ASSERT_EQ(8 - 5, rows.size()); |
976 | | EXPECT_EQ(all_rows[5], rows.front()); |
977 | | EXPECT_EQ(all_rows[7], rows.back()); |
978 | | } |
979 | | |
980 | | // Test a double-sided range spanning tablets |
981 | | { |
982 | | TableIteratorOptions options; |
983 | | options.filter = FilterBetween(5, Inclusive::kTrue, 15, Inclusive::kFalse); |
984 | | auto rows = ScanToStrings(TableRange(table, options)); |
985 | | ASSERT_EQ(15 - 5, rows.size()); |
986 | | EXPECT_EQ(all_rows[5], rows.front()); |
987 | | EXPECT_EQ(all_rows[14], rows.back()); |
988 | | } |
989 | | |
990 | | // Test a double-sided range within second tablet |
991 | | { |
992 | | TableIteratorOptions options; |
993 | | options.filter = FilterBetween(15, Inclusive::kTrue, 20, Inclusive::kFalse); |
994 | | auto rows = ScanToStrings(TableRange(table, options)); |
995 | | ASSERT_EQ(20 - 15, rows.size()); |
996 | | EXPECT_EQ(all_rows[15], rows.front()); |
997 | | EXPECT_EQ(all_rows[19], rows.back()); |
998 | | } |
999 | | |
1000 | | // Test a lower-bound only range. |
1001 | | { |
1002 | | TableIteratorOptions options; |
1003 | | options.filter = FilterGreater(5, Inclusive::kTrue); |
1004 | | auto rows = ScanToStrings(TableRange(table, options)); |
1005 | | ASSERT_EQ(95, rows.size()); |
1006 | | EXPECT_EQ(all_rows[5], rows.front()); |
1007 | | EXPECT_EQ(all_rows[99], rows.back()); |
1008 | | } |
1009 | | |
1010 | | // Test an upper-bound only range in first tablet. |
1011 | | { |
1012 | | TableIteratorOptions options; |
1013 | | options.filter = FilterLess(5, Inclusive::kFalse); |
1014 | | auto rows = ScanToStrings(TableRange(table, options)); |
1015 | | ASSERT_EQ(5, rows.size()); |
1016 | | EXPECT_EQ(all_rows[0], rows.front()); |
1017 | | EXPECT_EQ(all_rows[4], rows.back()); |
1018 | | } |
1019 | | |
1020 | | // Test an upper-bound only range in second tablet. |
1021 | | { |
1022 | | TableIteratorOptions options; |
1023 | | options.filter = FilterLess(15, Inclusive::kFalse); |
1024 | | auto rows = ScanToStrings(TableRange(table, options)); |
1025 | | ASSERT_EQ(15, rows.size()); |
1026 | | EXPECT_EQ(all_rows[0], rows.front()); |
1027 | | EXPECT_EQ(all_rows[14], rows.back()); |
1028 | | } |
1029 | | } |
1030 | | |
1031 | 0 | static YBError* GetSingleErrorFromFlushStatus(const FlushStatus& flush_status) { |
1032 | 0 | CHECK_EQ(1, flush_status.errors.size()); |
1033 | 0 | return flush_status.errors.front().get(); |
1034 | 0 | } |
1035 | | |
1036 | | // Simplest case of inserting through the client API: a single row |
1037 | | // with manual batching. |
1038 | | // TODO: We actually need to check that hash columns are present during insert, but that is not done yet. |
1039 | 0 | TEST_F(ClientTest, DISABLED_TestInsertSingleRowManualBatch) { |
1040 | 0 | auto session = CreateSession(); |
1041 | 0 | ASSERT_FALSE(session->TEST_HasPendingOperations()); |
1042 | |
|
1043 | 0 | auto insert = client_table_.NewInsertOp(); |
1044 | | // Try inserting without specifying a key: should fail. |
1045 | 0 | client_table_.AddInt32ColumnValue(insert->mutable_request(), "int_val", 54321); |
1046 | 0 | client_table_.AddStringColumnValue(insert->mutable_request(), "string_val", "hello world"); |
1047 | 0 | ASSERT_OK(session->ApplyAndFlush(insert)); |
1048 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_RUNTIME_ERROR, insert->response().status()); |
1049 | | |
1050 | | // Retry |
1051 | 0 | QLAddInt32HashValue(insert->mutable_request(), 12345); |
1052 | 0 | session->Apply(insert); |
1053 | 0 | ASSERT_TRUE(session->TEST_HasPendingOperations()) << "Should be pending until we Flush"; |
1054 | | |
1055 | 0 | FlushSessionOrDie(session, { insert }); |
1056 | 0 | } |
1057 | | |
1058 | | namespace { |
1059 | | |
1060 | | void ApplyInsertToSession(YBSession* session, |
1061 | | const TableHandle& table, |
1062 | | int row_key, |
1063 | | int int_val, |
1064 | | const char* string_val, |
1065 | 0 | std::shared_ptr<YBqlOp>* op = nullptr) { |
1066 | 0 | auto insert = table.NewInsertOp(); |
1067 | 0 | QLAddInt32HashValue(insert->mutable_request(), row_key); |
1068 | 0 | table.AddInt32ColumnValue(insert->mutable_request(), "int_val", int_val); |
1069 | 0 | table.AddStringColumnValue(insert->mutable_request(), "string_val", string_val); |
1070 | 0 | if (op) { |
1071 | 0 | *op = insert; |
1072 | 0 | } |
1073 | 0 | session->Apply(insert); |
1074 | 0 | } |
1075 | | |
1076 | | void ApplyUpdateToSession(YBSession* session, |
1077 | | const TableHandle& table, |
1078 | | int row_key, |
1079 | 0 | int int_val) { |
1080 | 0 | auto update = table.NewUpdateOp(); |
1081 | 0 | QLAddInt32HashValue(update->mutable_request(), row_key); |
1082 | 0 | table.AddInt32ColumnValue(update->mutable_request(), "int_val", int_val); |
1083 | 0 | session->Apply(update); |
1084 | 0 | } |
1085 | | |
1086 | | void ApplyDeleteToSession(YBSession* session, |
1087 | | const TableHandle& table, |
1088 | 0 | int row_key) { |
1089 | 0 | auto del = table.NewDeleteOp(); |
1090 | 0 | QLAddInt32HashValue(del->mutable_request(), row_key); |
1091 | 0 | session->Apply(del); |
1092 | 0 | } |
1093 | | |
1094 | | } // namespace |
1095 | | |
1096 | 0 | TEST_F(ClientTest, TestWriteTimeout) { |
1097 | 0 | auto session = CreateSession(); |
1098 | |
|
1099 | 0 | LOG(INFO) << "Time out the lookup on the master side"; |
1100 | 0 | { |
1101 | 0 | google::FlagSaver saver; |
1102 | 0 | FLAGS_master_inject_latency_on_tablet_lookups_ms = 110; |
1103 | 0 | session->SetTimeout(100ms); |
1104 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "row"); |
1105 | 0 | const auto flush_status = session->FlushAndGetOpsErrors(); |
1106 | 0 | ASSERT_TRUE(flush_status.status.IsIOError()) |
1107 | 0 | << "unexpected status: " << flush_status.status.ToString(); |
1108 | 0 | auto error = GetSingleErrorFromFlushStatus(flush_status); |
1109 | 0 | ASSERT_TRUE(error->status().IsTimedOut()) << error->status().ToString(); |
1110 | 0 | ASSERT_TRUE(std::regex_match( |
1111 | 0 | error->status().ToString(), |
1112 | 0 | std::regex(".*GetTableLocations \\{.*\\} failed: RPC timed out after deadline expired.*"))) |
1113 | 0 | << error->status().ToString(); |
1114 | 0 | } |
1115 | | |
1116 | 0 | LOG(INFO) << "Time out the actual write on the tablet server"; |
1117 | 0 | { |
1118 | 0 | google::FlagSaver saver; |
1119 | 0 | SetAtomicFlag(true, &FLAGS_log_inject_latency); |
1120 | 0 | SetAtomicFlag(110, &FLAGS_log_inject_latency_ms_mean); |
1121 | 0 | SetAtomicFlag(0, &FLAGS_log_inject_latency_ms_stddev); |
1122 | |
|
1123 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "row"); |
1124 | 0 | const auto flush_status = session->FlushAndGetOpsErrors(); |
1125 | 0 | ASSERT_TRUE(flush_status.status.IsIOError()) << AsString(flush_status.status.ToString()); |
1126 | 0 | auto error = GetSingleErrorFromFlushStatus(flush_status); |
1127 | 0 | ASSERT_TRUE(error->status().IsTimedOut()) << error->status().ToString(); |
1128 | 0 | } |
1129 | 0 | } |
1130 | | |
1131 | | // Test which does an async flush and then drops the reference |
1132 | | // to the Session. This should still call the callback. |
1133 | 0 | TEST_F(ClientTest, TestAsyncFlushResponseAfterSessionDropped) { |
1134 | 0 | auto session = CreateSession(); |
1135 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "row"); |
1136 | 0 | auto flush_future = session->FlushFuture(); |
1137 | 0 | session.reset(); |
1138 | 0 | ASSERT_OK(flush_future.get().status); |
1139 | | |
1140 | | // Try again: re-inserting the same row should not produce an error response. |
1141 | 0 | session = CreateSession(); |
1142 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "row"); |
1143 | 0 | ASSERT_EQ(1, session->TEST_CountBufferedOperations()); |
1144 | 0 | ASSERT_TRUE(session->HasNotFlushedOperations()); |
1145 | 0 | flush_future = session->FlushFuture(); |
1146 | 0 | ASSERT_EQ(0, session->TEST_CountBufferedOperations()); |
1147 | 0 | ASSERT_FALSE(session->HasNotFlushedOperations()); |
1148 | 0 | session.reset(); |
1149 | 0 | ASSERT_OK(flush_future.get().status); |
1150 | 0 | } |
1151 | | |
1152 | 0 | TEST_F(ClientTest, TestSessionClose) { |
1153 | 0 | auto session = CreateSession(); |
1154 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "row"); |
1155 | | // Closing the session now should return Status::IllegalState since we |
1156 | | // have a pending operation. |
1157 | 0 | ASSERT_TRUE(session->Close().IsIllegalState()); |
1158 | |
|
1159 | 0 | ASSERT_OK(session->Flush()); |
1160 | |
|
1161 | 0 | ASSERT_OK(session->Close()); |
1162 | 0 | } |
1163 | | |
1164 | | // Test which sends multiple batches through the same session, each of which |
1165 | | // contains multiple rows spread across multiple tablets. |
1166 | 0 | TEST_F(ClientTest, TestMultipleMultiRowManualBatches) { |
1167 | 0 | auto session = CreateSession(); |
1168 | |
|
1169 | 0 | const int kNumBatches = 5; |
1170 | 0 | const int kRowsPerBatch = 10; |
1171 | |
|
1172 | 0 | int row_key = 0; |
1173 | |
|
1174 | 0 | for (int batch_num = 0; batch_num < kNumBatches; batch_num++) { |
1175 | 0 | for (int i = 0; i < kRowsPerBatch; i++) { |
1176 | 0 | ApplyInsertToSession( |
1177 | 0 | session.get(), |
1178 | 0 | (row_key % 2 == 0) ? client_table_ : client_table2_, |
1179 | 0 | row_key, row_key * 10, "hello world"); |
1180 | 0 | row_key++; |
1181 | 0 | } |
1182 | 0 | ASSERT_TRUE(session->TEST_HasPendingOperations()) << "Should be pending until we Flush"; |
1183 | 0 | FlushSessionOrDie(session); |
1184 | 0 | ASSERT_FALSE(session->TEST_HasPendingOperations()) |
1185 | 0 | << "Should have no more pending ops after flush"; |
1186 | 0 | } |
1187 | |
|
1188 | 0 | const int kNumRowsPerTablet = kNumBatches * kRowsPerBatch / 2; |
1189 | 0 | ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table_)); |
1190 | 0 | ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table2_)); |
1191 | | |
1192 | | // Verify the data looks right. |
1193 | 0 | auto rows = ScanTableToStrings(client_table_); |
1194 | 0 | std::sort(rows.begin(), rows.end()); |
1195 | 0 | ASSERT_EQ(kNumRowsPerTablet, rows.size()); |
1196 | 0 | ASSERT_EQ("{ int32:0, int32:0, string:\"hello world\", null }", rows[0]); |
1197 | 0 | } |
1198 | | |
1199 | | // Test a batch where one of the inserted rows succeeds and duplicates succeed too. |
1200 | | TEST_F(ClientTest, TestBatchWithDuplicates) { |
1201 | | auto session = CreateSession(); |
1202 | | |
1203 | | // Insert a row with key "1" |
1204 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row"); |
1205 | | FlushSessionOrDie(session); |
1206 | | |
1207 | | // Now make a batch that has key "1" along with |
1208 | | // key "2" which will succeed. Flushing should not return an error. |
1209 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, "Attempted dup"); |
1210 | | ApplyInsertToSession(session.get(), client_table_, 2, 1, "Should succeed"); |
1211 | | Status s = session->Flush(); |
1212 | | ASSERT_TRUE(s.ok()); |
1213 | | |
1214 | | // Verify that the other row was successfully inserted |
1215 | | auto rows = ScanTableToStrings(client_table_); |
1216 | | ASSERT_EQ(2, rows.size()); |
1217 | | std::sort(rows.begin(), rows.end()); |
1218 | | ASSERT_EQ("{ int32:1, int32:1, string:\"Attempted dup\", null }", rows[0]); |
1219 | | ASSERT_EQ("{ int32:2, int32:1, string:\"Should succeed\", null }", rows[1]); |
1220 | | } |
1221 | | |
1222 | | // Test flushing an empty batch (should be a no-op). |
1223 | 0 | TEST_F(ClientTest, TestEmptyBatch) { |
1224 | 0 | auto session = CreateSession(); |
1225 | 0 | FlushSessionOrDie(session); |
1226 | 0 | } |
1227 | | |
1228 | 0 | void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) { |
1229 | 0 | DontVerifyClusterBeforeNextTearDown(); |
1230 | 0 | auto session = CreateSession(); |
1231 | 0 | session->SetTimeout(1s); |
1232 | | |
1233 | | // Shut down the server. |
1234 | 0 | switch (which) { |
1235 | 0 | case DEAD_MASTER: |
1236 | 0 | cluster_->mini_master()->Shutdown(); |
1237 | 0 | break; |
1238 | 0 | case DEAD_TSERVER: |
1239 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { |
1240 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
1241 | 0 | } |
1242 | 0 | break; |
1243 | 0 | } |
1244 | | |
1245 | | // Try a write. |
1246 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "x"); |
1247 | 0 | const auto flush_status = session->FlushAndGetOpsErrors(); |
1248 | 0 | ASSERT_TRUE(flush_status.status.IsIOError()) << flush_status.status.ToString(); |
1249 | | |
1250 | 0 | auto error = GetSingleErrorFromFlushStatus(flush_status); |
1251 | 0 | switch (which) { |
1252 | 0 | case DEAD_MASTER: |
1253 | | // Only one master, so no retry for finding the new leader master. |
1254 | 0 | ASSERT_TRUE(error->status().IsTimedOut()); |
1255 | 0 | break; |
1256 | 0 | case DEAD_TSERVER: |
1257 | 0 | ASSERT_TRUE(error->status().IsTimedOut()); |
1258 | 0 | auto pos = error->status().ToString().find("Connection refused"); |
1259 | 0 | if (pos == std::string::npos) { |
1260 | 0 | pos = error->status().ToString().find("Broken pipe"); |
1261 | 0 | } |
1262 | 0 | ASSERT_NE(std::string::npos, pos); |
1263 | 0 | break; |
1264 | 0 | } |
1265 | | |
1266 | 0 | ASSERT_STR_CONTAINS(error->failed_op().ToString(), "QL_WRITE"); |
1267 | 0 | } |
1268 | | |
1269 | | // Test error handling cases where the master is down (tablet resolution fails) |
1270 | 0 | TEST_F(ClientTest, TestWriteWithDeadMaster) { |
1271 | 0 | client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1); |
1272 | 0 | DoTestWriteWithDeadServer(DEAD_MASTER); |
1273 | 0 | } |
1274 | | |
1275 | | // Test error handling when the TS is down (actual write fails its RPC) |
1276 | 0 | TEST_F(ClientTest, TestWriteWithDeadTabletServer) { |
1277 | 0 | DoTestWriteWithDeadServer(DEAD_TSERVER); |
1278 | 0 | } |
1279 | | |
1280 | 0 | void ClientTest::DoApplyWithoutFlushTest(int sleep_micros) { |
1281 | 0 | auto session = CreateSession(); |
1282 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "x"); |
1283 | 0 | SleepFor(MonoDelta::FromMicroseconds(sleep_micros)); |
1284 | 0 | session.reset(); // should not crash! |
1285 | | |
1286 | | // Should have no rows. |
1287 | 0 | auto rows = ScanTableToStrings(client_table_); |
1288 | 0 | ASSERT_EQ(0, rows.size()); |
1289 | 0 | } |
1290 | | |
1291 | | |
1292 | | // Applies some updates to the session, and then drops the reference to the |
1293 | | // Session before flushing. Makes sure that the tablet resolution callbacks |
1294 | | // properly deal with the session disappearing underneath. |
1295 | | // |
1296 | | // This test doesn't sleep between applying the operations and dropping the |
1297 | | // reference, in hopes that the reference will be dropped while DNS is still |
1298 | | // in-flight, etc. |
1299 | 0 | TEST_F(ClientTest, TestApplyToSessionWithoutFlushing_OpsInFlight) { |
1300 | 0 | DoApplyWithoutFlushTest(0); |
1301 | 0 | } |
1302 | | |
1303 | | // Same as the above, but sleeps a little bit after applying the operations, |
1304 | | // so that the operations are already in the per-TS-buffer. |
1305 | 0 | TEST_F(ClientTest, TestApplyToSessionWithoutFlushing_OpsBuffered) { |
1306 | 0 | DoApplyWithoutFlushTest(10000); |
1307 | 0 | } |
1308 | | |
1309 | | // Apply a large amount of data without calling Flush(), and ensure |
1310 | | // that we get an error on Apply() rather than sending a too-large |
1311 | | // RPC to the server. |
1312 | 0 | TEST_F(ClientTest, DISABLED_TestApplyTooMuchWithoutFlushing) { |
1313 | | // Applying a bunch of small rows without a flush should result |
1314 | | // in an error. |
1315 | 0 | { |
1316 | 0 | bool got_expected_error = false; |
1317 | 0 | auto session = CreateSession(); |
1318 | 0 | for (int i = 0; i < 1000000; i++) { |
1319 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, "x"); |
1320 | 0 | } |
1321 | 0 | ASSERT_TRUE(got_expected_error); |
1322 | 0 | } |
1323 | | |
1324 | | // Writing a single very large row should also result in an error. |
1325 | 0 | { |
1326 | 0 | string huge_string(10 * 1024 * 1024, 'x'); |
1327 | |
|
1328 | 0 | shared_ptr<YBSession> session = client_->NewSession(); |
1329 | 0 | ApplyInsertToSession(session.get(), client_table_, 1, 1, huge_string.c_str()); |
1330 | 0 | } |
1331 | 0 | } |
1332 | | |
1333 | | // Test that update updates and delete deletes with expected use |
1334 | | TEST_F(ClientTest, TestMutationsWork) { |
1335 | | auto session = CreateSession(); |
1336 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row"); |
1337 | | FlushSessionOrDie(session); |
1338 | | |
1339 | | ApplyUpdateToSession(session.get(), client_table_, 1, 2); |
1340 | | FlushSessionOrDie(session); |
1341 | | auto rows = ScanTableToStrings(client_table_); |
1342 | | ASSERT_EQ(1, rows.size()); |
1343 | | ASSERT_EQ("{ int32:1, int32:2, string:\"original row\", null }", rows[0]); |
1344 | | rows.clear(); |
1345 | | |
1346 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1347 | | FlushSessionOrDie(session); |
1348 | | ScanTableToStrings(client_table_, &rows); |
1349 | | ASSERT_EQ(0, rows.size()); |
1350 | | } |
1351 | | |
1352 | | TEST_F(ClientTest, TestMutateDeletedRow) { |
1353 | | auto session = CreateSession(); |
1354 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row"); |
1355 | | FlushSessionOrDie(session); |
1356 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1357 | | FlushSessionOrDie(session); |
1358 | | auto rows = ScanTableToStrings(client_table_); |
1359 | | ASSERT_EQ(0, rows.size()); |
1360 | | |
1361 | | // Attempt update deleted row |
1362 | | ApplyUpdateToSession(session.get(), client_table_, 1, 2); |
1363 | | Status s = session->Flush(); |
1364 | | ASSERT_TRUE(s.ok()); |
1365 | | ScanTableToStrings(client_table_, &rows); |
1366 | | ASSERT_EQ(1, rows.size()); |
1367 | | |
1368 | | // Attempt delete deleted row |
1369 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1370 | | s = session->Flush(); |
1371 | | ASSERT_TRUE(s.ok()); |
1372 | | ScanTableToStrings(client_table_, &rows); |
1373 | | ASSERT_EQ(0, rows.size()); |
1374 | | } |
1375 | | |
1376 | | TEST_F(ClientTest, TestMutateNonexistentRow) { |
1377 | | auto session = CreateSession(); |
1378 | | |
1379 | | // Attempt update nonexistent row |
1380 | | ApplyUpdateToSession(session.get(), client_table_, 1, 2); |
1381 | | Status s = session->Flush(); |
1382 | | ASSERT_TRUE(s.ok()); |
1383 | | auto rows = ScanTableToStrings(client_table_); |
1384 | | ASSERT_EQ(1, rows.size()); |
1385 | | |
1386 | | // Attempt delete nonexistent row |
1387 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1388 | | s = session->Flush(); |
1389 | | ASSERT_TRUE(s.ok()); |
1390 | | ScanTableToStrings(client_table_, &rows); |
1391 | | ASSERT_EQ(0, rows.size()); |
1392 | | } |
1393 | | |
1394 | | // Do a write with a bad schema on the client side. This should make the Prepare |
1395 | | // phase of the write fail, which will result in an error on the RPC response. |
1396 | 0 | TEST_F(ClientTest, TestWriteWithBadSchema) { |
1397 | | // Remove the 'int_val' column. |
1398 | | // Now the schema on the client is "old" |
1399 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1400 | 0 | ASSERT_OK(table_alterer->DropColumn("int_val")->Alter()); |
1401 | | |
1402 | | // Try to do a write with the bad schema. |
1403 | 0 | auto session = CreateSession(); |
1404 | 0 | std::shared_ptr<YBqlOp> op; |
1405 | 0 | ApplyInsertToSession(session.get(), client_table_, 12345, 12345, "x", &op); |
1406 | 0 | ASSERT_OK(session->Flush()); |
1407 | 0 | ASSERT_EQ(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH, op->response().status()); |
1408 | 0 | } |
1409 | | |
1410 | 0 | TEST_F(ClientTest, TestBasicAlterOperations) { |
1411 | | // test that having no steps throws an error |
1412 | 0 | { |
1413 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1414 | 0 | Status s = table_alterer->Alter(); |
1415 | 0 | ASSERT_TRUE(s.IsInvalidArgument()); |
1416 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "No alter steps provided"); |
1417 | 0 | } |
1418 | | |
1419 | | // test that removing a key column throws an error |
1420 | 0 | { |
1421 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1422 | 0 | Status s = table_alterer |
1423 | 0 | ->DropColumn("key") |
1424 | 0 | ->Alter(); |
1425 | 0 | ASSERT_TRUE(s.IsInvalidArgument()); |
1426 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "cannot remove a key column"); |
1427 | 0 | } |
1428 | | |
1429 | | // test that renaming to an already-existing name throws an error |
1430 | 0 | { |
1431 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1432 | 0 | table_alterer->AlterColumn("int_val")->RenameTo("string_val"); |
1433 | 0 | Status s = table_alterer->Alter(); |
1434 | 0 | ASSERT_TRUE(s.IsAlreadyPresent()); |
1435 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "The column already exists: string_val"); |
1436 | 0 | } |
1437 | | |
1438 | | // Need a tablet peer for the next set of tests. |
1439 | 0 | string tablet_id = GetFirstTabletId(client_table_.get()); |
1440 | 0 | std::shared_ptr<TabletPeer> tablet_peer; |
1441 | |
|
1442 | 0 | for (auto& ts : cluster_->mini_tablet_servers()) { |
1443 | 0 | ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablet_id, &tablet_peer)); |
1444 | 0 | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
1445 | 0 | break; |
1446 | 0 | } |
1447 | 0 | } |
1448 | |
|
1449 | 0 | { |
1450 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1451 | 0 | table_alterer->DropColumn("int_val") |
1452 | 0 | ->AddColumn("new_col")->Type(INT32); |
1453 | 0 | ASSERT_OK(table_alterer->Alter()); |
1454 | | // TODO(nspiegelberg): The below assert is flakey because of KUDU-1539. |
1455 | 0 | ASSERT_EQ(1, tablet_peer->tablet()->metadata()->schema_version()); |
1456 | 0 | } |
1457 | |
|
1458 | 0 | { |
1459 | 0 | const YBTableName kRenamedTableName(YQL_DATABASE_CQL, kKeyspaceName, "RenamedTable"); |
1460 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName)); |
1461 | 0 | ASSERT_OK(table_alterer |
1462 | 0 | ->RenameTo(kRenamedTableName) |
1463 | 0 | ->Alter()); |
1464 | | // TODO(nspiegelberg): The below assert is flakey because of KUDU-1539. |
1465 | 0 | ASSERT_EQ(2, tablet_peer->tablet()->metadata()->schema_version()); |
1466 | 0 | ASSERT_EQ(kRenamedTableName.table_name(), tablet_peer->tablet()->metadata()->table_name()); |
1467 | |
|
1468 | 0 | const auto tables = ASSERT_RESULT(client_->ListTables()); |
1469 | 0 | ASSERT_TRUE(::util::gtl::contains(tables.begin(), tables.end(), kRenamedTableName)); |
1470 | 0 | ASSERT_FALSE(::util::gtl::contains(tables.begin(), tables.end(), kTableName)); |
1471 | 0 | } |
1472 | 0 | } |
1473 | | |
1474 | 0 | TEST_F(ClientTest, TestDeleteTable) { |
1475 | | // Open the table before deleting it. |
1476 | 0 | ASSERT_OK(client_table_.Open(kTableName, client_.get())); |
1477 | | |
1478 | | // Insert a few rows, and scan them back. This is to populate the MetaCache. |
1479 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, 10)); |
1480 | 0 | auto rows = ScanTableToStrings(client_table_); |
1481 | 0 | ASSERT_EQ(10, rows.size()); |
1482 | | |
1483 | | // Remove the table |
1484 | | // NOTE that it returns when the operation is completed on the master side |
1485 | 0 | string tablet_id = GetFirstTabletId(client_table_.get()); |
1486 | 0 | ASSERT_OK(client_->DeleteTable(kTableName)); |
1487 | 0 | const auto tables = ASSERT_RESULT(client_->ListTables()); |
1488 | 0 | ASSERT_FALSE(::util::gtl::contains(tables.begin(), tables.end(), kTableName)); |
1489 | | |
1490 | | // Wait until the table is removed from the TS |
1491 | 0 | int wait_time = 1000; |
1492 | 0 | bool tablet_found = true; |
1493 | 0 | for (int i = 0; i < 80 && tablet_found; ++i) { |
1494 | 0 | std::shared_ptr<TabletPeer> tablet_peer; |
1495 | 0 | tablet_found = cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet( |
1496 | 0 | tablet_id, &tablet_peer); |
1497 | 0 | SleepFor(MonoDelta::FromMicroseconds(wait_time)); |
1498 | 0 | wait_time = std::min(wait_time * 5 / 4, 1000000); |
1499 | 0 | } |
1500 | 0 | ASSERT_FALSE(tablet_found); |
1501 | | |
1502 | | // Try to open the deleted table |
1503 | 0 | Status s = client_table_.Open(kTableName, client_.get()); |
1504 | 0 | ASSERT_TRUE(TableNotFound(s)) << s; |
1505 | | |
1506 | | // Create a new table with the same name. This is to ensure that the client |
1507 | | // doesn't cache anything inappropriately by table name (see KUDU-1055). |
1508 | 0 | ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_)); |
1509 | | |
1510 | | // Should be able to insert successfully into the new table. |
1511 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, 10)); |
1512 | 0 | } |
1513 | | |
1514 | 0 | TEST_F(ClientTest, TestGetTableSchema) { |
1515 | 0 | YBSchema schema; |
1516 | 0 | PartitionSchema partition_schema; |
1517 | | |
1518 | | // Verify the schema for the current table |
1519 | 0 | ASSERT_OK(client_->GetTableSchema(kTableName, &schema, &partition_schema)); |
1520 | 0 | ASSERT_TRUE(schema_.Equals(schema)); |
1521 | | |
1522 | | // Verify that a get schema request for a missing table throws not found |
1523 | 0 | Status s = client_->GetTableSchema( |
1524 | 0 | YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "MissingTableName"), &schema, &partition_schema); |
1525 | 0 | ASSERT_TRUE(TableNotFound(s)) << s; |
1526 | 0 | } |
1527 | | |
1528 | 0 | TEST_F(ClientTest, TestGetTableSchemaByIdAsync) { |
1529 | 0 | Synchronizer sync; |
1530 | 0 | auto table_info = std::make_shared<YBTableInfo>(); |
1531 | 0 | ASSERT_OK(client_->GetTableSchemaById( |
1532 | 0 | client_table_.table()->id(), table_info, sync.AsStatusCallback())); |
1533 | 0 | ASSERT_OK(sync.Wait()); |
1534 | 0 | ASSERT_TRUE(schema_.Equals(table_info->schema)); |
1535 | 0 | } |
1536 | | |
1537 | 0 | TEST_F(ClientTest, TestGetTableSchemaByIdMissingTable) { |
1538 | | // Verify that a get schema request for a missing table throws not found. |
1539 | 0 | Synchronizer sync; |
1540 | 0 | auto table_info = std::make_shared<YBTableInfo>(); |
1541 | 0 | ASSERT_OK(client_->GetTableSchemaById("MissingTableId", table_info, sync.AsStatusCallback())); |
1542 | 0 | Status s = sync.Wait(); |
1543 | 0 | ASSERT_TRUE(TableNotFound(s)) << s; |
1544 | 0 | } |
1545 | | |
1546 | 0 | TEST_F(ClientTest, TestCreateCDCStreamAsync) { |
1547 | 0 | std::promise<Result<CDCStreamId>> promise; |
1548 | 0 | std::unordered_map<std::string, std::string> options; |
1549 | 0 | client_->CreateCDCStream( |
1550 | 0 | client_table_.table()->id(), options, |
1551 | 0 | [&promise](const auto& stream) { promise.set_value(stream); }); |
1552 | 0 | auto stream = promise.get_future().get(); |
1553 | 0 | ASSERT_OK(stream); |
1554 | 0 | ASSERT_FALSE(stream->empty()); |
1555 | 0 | } |
1556 | | |
1557 | | TEST_F(ClientTest, TestCreateCDCStreamMissingTable) { |
1558 | | std::promise<Result<CDCStreamId>> promise; |
1559 | | std::unordered_map<std::string, std::string> options; |
1560 | | client_->CreateCDCStream( |
1561 | | "MissingTableId", options, |
1562 | 0 | [&promise](const auto& stream) { promise.set_value(stream); }); |
1563 | | auto stream = promise.get_future().get(); |
1564 | | ASSERT_NOK(stream); |
1565 | | ASSERT_TRUE(TableNotFound(stream.status())) << stream.status(); |
1566 | | } |
1567 | | |
1568 | 0 | TEST_F(ClientTest, TestDeleteCDCStreamAsync) { |
1569 | 0 | std::unordered_map<std::string, std::string> options; |
1570 | 0 | auto result = client_->CreateCDCStream(client_table_.table()->id(), options); |
1571 | 0 | ASSERT_TRUE(result.ok()); |
1572 | | |
1573 | | // Delete the created CDC stream. |
1574 | 0 | Synchronizer sync; |
1575 | 0 | client_->DeleteCDCStream(*result, sync.AsStatusCallback()); |
1576 | 0 | ASSERT_OK(sync.Wait()); |
1577 | 0 | } |
1578 | | |
1579 | 0 | TEST_F(ClientTest, TestDeleteCDCStreamMissingId) { |
1580 | | // Try to delete a non-existent CDC stream. |
1581 | 0 | Synchronizer sync; |
1582 | 0 | client_->DeleteCDCStream("MissingStreamId", sync.AsStatusCallback()); |
1583 | 0 | Status s = sync.Wait(); |
1584 | 0 | ASSERT_TRUE(TableNotFound(s)) << s; |
1585 | 0 | } |
1586 | | |
1587 | 0 | TEST_F(ClientTest, TestStaleLocations) { |
1588 | 0 | string tablet_id = GetFirstTabletId(client_table2_.get()); |
1589 | | |
1590 | | // The tablet is up and running, so the location should not be stale. |
1591 | 0 | master::TabletLocationsPB locs_pb; |
1592 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations( |
1593 | 0 | tablet_id, &locs_pb)); |
1594 | 0 | ASSERT_FALSE(locs_pb.stale()); |
1595 | | |
1596 | | // After a master restart with no tablet reports, we expect the locations to be stale. |
1597 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { |
1598 | 0 | cluster_->mini_tablet_server(i)->Shutdown(); |
1599 | 0 | } |
1600 | 0 | ASSERT_OK(cluster_->mini_master()->Restart()); |
1601 | 0 | ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests()); |
1602 | 0 | locs_pb.Clear(); |
1603 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb)); |
1604 | 0 | ASSERT_TRUE(locs_pb.stale()); |
1605 | | |
1606 | | // Restart the TS and Wait for the tablets to be reported to the master. |
1607 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { |
1608 | 0 | ASSERT_OK(cluster_->mini_tablet_server(i)->Start()); |
1609 | 0 | } |
1610 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers())); |
1611 | 0 | locs_pb.Clear(); |
1612 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb)); |
1613 | | |
1614 | | // It may take a while to bootstrap the tablet and send the location report |
1615 | | // so spin until we get a non-stale location. |
1616 | 0 | int wait_time = 1000; |
1617 | 0 | for (int i = 0; i < 80; ++i) { |
1618 | 0 | locs_pb.Clear(); |
1619 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb)); |
1620 | 0 | if (!locs_pb.stale()) { |
1621 | 0 | break; |
1622 | 0 | } |
1623 | 0 | SleepFor(MonoDelta::FromMicroseconds(wait_time)); |
1624 | 0 | wait_time = std::min(wait_time * 5 / 4, 1000000); |
1625 | 0 | } |
1626 | 0 | ASSERT_FALSE(locs_pb.stale()); |
1627 | 0 | } |
1628 | | |
1629 | | // Test creating and accessing a table which has multiple tablets, |
1630 | | // each of which is replicated. |
1631 | | // |
1632 | | // TODO: this should probably be the default for _all_ of the tests |
1633 | | // in this file. However, some things like alter table are not yet |
1634 | | // working on replicated tables - see KUDU-304 |
1635 | 0 | TEST_F(ClientTest, TestReplicatedMultiTabletTable) { |
1636 | 0 | const YBTableName kReplicatedTable(YQL_DATABASE_CQL, "replicated"); |
1637 | 0 | const int kNumRowsToWrite = 100; |
1638 | |
|
1639 | 0 | TableHandle table; |
1640 | 0 | ASSERT_NO_FATALS(CreateTable(kReplicatedTable, |
1641 | 0 | kNumTablets, |
1642 | 0 | &table)); |
1643 | | |
1644 | | // Should have no rows to begin with. |
1645 | 0 | ASSERT_EQ(0, CountRowsFromClient(table)); |
1646 | | |
1647 | | // Insert some data. |
1648 | 0 | ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite)); |
1649 | | |
1650 | | // Should now see the data. |
1651 | 0 | ASSERT_EQ(kNumRowsToWrite, CountRowsFromClient(table)); |
1652 | | |
1653 | | // TODO: once leader re-election is in, should somehow force a re-election |
1654 | | // and ensure that the client handles refreshing the leader. |
1655 | 0 | } |
1656 | | |
1657 | 0 | TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) { |
1658 | 0 | const YBTableName kReplicatedTable(YQL_DATABASE_CQL, "replicated_failover_on_reads"); |
1659 | 0 | const int kNumRowsToWrite = 100; |
1660 | 0 | const int kNumTries = 100; |
1661 | |
|
1662 | 0 | TableHandle table; |
1663 | 0 | ASSERT_NO_FATALS(CreateTable(kReplicatedTable, |
1664 | 0 | kNumTablets, |
1665 | 0 | &table)); |
1666 | | |
1667 | | // Insert some data. |
1668 | 0 | ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite)); |
1669 | | |
1670 | | // Find the leader of the first tablet. |
1671 | 0 | auto remote_tablet = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get()); |
1672 | 0 | internal::RemoteTabletServer *remote_tablet_server = remote_tablet->LeaderTServer(); |
1673 | | |
1674 | | // Kill the leader of the first tablet. |
1675 | 0 | ASSERT_OK(KillTServer(remote_tablet_server->permanent_uuid())); |
1676 | | |
1677 | | // We wait until we fail over to the new leader(s). |
1678 | 0 | int tries = 0; |
1679 | 0 | for (;;) { |
1680 | 0 | tries++; |
1681 | 0 | auto num_rows = CountRowsFromClient(table); |
1682 | 0 | if (num_rows == kNumRowsToWrite) { |
1683 | 0 | LOG(INFO) << "Found expected number of rows: " << num_rows; |
1684 | 0 | break; |
1685 | 0 | } else { |
1686 | 0 | LOG(INFO) << "Only found " << num_rows << " rows on try " |
1687 | 0 | << tries << ", retrying"; |
1688 | 0 | ASSERT_LE(tries, kNumTries); |
1689 | 0 | SleepFor(MonoDelta::FromMilliseconds(10 * tries)); // sleep a bit more with each attempt. |
1690 | 0 | } |
1691 | 0 | } |
1692 | 0 | } |
1693 | | |
1694 | | // This tests that we can keep writing to a tablet when the leader |
1695 | | // tablet dies. |
1696 | | // This currently forces leader promotion through RPC and creates |
1697 | | // a new client afterwards. |
1698 | | // TODO Remove the leader promotion part when we have automated |
1699 | | // leader election. |
1700 | 0 | TEST_F(ClientTest, TestReplicatedTabletWritesAndAltersWithLeaderElection) { |
1701 | 0 | const YBTableName kReplicatedTable(YQL_DATABASE_CQL, kKeyspaceName, |
1702 | 0 | "replicated_failover_on_writes"); |
1703 | 0 | const int kNumRowsToWrite = 100; |
1704 | |
|
1705 | 0 | TableHandle table; |
1706 | 0 | ASSERT_NO_FATALS(CreateTable(kReplicatedTable, |
1707 | 0 | 1, |
1708 | 0 | &table)); |
1709 | | |
1710 | | // Insert some data. |
1711 | 0 | ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite)); |
1712 | | |
1713 | | // TODO: we have to sleep here to make sure that the leader has time to |
1714 | | // propagate the writes to the followers. We can remove this once the |
1715 | | // followers run a leader election on their own and handle advancing |
1716 | | // the commit index. |
1717 | 0 | SleepFor(MonoDelta::FromMilliseconds(1500)); |
1718 | | |
1719 | | // Find the leader replica |
1720 | 0 | auto remote_tablet = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get()); |
1721 | 0 | internal::RemoteTabletServer *remote_tablet_server; |
1722 | 0 | set<string> blacklist; |
1723 | 0 | vector<internal::RemoteTabletServer*> candidates; |
1724 | 0 | ASSERT_OK(client_->data_->GetTabletServer(client_.get(), |
1725 | 0 | remote_tablet, |
1726 | 0 | YBClient::LEADER_ONLY, |
1727 | 0 | blacklist, |
1728 | 0 | &candidates, |
1729 | 0 | &remote_tablet_server)); |
1730 | |
|
1731 | 0 | string killed_uuid = remote_tablet_server->permanent_uuid(); |
1732 | | // Kill the tserver that is serving the leader tablet. |
1733 | 0 | ASSERT_OK(KillTServer(killed_uuid)); |
1734 | | |
1735 | | // Since we waited before, hopefully all replicas will be up to date |
1736 | | // and we can just promote another replica. |
1737 | 0 | auto client_messenger = rpc::CreateAutoShutdownMessengerHolder( |
1738 | 0 | ASSERT_RESULT(CreateMessenger("client"))); |
1739 | 0 | ssize_t new_leader_idx = -1; |
1740 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
1741 | 0 | MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
1742 | 0 | LOG(INFO) << "GOT TS " << i << " WITH UUID ???"; |
1743 | 0 | if (ts->is_started()) { |
1744 | 0 | const string& uuid = ts->server()->instance_pb().permanent_uuid(); |
1745 | 0 | LOG(INFO) << uuid; |
1746 | 0 | if (uuid != killed_uuid) { |
1747 | 0 | new_leader_idx = i; |
1748 | 0 | break; |
1749 | 0 | } |
1750 | 0 | } |
1751 | 0 | } |
1752 | 0 | ASSERT_NE(-1, new_leader_idx); |
1753 | |
|
1754 | 0 | MiniTabletServer* new_leader = cluster_->mini_tablet_server(new_leader_idx); |
1755 | 0 | ASSERT_TRUE(new_leader != nullptr); |
1756 | 0 | rpc::ProxyCache proxy_cache(client_messenger.get()); |
1757 | 0 | consensus::ConsensusServiceProxy new_leader_proxy( |
1758 | 0 | &proxy_cache, HostPort::FromBoundEndpoint(new_leader->bound_rpc_addr())); |
1759 | |
|
1760 | 0 | consensus::RunLeaderElectionRequestPB req; |
1761 | 0 | consensus::RunLeaderElectionResponsePB resp; |
1762 | 0 | rpc::RpcController controller; |
1763 | |
|
1764 | 0 | LOG(INFO) << "Promoting server at index " << new_leader_idx << " listening at " |
1765 | 0 | << new_leader->bound_rpc_addr() << " ..."; |
1766 | 0 | req.set_dest_uuid(new_leader->server()->fs_manager()->uuid()); |
1767 | 0 | req.set_tablet_id(remote_tablet->tablet_id()); |
1768 | 0 | ASSERT_OK(new_leader_proxy.RunLeaderElection(req, &resp, &controller)); |
1769 | 0 | ASSERT_FALSE(resp.has_error()) << "Got error. Response: " << resp.ShortDebugString(); |
1770 | | |
1771 | 0 | LOG(INFO) << "Inserting additional rows..."; |
1772 | 0 | ASSERT_NO_FATALS(InsertTestRows(table, |
1773 | 0 | kNumRowsToWrite, |
1774 | 0 | kNumRowsToWrite)); |
1775 | | |
1776 | | // TODO: we have to sleep here to make sure that the leader has time to |
1777 | | // propagate the writes to the followers. We can remove this once the |
1778 | | // followers run a leader election on their own and handle advancing |
1779 | | // the commit index. |
1780 | 0 | SleepFor(MonoDelta::FromMilliseconds(1500)); |
1781 | |
|
1782 | 0 | LOG(INFO) << "Counting rows..."; |
1783 | 0 | ASSERT_EQ(2 * kNumRowsToWrite, CountRowsFromClient(table, |
1784 | 0 | YBConsistencyLevel::CONSISTENT_PREFIX, |
1785 | 0 | kNoBound, kNoBound)); |
1786 | | |
1787 | | // Test altering the table metadata and ensure that meta operations are resilient as well. |
1788 | 0 | { |
1789 | 0 | std::shared_ptr<TabletPeer> tablet_peer; |
1790 | 0 | ASSERT_TRUE(new_leader->server()->tablet_manager()->LookupTablet(remote_tablet->tablet_id(), |
1791 | 0 | &tablet_peer)); |
1792 | 0 | auto old_version = tablet_peer->tablet()->metadata()->schema_version(); |
1793 | 0 | std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kReplicatedTable)); |
1794 | 0 | table_alterer->AddColumn("new_col")->Type(INT32); |
1795 | 0 | ASSERT_OK(table_alterer->Alter()); |
1796 | 0 | ASSERT_EQ(old_version + 1, tablet_peer->tablet()->metadata()->schema_version()); |
1797 | 0 | } |
1798 | 0 | } |
1799 | | |
1800 | | namespace { |
1801 | | |
1802 | 0 | void CheckCorrectness(const TableHandle& table, int expected[], int nrows) { |
1803 | 0 | int readrows = 0; |
1804 | |
|
1805 | 0 | for (const auto& row : TableRange(table)) { |
1806 | 0 | ASSERT_LE(readrows, nrows); |
1807 | 0 | int32_t key = row.column(0).int32_value(); |
1808 | 0 | ASSERT_NE(key, -1) << "Deleted key found in table in table " << key; |
1809 | 0 | ASSERT_EQ(expected[key], row.column(1).int32_value()) |
1810 | 0 | << "Incorrect int value for key " << key; |
1811 | 0 | ASSERT_EQ(row.column(2).string_value(), "") |
1812 | 0 | << "Incorrect string value for key " << key; |
1813 | 0 | ++readrows; |
1814 | 0 | } |
1815 | 0 | ASSERT_EQ(readrows, nrows); |
1816 | 0 | } |
1817 | | |
1818 | | } // anonymous namespace |
1819 | | |
1820 | | // Randomized mutations accuracy testing |
1821 | 0 | TEST_F(ClientTest, TestRandomWriteOperation) { |
1822 | 0 | auto session = CreateSession(); |
1823 | 0 | int row[FLAGS_test_scan_num_rows]; // -1 indicates empty |
1824 | 0 | int nrows; |
1825 | | |
1826 | | // First half-fill |
1827 | 0 | for (int i = 0; i < FLAGS_test_scan_num_rows/2; ++i) { |
1828 | 0 | ApplyInsertToSession(session.get(), client_table_, i, i, ""); |
1829 | 0 | row[i] = i; |
1830 | 0 | } |
1831 | 0 | for (int i = FLAGS_test_scan_num_rows/2; i < FLAGS_test_scan_num_rows; ++i) { |
1832 | 0 | row[i] = -1; |
1833 | 0 | } |
1834 | 0 | nrows = FLAGS_test_scan_num_rows/2; |
1835 | | |
1836 | | // Randomized testing |
1837 | 0 | LOG(INFO) << "Randomized mutations testing."; |
1838 | 0 | unsigned int seed = SeedRandom(); |
1839 | 0 | for (int i = 0; i <= 1000; ++i) { |
1840 | | // Test correctness every so often |
1841 | 0 | if (i % 50 == 0) { |
1842 | 0 | LOG(INFO) << "Correctness test " << i; |
1843 | 0 | FlushSessionOrDie(session); |
1844 | 0 | ASSERT_NO_FATALS(CheckCorrectness(client_table_, row, nrows)); |
1845 | 0 | LOG(INFO) << "...complete"; |
1846 | 0 | } |
1847 | |
|
1848 | 0 | int change = rand_r(&seed) % FLAGS_test_scan_num_rows; |
1849 | | // Insert if empty |
1850 | 0 | if (row[change] == -1) { |
1851 | 0 | ApplyInsertToSession(session.get(), client_table_, change, change, ""); |
1852 | 0 | row[change] = change; |
1853 | 0 | ++nrows; |
1854 | 0 | VLOG(1) << "Insert " << change; |
1855 | 0 | } else { |
1856 | | // Update or delete otherwise |
1857 | 0 | int update = rand_r(&seed) & 1; |
1858 | 0 | if (update) { |
1859 | 0 | ApplyUpdateToSession(session.get(), client_table_, change, ++row[change]); |
1860 | 0 | VLOG(1) << "Update " << change; |
1861 | 0 | } else { |
1862 | 0 | ApplyDeleteToSession(session.get(), client_table_, change); |
1863 | 0 | row[change] = -1; |
1864 | 0 | --nrows; |
1865 | 0 | VLOG(1) << "Delete " << change; |
1866 | 0 | } |
1867 | 0 | } |
1868 | 0 | } |
1869 | | |
1870 | | // And one more time for the last batch. |
1871 | 0 | FlushSessionOrDie(session); |
1872 | 0 | ASSERT_NO_FATALS(CheckCorrectness(client_table_, row, nrows)); |
1873 | 0 | } |
1874 | | |
1875 | | // Test whether a single batch can handle several mutations of the same row |
1876 | | TEST_F(ClientTest, TestSeveralRowMutatesPerBatch) { |
1877 | | auto session = CreateSession(); |
1878 | | |
1879 | | // Test insert/update |
1880 | | LOG(INFO) << "Testing insert/update in same batch, key " << 1 << "."; |
1881 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, ""); |
1882 | | ApplyUpdateToSession(session.get(), client_table_, 1, 2); |
1883 | | FlushSessionOrDie(session); |
1884 | | auto rows = ScanTableToStrings(client_table_); |
1885 | | ASSERT_EQ(1, rows.size()); |
1886 | | ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]); |
1887 | | rows.clear(); |
1888 | | |
1889 | | |
1890 | | LOG(INFO) << "Testing insert/delete in same batch, key " << 2 << "."; |
1891 | | // Test insert/delete |
1892 | | ApplyInsertToSession(session.get(), client_table_, 2, 1, ""); |
1893 | | ApplyDeleteToSession(session.get(), client_table_, 2); |
1894 | | FlushSessionOrDie(session); |
1895 | | ScanTableToStrings(client_table_, &rows); |
1896 | | ASSERT_EQ(1, rows.size()); |
1897 | | ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]); |
1898 | | rows.clear(); |
1899 | | |
1900 | | // Test update/delete |
1901 | | LOG(INFO) << "Testing update/delete in same batch, key " << 1 << "."; |
1902 | | ApplyUpdateToSession(session.get(), client_table_, 1, 1); |
1903 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1904 | | FlushSessionOrDie(session); |
1905 | | ScanTableToStrings(client_table_, &rows); |
1906 | | ASSERT_EQ(0, rows.size()); |
1907 | | |
1908 | | // Test delete/insert (insert a row first) |
1909 | | LOG(INFO) << "Inserting row for delete/insert test, key " << 1 << "."; |
1910 | | ApplyInsertToSession(session.get(), client_table_, 1, 1, ""); |
1911 | | FlushSessionOrDie(session); |
1912 | | ScanTableToStrings(client_table_, &rows); |
1913 | | ASSERT_EQ(1, rows.size()); |
1914 | | ASSERT_EQ("{ int32:1, int32:1, string:\"\", null }", rows[0]); |
1915 | | rows.clear(); |
1916 | | LOG(INFO) << "Testing delete/insert in same batch, key " << 1 << "."; |
1917 | | ApplyDeleteToSession(session.get(), client_table_, 1); |
1918 | | ApplyInsertToSession(session.get(), client_table_, 1, 2, ""); |
1919 | | FlushSessionOrDie(session); |
1920 | | ScanTableToStrings(client_table_, &rows); |
1921 | | ASSERT_EQ(1, rows.size()); |
1922 | | ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]); |
1923 | | rows.clear(); |
1924 | | } |
1925 | | |
1926 | | // Tests that master permits are properly released after a whole bunch of |
1927 | | // rows are inserted. |
1928 | 0 | TEST_F(ClientTest, TestMasterLookupPermits) { |
1929 | 0 | int initial_value = client_->data_->meta_cache_->master_lookup_sem_.GetValue(); |
1930 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
1931 | 0 | ASSERT_EQ(initial_value, |
1932 | 0 | client_->data_->meta_cache_->master_lookup_sem_.GetValue()); |
1933 | 0 | } |
1934 | | |
1935 | | // Define callback for deadlock simulation, as well as various helper methods. |
1936 | | namespace { |
1937 | | |
1938 | | class DeadlockSimulationCallback { |
1939 | | public: |
1940 | 0 | explicit DeadlockSimulationCallback(Atomic32* i) : i_(i) {} |
1941 | | |
1942 | 0 | void operator()(FlushStatus* flush_status) const { |
1943 | 0 | CHECK_OK(flush_status->status); |
1944 | 0 | NoBarrier_AtomicIncrement(i_, 1); |
1945 | 0 | } |
1946 | | private: |
1947 | | Atomic32* const i_; |
1948 | | }; |
1949 | | |
1950 | | // Returns col1 value of first row. |
1951 | 0 | int32_t ReadFirstRowKeyFirstCol(const TableHandle& tbl) { |
1952 | 0 | TableRange range(tbl); |
1953 | |
|
1954 | 0 | auto it = range.begin(); |
1955 | 0 | EXPECT_NE(it, range.end()); |
1956 | 0 | return it->column(1).int32_value(); |
1957 | 0 | } |
1958 | | |
1959 | | // Checks that all rows have value equal to expected, return number of rows. |
1960 | | int CheckRowsEqual(const TableHandle& tbl, int32_t expected) { |
1961 | | int cnt = 0; |
1962 | | for (const auto& row : TableRange(tbl)) { |
1963 | | EXPECT_EQ(row.column(1).int32_value(), expected); |
1964 | | EXPECT_EQ(row.column(2).string_value(), ""); |
1965 | | EXPECT_EQ(row.column(3).int32_value(), 12345); |
1966 | | ++cnt; |
1967 | | } |
1968 | | return cnt; |
1969 | | } |
1970 | | |
1971 | | // Return a session "loaded" with updates. Sets the session timeout |
1972 | | // to the parameter value. Larger timeouts decrease false positives. |
1973 | | shared_ptr<YBSession> LoadedSession(YBClient* client, |
1974 | | const TableHandle& tbl, |
1975 | 0 | bool fwd, int max, MonoDelta timeout) { |
1976 | 0 | shared_ptr<YBSession> session = client->NewSession(); |
1977 | 0 | session->SetTimeout(timeout); |
1978 | 0 | for (int i = 0; i < max; ++i) { |
1979 | 0 | int key = fwd ? i : max - i; |
1980 | 0 | ApplyUpdateToSession(session.get(), tbl, key, fwd); |
1981 | 0 | } |
1982 | 0 | return session; |
1983 | 0 | } |
1984 | | |
1985 | | } // anonymous namespace |
1986 | | |
1987 | | // Starts many clients which update a table in parallel. |
1988 | | // Half of the clients update rows in ascending order while the other |
1989 | | // half update rows in descending order. |
1990 | | // This ensures that we don't hit a deadlock in such a situation. |
1991 | 0 | TEST_F(ClientTest, TestDeadlockSimulation) { |
1992 | 0 | if (!AllowSlowTests()) { |
1993 | 0 | LOG(WARNING) << "TestDeadlockSimulation disabled since slow."; |
1994 | 0 | return; |
1995 | 0 | } |
1996 | | |
1997 | | // Make a reverse client that will make batches that update rows |
1998 | | // in reverse order. A separate client is used so RPC calls come in at the same time. |
1999 | 0 | auto rev_client = ASSERT_RESULT(YBClientBuilder() |
2000 | 0 | .add_master_server_addr(ToString(cluster_->mini_master()->bound_rpc_addr())) |
2001 | 0 | .Build()); |
2002 | 0 | TableHandle rev_table; |
2003 | 0 | ASSERT_OK(rev_table.Open(kTableName, client_.get())); |
2004 | | |
2005 | | // Load up some rows |
2006 | 0 | const int kNumRows = 300; |
2007 | 0 | const auto kTimeout = 60s; |
2008 | 0 | auto session = CreateSession(); |
2009 | 0 | for (int i = 0; i < kNumRows; ++i) |
2010 | 0 | ApplyInsertToSession(session.get(), client_table_, i, i, ""); |
2011 | 0 | FlushSessionOrDie(session); |
2012 | | |
2013 | | // Check both clients see rows |
2014 | 0 | auto fwd = CountRowsFromClient(client_table_); |
2015 | 0 | ASSERT_EQ(kNumRows, fwd); |
2016 | 0 | auto rev = CountRowsFromClient(rev_table); |
2017 | 0 | ASSERT_EQ(kNumRows, rev); |
2018 | | |
2019 | | // Generate sessions |
2020 | 0 | const int kNumSessions = 100; |
2021 | 0 | shared_ptr<YBSession> fwd_sessions[kNumSessions]; |
2022 | 0 | shared_ptr<YBSession> rev_sessions[kNumSessions]; |
2023 | 0 | for (int i = 0; i < kNumSessions; ++i) { |
2024 | 0 | fwd_sessions[i] = LoadedSession(client_.get(), client_table_, true, kNumRows, kTimeout); |
2025 | 0 | rev_sessions[i] = LoadedSession(rev_client.get(), rev_table, true, kNumRows, kTimeout); |
2026 | 0 | } |
2027 | | |
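2028 | | // Run async calls - intended as one set updating sequentially, another in reverse. NOTE(review): both LoadedSession calls above pass fwd=true, so both sets currently update in ascending order - confirm this is intended. |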
2029 | 0 | Atomic32 ctr1, ctr2; |
2030 | 0 | NoBarrier_Store(&ctr1, 0); |
2031 | 0 | NoBarrier_Store(&ctr2, 0); |
2032 | 0 | for (int i = 0; i < kNumSessions; ++i) { |
2033 | | // The callbacks are freed after they are invoked. |
2034 | 0 | fwd_sessions[i]->FlushAsync(DeadlockSimulationCallback(&ctr1)); |
2035 | 0 | rev_sessions[i]->FlushAsync(DeadlockSimulationCallback(&ctr2)); |
2036 | 0 | } |
2037 | | |
2038 | | // Spin while waiting for ops to complete. |
2039 | 0 | int lctr1, lctr2, prev1 = 0, prev2 = 0; |
2040 | 0 | do { |
2041 | 0 | lctr1 = NoBarrier_Load(&ctr1); |
2042 | 0 | lctr2 = NoBarrier_Load(&ctr2); |
2043 | | // Display progress in 10% increments. |
2044 | 0 | if (prev1 == 0 || lctr1 + lctr2 - prev1 - prev2 > kNumSessions / 10) { |
2045 | 0 | LOG(INFO) << "# updates: " << lctr1 << " fwd, " << lctr2 << " rev"; |
2046 | 0 | prev1 = lctr1; |
2047 | 0 | prev2 = lctr2; |
2048 | 0 | } |
2049 | 0 | SleepFor(MonoDelta::FromMilliseconds(100)); |
2050 | 0 | } while (lctr1 != kNumSessions|| lctr2 != kNumSessions); |
2051 | 0 | int32_t expected = ReadFirstRowKeyFirstCol(client_table_); |
2052 | | |
2053 | | // Check transaction from forward client. |
2054 | 0 | fwd = CheckRowsEqual(client_table_, expected); |
2055 | 0 | ASSERT_EQ(fwd, kNumRows); |
2056 | | |
2057 | | // Check from reverse client side. |
2058 | 0 | rev = CheckRowsEqual(rev_table, expected); |
2059 | 0 | ASSERT_EQ(rev, kNumRows); |
2060 | 0 | } |
2061 | | |
2062 | 0 | TEST_F(ClientTest, TestCreateDuplicateTable) { |
2063 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
2064 | 0 | ASSERT_TRUE(table_creator->table_name(kTableName) |
2065 | 0 | .schema(&schema_) |
2066 | 0 | .Create().IsAlreadyPresent()); |
2067 | 0 | } |
2068 | | |
2069 | 0 | TEST_F(ClientTest, CreateTableWithoutTservers) { |
2070 | 0 | DoTearDown(); |
2071 | |
|
2072 | 0 | YBMiniClusterTestBase::SetUp(); |
2073 | |
|
2074 | 0 | MiniClusterOptions options; |
2075 | 0 | options.num_tablet_servers = 0; |
2076 | | // Start minicluster with only master (to simulate tserver not yet heartbeating). |
2077 | 0 | cluster_.reset(new MiniCluster(options)); |
2078 | 0 | ASSERT_OK(cluster_->Start()); |
2079 | | |
2080 | | // Connect to the cluster. |
2081 | 0 | client_ = ASSERT_RESULT(YBClientBuilder() |
2082 | 0 | .add_master_server_addr(yb::ToString(cluster_->mini_master()->bound_rpc_addr())) |
2083 | 0 | .Build()); |
2084 | |
|
2085 | 0 | std::unique_ptr<client::YBTableCreator> table_creator(client_->NewTableCreator()); |
2086 | 0 | Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar")) |
2087 | 0 | .schema(&schema_) |
2088 | 0 | .Create(); |
2089 | 0 | ASSERT_TRUE(s.IsInvalidArgument()); |
2090 | 0 | ASSERT_STR_CONTAINS(s.ToString(), "num_tablets should be greater than 0."); |
2091 | 0 | } |
2092 | | |
2093 | 0 | TEST_F(ClientTest, TestCreateTableWithTooManyTablets) { |
2094 | 0 | FLAGS_max_create_tablets_per_ts = 1; |
2095 | 0 | auto many_tablets = FLAGS_replication_factor + 1; |
2096 | |
|
2097 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
2098 | 0 | Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar")) |
2099 | 0 | .schema(&schema_) |
2100 | 0 | .num_tablets(many_tablets) |
2101 | 0 | .Create(); |
2102 | 0 | ASSERT_TRUE(s.IsInvalidArgument()); |
2103 | 0 | ASSERT_STR_CONTAINS( |
2104 | 0 | s.ToString(), |
2105 | 0 | strings::Substitute( |
2106 | 0 | "The requested number of tablets ($0) is over the permitted maximum ($1)", many_tablets, |
2107 | 0 | FLAGS_replication_factor)); |
2108 | 0 | } |
2109 | | |
2110 | | // TODO(bogdan): Disabled until ENG-2687 |
2111 | 0 | TEST_F(ClientTest, DISABLED_TestCreateTableWithTooManyReplicas) { |
2112 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
2113 | 0 | Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar")) |
2114 | 0 | .schema(&schema_) |
2115 | 0 | .num_tablets(2) |
2116 | 0 | .Create(); |
2117 | 0 | ASSERT_TRUE(s.IsInvalidArgument()); |
2118 | 0 | ASSERT_STR_CONTAINS(s.ToString(), |
2119 | 0 | "Not enough live tablet servers to create table with the requested " |
2120 | 0 | "replication factor 3. 1 tablet servers are alive"); |
2121 | 0 | } |
2122 | | |
2123 | | // Test that scanners will retry after receiving ERROR_SERVER_TOO_BUSY from an |
2124 | | // overloaded tablet server. Regression test for KUDU-1079. |
2125 | 0 | TEST_F(ClientTest, TestServerTooBusyRetry) { |
2126 | 0 | ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows)); |
2127 | | |
2128 | | // Introduce latency in each scan to increase the likelihood of |
2129 | | // ERROR_SERVER_TOO_BUSY. |
2130 | 0 | FLAGS_TEST_scanner_inject_latency_on_each_batch_ms = 10; |
2131 | | |
2132 | | // Reduce the service queue length of each tablet server in order to increase |
2133 | | // the likelihood of ERROR_SERVER_TOO_BUSY. |
2134 | 0 | FLAGS_tablet_server_svc_queue_length = 1; |
2135 | | // Set the backoff limits to be small for this test, so that we finish in a reasonable |
2136 | | // amount of time. |
2137 | 0 | FLAGS_min_backoff_ms_exponent = 0; |
2138 | 0 | FLAGS_max_backoff_ms_exponent = 3; |
2139 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
2140 | 0 | MiniTabletServer* ts = cluster_->mini_tablet_server(i); |
2141 | 0 | ASSERT_OK(ts->Restart()); |
2142 | 0 | ASSERT_OK(ts->WaitStarted()); |
2143 | 0 | } |
2144 | |
|
2145 | 0 | TestThreadHolder thread_holder; |
2146 | 0 | std::mutex idle_threads_mutex; |
2147 | 0 | std::vector<CountDownLatch*> idle_threads; |
2148 | 0 | std::atomic<int> running_threads{0}; |
2149 | |
|
2150 | 0 | while (!thread_holder.stop_flag().load()) { |
2151 | 0 | CountDownLatch* latch; |
2152 | 0 | { |
2153 | 0 | std::lock_guard<std::mutex> lock(idle_threads_mutex); |
2154 | 0 | if (!idle_threads.empty()) { |
2155 | 0 | latch = idle_threads.back(); |
2156 | 0 | idle_threads.pop_back(); |
2157 | 0 | } else { |
2158 | 0 | latch = nullptr; |
2159 | 0 | } |
2160 | 0 | } |
2161 | 0 | if (latch) { |
2162 | 0 | latch->CountDown(); |
2163 | 0 | } else { |
2164 | 0 | auto num_threads = ++running_threads; |
2165 | 0 | LOG(INFO) << "Start " << num_threads << " thread"; |
2166 | 0 | thread_holder.AddThreadFunctor([this, &idle_threads, &idle_threads_mutex, |
2167 | 0 | &stop = thread_holder.stop_flag(), &running_threads]() { |
2168 | 0 | CountDownLatch latch(1); |
2169 | 0 | while (!stop.load()) { |
2170 | 0 | CheckRowCount(client_table_); |
2171 | 0 | latch.Reset(1); |
2172 | 0 | { |
2173 | 0 | std::lock_guard<std::mutex> lock(idle_threads_mutex); |
2174 | 0 | idle_threads.push_back(&latch); |
2175 | 0 | } |
2176 | 0 | latch.Wait(); |
2177 | 0 | } |
2178 | 0 | --running_threads; |
2179 | 0 | }); |
2180 | 0 | std::this_thread::sleep_for(10ms); |
2181 | 0 | } |
2182 | |
|
2183 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
2184 | 0 | scoped_refptr<Counter> counter = METRIC_rpcs_queue_overflow.Instantiate( |
2185 | 0 | cluster_->mini_tablet_server(i)->server()->metric_entity()); |
2186 | 0 | if (counter->value() > 0) { |
2187 | 0 | thread_holder.stop_flag().store(true, std::memory_order_release); |
2188 | 0 | break; |
2189 | 0 | } |
2190 | 0 | } |
2191 | 0 | } |
2192 | |
|
2193 | 0 | while (running_threads.load() > 0) { |
2194 | 0 | LOG(INFO) << "Left to stop " << running_threads.load() << " threads"; |
2195 | 0 | { |
2196 | 0 | std::lock_guard<std::mutex> lock(idle_threads_mutex); |
2197 | 0 | while (!idle_threads.empty()) { |
2198 | 0 | idle_threads.back()->CountDown(1); |
2199 | 0 | idle_threads.pop_back(); |
2200 | 0 | } |
2201 | 0 | } |
2202 | 0 | std::this_thread::sleep_for(10ms); |
2203 | 0 | } |
2204 | 0 | thread_holder.JoinAll(); |
2205 | 0 | } |
2206 | | |
2207 | 0 | TEST_F(ClientTest, TestReadFromFollower) { |
2208 | | // Create table and write some rows. |
2209 | 0 | const YBTableName kReadFromFollowerTable(YQL_DATABASE_CQL, "TestReadFromFollower"); |
2210 | 0 | TableHandle table; |
2211 | 0 | ASSERT_NO_FATALS(CreateTable(kReadFromFollowerTable, 1, &table)); |
2212 | 0 | ASSERT_NO_FATALS(InsertTestRows(table, FLAGS_test_scan_num_rows)); |
2213 | | |
2214 | | // Find the followers. |
2215 | 0 | GetTableLocationsRequestPB req; |
2216 | 0 | GetTableLocationsResponsePB resp; |
2217 | 0 | table->name().SetIntoTableIdentifierPB(req.mutable_table()); |
2218 | 0 | CHECK_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp)); |
2219 | 0 | ASSERT_EQ(1, resp.tablet_locations_size()); |
2220 | 0 | ASSERT_EQ(3, resp.tablet_locations(0).replicas_size()); |
2221 | 0 | const string& tablet_id = resp.tablet_locations(0).tablet_id(); |
2222 | |
|
2223 | 0 | vector<master::TSInfoPB> followers; |
2224 | 0 | for (const auto& replica : resp.tablet_locations(0).replicas()) { |
2225 | 0 | if (replica.role() == PeerRole::FOLLOWER) { |
2226 | 0 | followers.push_back(replica.ts_info()); |
2227 | 0 | } |
2228 | 0 | } |
2229 | 0 | ASSERT_EQ(cluster_->num_tablet_servers() - 1, followers.size()); |
2230 | |
|
2231 | 0 | auto client_messenger = |
2232 | 0 | CreateAutoShutdownMessengerHolder(ASSERT_RESULT(CreateMessenger("client"))); |
2233 | 0 | rpc::ProxyCache proxy_cache(client_messenger.get()); |
2234 | 0 | for (const master::TSInfoPB& ts_info : followers) { |
2235 | | // Try to read from followers. |
2236 | 0 | auto tserver_proxy = std::make_unique<tserver::TabletServerServiceProxy>( |
2237 | 0 | &proxy_cache, HostPortFromPB(ts_info.private_rpc_addresses(0))); |
2238 | |
|
2239 | 0 | std::unique_ptr<QLRowBlock> row_block; |
2240 | 0 | ASSERT_OK(WaitFor([&]() -> bool { |
2241 | | // Setup read request. |
2242 | 0 | tserver::ReadRequestPB req; |
2243 | 0 | tserver::ReadResponsePB resp; |
2244 | 0 | rpc::RpcController controller; |
2245 | 0 | req.set_tablet_id(tablet_id); |
2246 | 0 | req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
2247 | 0 | QLReadRequestPB *ql_read = req.mutable_ql_batch()->Add(); |
2248 | 0 | std::shared_ptr<std::vector<ColumnSchema>> selected_cols = |
2249 | 0 | std::make_shared<std::vector<ColumnSchema>>(schema_.columns()); |
2250 | 0 | QLRSRowDescPB *rsrow_desc = ql_read->mutable_rsrow_desc(); |
2251 | 0 | for (size_t i = 0; i < schema_.num_columns(); i++) { |
2252 | 0 | ql_read->add_selected_exprs()->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i)); |
2253 | 0 | ql_read->mutable_column_refs()->add_ids(narrow_cast<int32_t>(kFirstColumnId + i)); |
2254 | |
|
2255 | 0 | QLRSColDescPB *rscol_desc = rsrow_desc->add_rscol_descs(); |
2256 | 0 | rscol_desc->set_name((*selected_cols)[i].name()); |
2257 | 0 | (*selected_cols)[i].type()->ToQLTypePB(rscol_desc->mutable_ql_type()); |
2258 | 0 | } |
2259 | |
|
2260 | 0 | EXPECT_OK(tserver_proxy->Read(req, &resp, &controller)); |
2261 | | |
2262 | | // Verify response. |
2263 | 0 | EXPECT_FALSE(resp.has_error()); |
2264 | 0 | EXPECT_EQ(1, resp.ql_batch_size()); |
2265 | 0 | const QLResponsePB &ql_resp = resp.ql_batch(0); |
2266 | 0 | EXPECT_EQ(QLResponsePB_QLStatus_YQL_STATUS_OK, ql_resp.status()); |
2267 | 0 | EXPECT_TRUE(ql_resp.has_rows_data_sidecar()); |
2268 | |
|
2269 | 0 | EXPECT_TRUE(controller.finished()); |
2270 | 0 | Slice rows_data = EXPECT_RESULT(controller.GetSidecar(ql_resp.rows_data_sidecar())); |
2271 | 0 | ql::RowsResult rows_result(kReadFromFollowerTable, selected_cols, rows_data.ToBuffer()); |
2272 | 0 | row_block = rows_result.GetRowBlock(); |
2273 | 0 | return implicit_cast<size_t>(FLAGS_test_scan_num_rows) == row_block->row_count(); |
2274 | 0 | }, MonoDelta::FromSeconds(30), "Waiting for replication to followers")); |
2275 | |
|
2276 | 0 | std::vector<bool> seen_key(row_block->row_count()); |
2277 | 0 | for (size_t i = 0; i < row_block->row_count(); i++) { |
2278 | 0 | const QLRow& row = row_block->row(i); |
2279 | 0 | auto key = row.column(0).int32_value(); |
2280 | 0 | ASSERT_LT(key, seen_key.size()); |
2281 | 0 | ASSERT_FALSE(seen_key[key]); |
2282 | 0 | seen_key[key] = true; |
2283 | 0 | ASSERT_EQ(key * 2, row.column(1).int32_value()); |
2284 | 0 | ASSERT_EQ(StringPrintf("hello %d", key), row.column(2).string_value()); |
2285 | 0 | ASSERT_EQ(key * 3, row.column(3).int32_value()); |
2286 | 0 | } |
2287 | 0 | } |
2288 | 0 | } |
2289 | | |
2290 | 0 | TEST_F(ClientTest, Capability) { |
2291 | 0 | constexpr CapabilityId kFakeCapability = 0x9c40e9a7; |
2292 | |
|
2293 | 0 | auto rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), client_table_.table()).get()); |
2294 | 0 | ASSERT_TRUE(rt.get() != nullptr); |
2295 | 0 | auto tservers = rt->GetRemoteTabletServers(); |
2296 | 0 | ASSERT_EQ(tservers.size(), 3); |
2297 | 0 | for (const auto& replica : tservers) { |
2298 | | // Capability is related to executable, so it should be present since we run mini cluster for |
2299 | | // this test. |
2300 | 0 | ASSERT_TRUE(replica->HasCapability(CAPABILITY_ClientTest)); |
2301 | | |
2302 | | // Check that fake capability is not reported. |
2303 | 0 | ASSERT_FALSE(replica->HasCapability(kFakeCapability)); |
2304 | | |
2305 | | // This capability is defined on the TServer, passed to the Master on registration, |
2306 | | // then propagated to the YBClient. Ensure that this runtime pipeline holds. |
2307 | 0 | ASSERT_TRUE(replica->HasCapability(CAPABILITY_TabletReportLimit)); |
2308 | 0 | } |
2309 | 0 | } |
2310 | | |
2311 | 0 | TEST_F(ClientTest, TestCreateTableWithRangePartition) { |
2312 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
2313 | 0 | const std::string kPgsqlTableName = "pgsqlrangepartitionedtable"; |
2314 | 0 | const std::string kPgsqlTableId = "pgsqlrangepartitionedtableid"; |
2315 | 0 | const size_t kColIdx = 1; |
2316 | 0 | const int64_t kKeyValue = 48238; |
2317 | 0 | auto pgsql_table_name = YBTableName( |
2318 | 0 | YQL_DATABASE_PGSQL, kPgsqlKeyspaceID, kPgsqlKeyspaceName, kPgsqlTableName); |
2319 | |
|
2320 | 0 | auto yql_table_name = YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "yqlrangepartitionedtable"); |
2321 | |
|
2322 | 0 | YBSchemaBuilder schemaBuilder; |
2323 | 0 | schemaBuilder.AddColumn("key")->PrimaryKey()->Type(yb::STRING)->NotNull(); |
2324 | 0 | schemaBuilder.AddColumn("value")->Type(yb::INT64)->NotNull(); |
2325 | 0 | YBSchema schema; |
2326 | 0 | EXPECT_OK(client_->CreateNamespaceIfNotExists(kPgsqlKeyspaceName, |
2327 | 0 | YQLDatabase::YQL_DATABASE_PGSQL, |
2328 | 0 | "" /* creator_role_name */, |
2329 | 0 | kPgsqlKeyspaceID)); |
2330 | | // Create a PGSQL table using range partition. |
2331 | 0 | EXPECT_OK(schemaBuilder.Build(&schema)); |
2332 | 0 | Status s = table_creator->table_name(pgsql_table_name) |
2333 | 0 | .table_id(kPgsqlTableId) |
2334 | 0 | .schema(&schema) |
2335 | 0 | .set_range_partition_columns({"key"}) |
2336 | 0 | .table_type(YBTableType::PGSQL_TABLE_TYPE) |
2337 | 0 | .num_tablets(1) |
2338 | 0 | .Create(); |
2339 | 0 | EXPECT_OK(s); |
2340 | | |
2341 | | // Write to the PGSQL table. |
2342 | 0 | shared_ptr<YBTable> pgsq_table; |
2343 | 0 | EXPECT_OK(client_->OpenTable(kPgsqlTableId , &pgsq_table)); |
2344 | 0 | std::shared_ptr<YBPgsqlWriteOp> pgsql_write_op(client::YBPgsqlWriteOp::NewInsert(pgsq_table)); |
2345 | 0 | PgsqlWriteRequestPB* psql_write_request = pgsql_write_op->mutable_request(); |
2346 | |
|
2347 | 0 | psql_write_request->add_range_column_values()->mutable_value()->set_string_value("pgsql_key1"); |
2348 | 0 | PgsqlColumnValuePB* pgsql_column = psql_write_request->add_column_values(); |
2349 | | // 1 is the index for column value. |
2350 | |
|
2351 | 0 | pgsql_column->set_column_id(pgsq_table->schema().ColumnId(kColIdx)); |
2352 | 0 | pgsql_column->mutable_expr()->mutable_value()->set_int64_value(kKeyValue); |
2353 | 0 | std::shared_ptr<YBSession> session = CreateSession(client_.get()); |
2354 | 0 | session->Apply(pgsql_write_op); |
2355 | | |
2356 | | // Create a YQL table using range partition. |
2357 | 0 | s = table_creator->table_name(yql_table_name) |
2358 | 0 | .schema(&schema) |
2359 | 0 | .set_range_partition_columns({"key"}) |
2360 | 0 | .table_type(YBTableType::YQL_TABLE_TYPE) |
2361 | 0 | .num_tablets(1) |
2362 | 0 | .Create(); |
2363 | 0 | EXPECT_OK(s); |
2364 | | |
2365 | | // Write to the YQL table. |
2366 | 0 | client::TableHandle table; |
2367 | 0 | EXPECT_OK(table.Open(yql_table_name, client_.get())); |
2368 | 0 | std::shared_ptr<YBqlWriteOp> write_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
2369 | 0 | QLWriteRequestPB* const req = write_op->mutable_request(); |
2370 | 0 | req->add_range_column_values()->mutable_value()->set_string_value("key1"); |
2371 | 0 | QLColumnValuePB* column = req->add_column_values(); |
2372 | | // 1 is the index for column value. |
2373 | 0 | column->set_column_id(pgsq_table->schema().ColumnId(kColIdx)); |
2374 | 0 | column->mutable_expr()->mutable_value()->set_int64_value(kKeyValue); |
2375 | 0 | session->Apply(write_op); |
2376 | 0 | } |
2377 | | |
2378 | | // TODO(jason): enable the test in clang when we use clang version at least 9 (otherwise, there is a |
2379 | | // compilation error: P0428R2). |
2380 | | #if !defined(__clang__) |
2381 | | TEST_F(ClientTest, FlushTable) { |
2382 | | const tablet::Tablet* tablet; |
2383 | | constexpr int kTimeoutSecs = 30; |
2384 | | int current_row = 0; |
2385 | | |
2386 | | { |
2387 | | std::shared_ptr<TabletPeer> tablet_peer; |
2388 | | string tablet_id = GetFirstTabletId(client_table2_.get()); |
2389 | | for (auto& ts : cluster_->mini_tablet_servers()) { |
2390 | | ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablet_id, &tablet_peer)); |
2391 | | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
2392 | | break; |
2393 | | } |
2394 | | } |
2395 | | tablet = tablet_peer->tablet(); |
2396 | | } |
2397 | | |
2398 | | auto test_good_flush_and_compact = ([&]<class T>(T table_id_or_name) { |
2399 | | int initial_num_sst_files = tablet->GetCurrentVersionNumSSTFiles(); |
2400 | | |
2401 | | // Test flush table. |
2402 | | InsertTestRows(client_table2_, 1, current_row++); |
2403 | | ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files); |
2404 | | ASSERT_OK(client_->FlushTables( |
2405 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */)); |
2406 | | ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files + 1); |
2407 | | |
2408 | | // Insert and flush more rows. |
2409 | | InsertTestRows(client_table2_, 1, current_row++); |
2410 | | ASSERT_OK(client_->FlushTables( |
2411 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */)); |
2412 | | InsertTestRows(client_table2_, 1, current_row++); |
2413 | | ASSERT_OK(client_->FlushTables( |
2414 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */)); |
2415 | | |
2416 | | // Test compact table. |
2417 | | ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files + 3); |
2418 | | ASSERT_OK(client_->FlushTables( |
2419 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, true /* is_compaction */)); |
2420 | | ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), 1); |
2421 | | }); |
2422 | | |
2423 | | test_good_flush_and_compact(client_table2_.table()->id()); |
2424 | | test_good_flush_and_compact(client_table2_.table()->name()); |
2425 | | |
2426 | | auto test_bad_flush_and_compact = ([&]<class T>(T table_id_or_name) { |
2427 | | // Test flush table. |
2428 | | ASSERT_NOK(client_->FlushTables( |
2429 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */)); |
2430 | | // Test compact table. |
2431 | | ASSERT_NOK(client_->FlushTables( |
2432 | | {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, true /* is_compaction */)); |
2433 | | }); |
2434 | | |
2435 | | test_bad_flush_and_compact("bad table id"); |
2436 | | test_bad_flush_and_compact(YBTableName( |
2437 | | YQLDatabase::YQL_DATABASE_CQL, |
2438 | | "bad namespace name", |
2439 | | "bad table name")); |
2440 | | } |
2441 | | #endif // !defined(__clang__) |
2442 | | |
2443 | 0 | TEST_F(ClientTest, GetNamespaceInfo) { |
2444 | 0 | GetNamespaceInfoResponsePB resp; |
2445 | | |
2446 | | // Setup. |
2447 | 0 | ASSERT_OK(client_->CreateNamespace(kPgsqlKeyspaceName, |
2448 | 0 | YQLDatabase::YQL_DATABASE_PGSQL, |
2449 | 0 | "" /* creator_role_name */, |
2450 | 0 | kPgsqlKeyspaceID, |
2451 | 0 | "" /* source_namespace_id */, |
2452 | 0 | boost::none /* next_pg_oid */, |
2453 | 0 | nullptr /* txn */, |
2454 | 0 | true /* colocated */)); |
2455 | | |
2456 | | // CQL non-colocated. |
2457 | 0 | ASSERT_OK(client_->GetNamespaceInfo( |
2458 | 0 | "" /* namespace_id */, kKeyspaceName, YQL_DATABASE_CQL, &resp)); |
2459 | 0 | ASSERT_EQ(resp.namespace_().name(), kKeyspaceName); |
2460 | 0 | ASSERT_EQ(resp.namespace_().database_type(), YQL_DATABASE_CQL); |
2461 | 0 | ASSERT_FALSE(resp.colocated()); |
2462 | | |
2463 | | // SQL colocated. |
2464 | 0 | ASSERT_OK(client_->GetNamespaceInfo( |
2465 | 0 | kPgsqlKeyspaceID, "" /* namespace_name */, YQL_DATABASE_PGSQL, &resp)); |
2466 | 0 | ASSERT_EQ(resp.namespace_().id(), kPgsqlKeyspaceID); |
2467 | 0 | ASSERT_EQ(resp.namespace_().name(), kPgsqlKeyspaceName); |
2468 | 0 | ASSERT_EQ(resp.namespace_().database_type(), YQL_DATABASE_PGSQL); |
2469 | 0 | ASSERT_TRUE(resp.colocated()); |
2470 | 0 | } |
2471 | | |
2472 | 0 | TEST_F(ClientTest, BadMasterAddress) { |
2473 | 0 | auto messenger = ASSERT_RESULT(CreateMessenger("test-messenger")); |
2474 | 0 | auto host = "should.not.resolve"; |
2475 | | |
2476 | | // Put host entry in cache. |
2477 | 0 | ASSERT_NOK(messenger->resolver().Resolve(host)); |
2478 | |
|
2479 | 0 | { |
2480 | 0 | struct TestServerOptions : public server::ServerBaseOptions { |
2481 | 0 | TestServerOptions() : server::ServerBaseOptions(1) {} |
2482 | 0 | }; |
2483 | 0 | TestServerOptions opts; |
2484 | 0 | auto master_addr = std::make_shared<server::MasterAddresses>(); |
2485 | | // Put several hosts, so resolve would take place. |
2486 | 0 | master_addr->push_back({HostPort(host, 1)}); |
2487 | 0 | master_addr->push_back({HostPort(host, 2)}); |
2488 | 0 | opts.SetMasterAddresses(master_addr); |
2489 | |
|
2490 | 0 | AsyncClientInitialiser async_init( |
2491 | 0 | "test-client", /* num_reactors= */ 1, /* timeout_seconds= */ 1, "UUID", &opts, |
2492 | 0 | /* metric_entity= */ nullptr, /* parent_mem_tracker= */ nullptr, messenger.get()); |
2493 | 0 | async_init.Start(); |
2494 | 0 | async_init.get_client_future().wait_for(1s); |
2495 | 0 | } |
2496 | |
|
2497 | 0 | messenger->Shutdown(); |
2498 | 0 | } |
2499 | | |
2500 | 0 | TEST_F(ClientTest, RefreshPartitions) { |
2501 | 0 | const auto kLookupTimeout = 10s; |
2502 | 0 | const auto kNumLookupThreads = 2; |
2503 | 0 | const auto kNumLookups = 100; |
2504 | |
|
2505 | 0 | std::atomic<bool> stop_requested{false}; |
2506 | 0 | std::atomic<size_t> num_lookups_called{0}; |
2507 | 0 | std::atomic<size_t> num_lookups_done{0}; |
2508 | |
|
2509 | 0 | const auto callback = [&num_lookups_done]( |
2510 | 0 | size_t lookup_idx, const Result<internal::RemoteTabletPtr>& tablet) { |
2511 | 0 | const auto prefix = Format("Lookup $0 got ", lookup_idx); |
2512 | 0 | if (tablet.ok()) { |
2513 | 0 | LOG(INFO) << prefix << "tablet: " << (*tablet)->tablet_id(); |
2514 | 0 | } else { |
2515 | 0 | LOG(INFO) << prefix << "error: " << AsString(tablet.status()); |
2516 | 0 | } |
2517 | 0 | num_lookups_done.fetch_add(1); |
2518 | 0 | }; |
2519 | |
|
2520 | 0 | const auto lookup_func = [&]() { |
2521 | 0 | while(!stop_requested) { |
2522 | 0 | const auto hash_code = RandomUniformInt<uint16_t>(0, PartitionSchema::kMaxPartitionKey); |
2523 | 0 | const auto partition_key = PartitionSchema::EncodeMultiColumnHashValue(hash_code); |
2524 | 0 | client_->LookupTabletByKey( |
2525 | 0 | client_table_.table(), partition_key, CoarseMonoClock::now() + kLookupTimeout, |
2526 | 0 | std::bind(callback, num_lookups_called.fetch_add(1), std::placeholders::_1)); |
2527 | 0 | } |
2528 | 0 | }; |
2529 | |
|
2530 | 0 | const auto marker_func = [&]() { |
2531 | 0 | while(!stop_requested) { |
2532 | 0 | const auto table = client_table_.table(); |
2533 | 0 | table->MarkPartitionsAsStale(); |
2534 | |
|
2535 | 0 | Synchronizer synchronizer; |
2536 | 0 | table->RefreshPartitions(client_table_.client(), synchronizer.AsStdStatusCallback()); |
2537 | 0 | const auto status = synchronizer.Wait(); |
2538 | 0 | if (!status.ok()) { |
2539 | 0 | LOG(INFO) << status; |
2540 | 0 | } |
2541 | 0 | } |
2542 | 0 | }; |
2543 | |
|
2544 | 0 | LOG(INFO) << "Starting threads"; |
2545 | |
|
2546 | 0 | std::vector<std::thread> threads; |
2547 | 0 | for (int i = 0; i < kNumLookupThreads; ++i) { |
2548 | 0 | threads.push_back(std::thread(lookup_func)); |
2549 | 0 | } |
2550 | 0 | threads.push_back(std::thread(marker_func)); |
2551 | |
|
2552 | 0 | ASSERT_OK(LoggedWaitFor([&num_lookups_called]{ |
2553 | 0 | return num_lookups_called >= kNumLookups; |
2554 | 0 | }, 120s, "Tablet lookup calls")); |
2555 | |
|
2556 | 0 | LOG(INFO) << "Stopping threads"; |
2557 | 0 | stop_requested = true; |
2558 | 0 | for (auto& thread : threads) { |
2559 | 0 | thread.join(); |
2560 | 0 | } |
2561 | 0 | LOG(INFO) << "Stopped threads"; |
2562 | |
|
2563 | 0 | ASSERT_OK(LoggedWaitFor([&num_lookups_done, num_lookups = num_lookups_called.load()] { |
2564 | 0 | return num_lookups_done >= num_lookups; |
2565 | 0 | }, kLookupTimeout, "Tablet lookup responses")); |
2566 | |
|
2567 | 0 | LOG(INFO) << "num_lookups_done: " << num_lookups_done; |
2568 | 0 | } |
2569 | | |
2570 | | // There should be only one lookup RPC asking for colocated tables tablet locations. |
2571 | | // When we ask for tablet lookup for other tables colocated with the first one we asked, MetaCache |
2572 | | // should be able to respond without sending RPCs to master again. |
2573 | 0 | TEST_F(ClientTest, ColocatedTablesLookupTablet) { |
2574 | 0 | const auto kTabletLookupTimeout = 10s; |
2575 | 0 | const auto kNumTables = 10; |
2576 | |
|
2577 | 0 | YBTableName common_table_name( |
2578 | 0 | YQLDatabase::YQL_DATABASE_PGSQL, kPgsqlKeyspaceID, kPgsqlKeyspaceName, "table_name"); |
2579 | 0 | ASSERT_OK(client_->CreateNamespace( |
2580 | 0 | common_table_name.namespace_name(), |
2581 | 0 | common_table_name.namespace_type(), |
2582 | 0 | /* creator_role_name =*/ "", |
2583 | 0 | common_table_name.namespace_id(), |
2584 | 0 | /* source_namespace_id =*/ "", |
2585 | 0 | /* next_pg_oid =*/ boost::none, |
2586 | 0 | /* txn =*/ nullptr, |
2587 | 0 | /* colocated =*/ true)); |
2588 | |
|
2589 | 0 | YBSchemaBuilder schemaBuilder; |
2590 | 0 | schemaBuilder.AddColumn("key")->PrimaryKey()->Type(yb::INT64); |
2591 | 0 | schemaBuilder.AddColumn("value")->Type(yb::INT64); |
2592 | 0 | YBSchema schema; |
2593 | 0 | ASSERT_OK(schemaBuilder.Build(&schema)); |
2594 | |
|
2595 | 0 | std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator()); |
2596 | 0 | std::vector<YBTableName> table_names; |
2597 | 0 | for (auto i = 0; i < kNumTables; ++i) { |
2598 | 0 | const auto name = Format("table_$0", i); |
2599 | 0 | auto table_name = common_table_name; |
2600 | | // Autogenerated ids will fail the IsPgsqlId() CHECKs so we need to generate oids. |
2601 | 0 | table_name.set_table_id(GetPgsqlTableId(i, i)); |
2602 | 0 | table_name.set_table_name(name); |
2603 | 0 | ASSERT_OK(table_creator->table_name(table_name) |
2604 | 0 | .table_id(table_name.table_id()) |
2605 | 0 | .schema(&schema) |
2606 | 0 | .set_range_partition_columns({"key"}) |
2607 | 0 | .table_type(YBTableType::PGSQL_TABLE_TYPE) |
2608 | 0 | .num_tablets(1) |
2609 | 0 | .Create()); |
2610 | 0 | table_names.push_back(table_name); |
2611 | 0 | } |
2612 | |
|
2613 | 0 | const auto lookup_serial_start = client::internal::TEST_GetLookupSerial(); |
2614 | |
|
2615 | 0 | TabletId colocated_tablet_id; |
2616 | 0 | for (const auto& table_name : table_names) { |
2617 | 0 | auto table = ASSERT_RESULT(client_->OpenTable(table_name)); |
2618 | 0 | auto tablet = ASSERT_RESULT(client_->LookupTabletByKeyFuture( |
2619 | 0 | table, /* partition_key =*/ "", |
2620 | 0 | CoarseMonoClock::now() + kTabletLookupTimeout).get()); |
2621 | 0 | const auto tablet_id = tablet->tablet_id(); |
2622 | 0 | if (colocated_tablet_id.empty()) { |
2623 | 0 | colocated_tablet_id = tablet_id; |
2624 | 0 | } else { |
2625 | 0 | ASSERT_EQ(tablet_id, colocated_tablet_id); |
2626 | 0 | } |
2627 | 0 | } |
2628 | |
|
2629 | 0 | const auto lookup_serial_stop = client::internal::TEST_GetLookupSerial(); |
2630 | 0 | ASSERT_EQ(lookup_serial_stop, lookup_serial_start + 1); |
2631 | 0 | } |
2632 | | |
2633 | | class ClientTestWithHashAndRangePk : public ClientTest { |
2634 | | public: |
2635 | 1 | void SetUp() override { |
2636 | 1 | YBSchemaBuilder b; |
2637 | 1 | b.AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull(); |
2638 | 1 | b.AddColumn("r")->Type(INT32)->PrimaryKey()->NotNull(); |
2639 | 1 | b.AddColumn("v")->Type(INT32); |
2640 | | |
2641 | 1 | CHECK_OK(b.Build(&schema_)); |
2642 | | |
2643 | 1 | ClientTest::SetUp(); |
2644 | 1 | } |
2645 | | |
2646 | 0 | shared_ptr<YBqlWriteOp> BuildTestRow(const TableHandle& table, int h, int r, int v) { |
2647 | 0 | auto insert = table.NewInsertOp(); |
2648 | 0 | auto req = insert->mutable_request(); |
2649 | 0 | QLAddInt32HashValue(req, h); |
2650 | 0 | const auto& columns = table.schema().columns(); |
2651 | 0 | table.AddInt32ColumnValue(req, columns[1].name(), r); |
2652 | 0 | table.AddInt32ColumnValue(req, columns[2].name(), v); |
2653 | 0 | return insert; |
2654 | 0 | } |
2655 | | }; |
2656 | | |
2657 | | // We concurrently execute batches of insert operations, each batch targeting the same hash |
2658 | | // partition key. Concurrently we emulate table partition list version increase and meta cache |
2659 | | // invalidation. |
2660 | | // This tests https://github.com/yugabyte/yugabyte-db/issues/9806 with a scenario |
2661 | | // when the batcher is emptied due to part of tablet lookups failing, but callback was not called. |
2662 | 0 | TEST_F_EX(ClientTest, EmptiedBatcherFlush, ClientTestWithHashAndRangePk) { |
2663 | 0 | constexpr auto kNumRowsPerBatch = 100; |
2664 | 0 | constexpr auto kWriters = 4; |
2665 | 0 | const auto kTotalNumBatches = 50; |
2666 | 0 | const auto kFlushTimeout = 10s * kTimeMultiplier; |
2667 | |
|
2668 | 0 | TestThreadHolder thread_holder; |
2669 | 0 | std::atomic<int> next_batch_hash_key{10000}; |
2670 | 0 | const auto stop_at_batch_hash_key = next_batch_hash_key.load() + kTotalNumBatches; |
2671 | |
|
2672 | 0 | for (int i = 0; i != kWriters; ++i) { |
2673 | 0 | thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag(), &next_batch_hash_key, |
2674 | 0 | num_rows_per_batch = kNumRowsPerBatch, stop_at_batch_hash_key, |
2675 | 0 | kFlushTimeout] { |
2676 | 0 | SetFlagOnExit set_flag_on_exit(&stop); |
2677 | |
|
2678 | 0 | while (!stop.load(std::memory_order_acquire)) { |
2679 | 0 | auto batch_hash_key = next_batch_hash_key.fetch_add(1); |
2680 | 0 | if (batch_hash_key >= stop_at_batch_hash_key) { |
2681 | 0 | break; |
2682 | 0 | } |
2683 | 0 | auto session = CreateSession(client_.get()); |
2684 | 0 | for (int r = 0; r < num_rows_per_batch; r++) { |
2685 | 0 | session->Apply(BuildTestRow(client_table_, batch_hash_key, r, 0)); |
2686 | 0 | } |
2687 | 0 | auto flush_future = session->FlushFuture(); |
2688 | 0 | ASSERT_EQ(flush_future.wait_for(kFlushTimeout), std::future_status::ready) |
2689 | 0 | << "batch_hash_key: " << batch_hash_key; |
2690 | 0 | const auto& flush_status = flush_future.get(); |
2691 | 0 | if (!flush_status.status.ok() || !flush_status.errors.empty()) { |
2692 | 0 | LogSessionErrorsAndDie(flush_status); |
2693 | 0 | } |
2694 | 0 | } |
2695 | 0 | }); |
2696 | 0 | } |
2697 | |
|
2698 | 0 | thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] { |
2699 | 0 | SetFlagOnExit set_flag_on_exit(&stop); |
2700 | |
|
2701 | 0 | const auto table = client_table_.table(); |
2702 | |
|
2703 | 0 | while (!stop.load(std::memory_order_acquire)) { |
2704 | 0 | ASSERT_OK(cluster_->mini_master()->catalog_manager() |
2705 | 0 | .TEST_IncrementTablePartitionListVersion(table->id())); |
2706 | 0 | table->MarkPartitionsAsStale(); |
2707 | 0 | SleepFor(10ms); |
2708 | 0 | } |
2709 | 0 | }); |
2710 | |
|
2711 | 0 | thread_holder.JoinAll(); |
2712 | 0 | } |
2713 | | |
2714 | | class ClientTestWithThreeMasters : public ClientTest { |
2715 | | protected: |
2716 | 1 | int NumMasters() override { |
2717 | 1 | return 3; |
2718 | 1 | } |
2719 | | |
2720 | 0 | Status InitClient() override { |
2721 | | // Connect to the cluster using hostnames. |
2722 | 0 | string master_addrs; |
2723 | 0 | for (int i = 1; i <= NumMasters(); ++i) { |
2724 | | // TEST_RpcAddress is 1-indexed, but mini_master is 0-indexed. |
2725 | 0 | master_addrs += server::TEST_RpcAddress(i, server::Private::kFalse) + |
2726 | 0 | ":" + yb::ToString(cluster_->mini_master(i - 1)->bound_rpc_addr().port()); |
2727 | 0 | if (i < NumMasters()) { |
2728 | 0 | master_addrs += ","; |
2729 | 0 | } |
2730 | 0 | } |
2731 | |
|
2732 | 0 | client_ = VERIFY_RESULT(YBClientBuilder() |
2733 | 0 | .add_master_server_addr(master_addrs) |
2734 | 0 | .Build()); |
2735 | |
|
2736 | 0 | return Status::OK(); |
2737 | 0 | } |
2738 | | }; |
2739 | | |
2740 | 0 | TEST_F_EX(ClientTest, IsMultiMasterWithFailingHostnameResolution, ClientTestWithThreeMasters) { |
2741 | 0 | google::FlagSaver flag_saver; |
2742 | | // TEST_RpcAddress is 1-indexed. |
2743 | 0 | string hostname = server::TEST_RpcAddress(cluster_->LeaderMasterIdx() + 1, |
2744 | 0 | server::Private::kFalse); |
2745 | | |
2746 | | // Shutdown the master leader, and wait for new leader to get elected. |
2747 | 0 | ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Shutdown(); |
2748 | 0 | ASSERT_RESULT(cluster_->GetLeaderMiniMaster()); |
2749 | | |
2750 | | // Fail resolution of the old leader master's hostname. |
2751 | 0 | FLAGS_TEST_fail_to_fast_resolve_address = hostname; |
2752 | 0 | LOG(INFO) << "Setting FLAGS_TEST_fail_to_fast_resolve_address to: " |
2753 | 0 | << FLAGS_TEST_fail_to_fast_resolve_address; |
2754 | | |
2755 | | // Make a client request to the leader master, since that master is no longer the leader, we will |
2756 | | // check that we have a MultiMaster setup. That check should not fail even though one of the |
2757 | | // master addresses currently doesn't resolve. Thus, we should be able to find the new master |
2758 | | // leader and complete the request. |
2759 | 0 | ASSERT_RESULT(client_->ListTables()); |
2760 | 0 | } |
2761 | | |
2762 | | } // namespace client |
2763 | | } // namespace yb |