YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/client-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
34
#include <functional>
35
#include <regex>
36
#include <set>
37
#include <thread>
38
#include <vector>
39
40
#include <gflags/gflags.h>
41
#include <gtest/gtest.h>
42
43
#include "yb/client/async_initializer.h"
44
#include "yb/client/client-internal.h"
45
#include "yb/client/client-test-util.h"
46
#include "yb/client/client.h"
47
#include "yb/client/client_utils.h"
48
#include "yb/client/error.h"
49
#include "yb/client/meta_cache.h"
50
#include "yb/client/schema.h"
51
#include "yb/client/session.h"
52
#include "yb/client/table.h"
53
#include "yb/client/table_alterer.h"
54
#include "yb/client/table_creator.h"
55
#include "yb/client/table_handle.h"
56
#include "yb/client/table_info.h"
57
#include "yb/client/tablet_server.h"
58
#include "yb/client/value.h"
59
#include "yb/client/yb_op.h"
60
61
#include "yb/common/partial_row.h"
62
#include "yb/common/ql_type.h"
63
#include "yb/common/ql_value.h"
64
#include "yb/common/schema.h"
65
#include "yb/common/wire_protocol.h"
66
67
#include "yb/consensus/consensus.proxy.h"
68
69
#include "yb/gutil/algorithm.h"
70
#include "yb/gutil/atomicops.h"
71
#include "yb/gutil/stl_util.h"
72
#include "yb/gutil/strings/substitute.h"
73
74
#include "yb/integration-tests/mini_cluster.h"
75
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
76
77
#include "yb/master/catalog_manager_if.h"
78
#include "yb/master/master.h"
79
#include "yb/master/master_client.pb.h"
80
#include "yb/master/master_ddl.pb.h"
81
#include "yb/master/master_error.h"
82
#include "yb/master/mini_master.h"
83
84
#include "yb/rpc/messenger.h"
85
#include "yb/rpc/proxy.h"
86
#include "yb/rpc/rpc_controller.h"
87
#include "yb/rpc/rpc_test_util.h"
88
89
#include "yb/tablet/tablet.h"
90
#include "yb/tablet/tablet_metadata.h"
91
#include "yb/tablet/tablet_peer.h"
92
93
#include "yb/tserver/mini_tablet_server.h"
94
#include "yb/tserver/tablet_server.h"
95
#include "yb/tserver/ts_tablet_manager.h"
96
#include "yb/tserver/tserver_service.proxy.h"
97
98
#include "yb/util/capabilities.h"
99
#include "yb/util/metrics.h"
100
#include "yb/util/net/dns_resolver.h"
101
#include "yb/util/net/sockaddr.h"
102
#include "yb/util/random_util.h"
103
#include "yb/util/status.h"
104
#include "yb/util/status_log.h"
105
#include "yb/util/stopwatch.h"
106
#include "yb/util/test_thread_holder.h"
107
#include "yb/util/test_util.h"
108
#include "yb/util/thread.h"
109
#include "yb/util/tostring.h"
110
#include "yb/util/tsan_util.h"
111
112
#include "yb/yql/cql/ql/util/statement_result.h"
113
114
DECLARE_bool(enable_data_block_fsync);
115
DECLARE_bool(log_inject_latency);
116
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
117
DECLARE_int32(heartbeat_interval_ms);
118
DECLARE_int32(log_inject_latency_ms_mean);
119
DECLARE_int32(log_inject_latency_ms_stddev);
120
DECLARE_int32(master_inject_latency_on_tablet_lookups_ms);
121
DECLARE_int32(max_create_tablets_per_ts);
122
DECLARE_int32(TEST_scanner_inject_latency_on_each_batch_ms);
123
DECLARE_int32(scanner_max_batch_size_bytes);
124
DECLARE_int32(scanner_ttl_ms);
125
DECLARE_int32(tablet_server_svc_queue_length);
126
DECLARE_int32(replication_factor);
127
128
DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
129
DECLARE_int32(min_backoff_ms_exponent);
130
DECLARE_int32(max_backoff_ms_exponent);
131
DECLARE_bool(TEST_force_master_lookup_all_tablets);
132
DECLARE_double(TEST_simulate_lookup_timeout_probability);
133
DECLARE_string(TEST_fail_to_fast_resolve_address);
134
135
METRIC_DECLARE_counter(rpcs_queue_overflow);
136
137
DEFINE_CAPABILITY(ClientTest, 0x1523c5ae);
138
DECLARE_CAPABILITY(TabletReportLimit);
139
140
using namespace std::literals; // NOLINT
141
using namespace std::placeholders;
142
143
namespace yb {
144
namespace client {
145
146
using std::string;
147
using std::set;
148
using std::vector;
149
150
using base::subtle::Atomic32;
151
using base::subtle::NoBarrier_AtomicIncrement;
152
using base::subtle::NoBarrier_Load;
153
using base::subtle::NoBarrier_Store;
154
using master::CatalogManager;
155
using master::GetNamespaceInfoResponsePB;
156
using master::GetTableLocationsRequestPB;
157
using master::GetTableLocationsResponsePB;
158
using master::TabletLocationsPB;
159
using master::TSInfoPB;
160
using std::shared_ptr;
161
using tablet::TabletPeer;
162
using tserver::MiniTabletServer;
163
164
namespace {
165
166
constexpr int32_t kNoBound = kint32max;
167
constexpr int kNumTablets = 2;
168
169
const std::string kKeyspaceName = "my_keyspace";
170
const std::string kPgsqlKeyspaceID = "1234";
171
const std::string kPgsqlKeyspaceName = "psql" + kKeyspaceName;
172
173
} // namespace
174
175
class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
176
 public:
177
61
  ClientTest() {
178
61
    YBSchemaBuilder b;
179
61
    b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey();
180
61
    b.AddColumn("int_val")->Type(INT32)->NotNull();
181
61
    b.AddColumn("string_val")->Type(STRING)->Nullable();
182
61
    b.AddColumn("non_null_with_default")->Type(INT32)->NotNull();
183
61
    CHECK_OK(b.Build(&schema_));
184
185
61
    FLAGS_enable_data_block_fsync = false; // Keep unit tests fast.
186
61
  }
187
188
61
  void SetUp() override {
189
61
    YBMiniClusterTestBase::SetUp();
190
191
    // Reduce the TS<->Master heartbeat interval
192
61
    FLAGS_heartbeat_interval_ms = 10;
193
194
    // Start minicluster and wait for tablet servers to connect to master.
195
61
    auto opts = MiniClusterOptions();
196
61
    opts.num_tablet_servers = 3;
197
61
    opts.num_masters = NumMasters();
198
61
    cluster_.reset(new MiniCluster(opts));
199
61
    ASSERT_OK(cluster_->Start());
200
201
    // Connect to the cluster.
202
0
    ASSERT_OK(InitClient());
203
204
    // Create a keyspace;
205
0
    ASSERT_OK(client_->CreateNamespace(kKeyspaceName));
206
207
0
    ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_));
208
0
    ASSERT_NO_FATALS(CreateTable(kTable2Name, 1, &client_table2_));
209
0
  }
210
211
5
  void DoTearDown() override {
212
5
    client_.reset();
213
5
    if (cluster_) {
214
5
      cluster_->Shutdown();
215
5
      cluster_.reset();
216
5
    }
217
5
    YBMiniClusterTestBase::DoTearDown();
218
5
  }
219
220
 protected:
221
  static const YBTableName kTableName;
222
  static const YBTableName kTable2Name;
223
  static const YBTableName kTable3Name;
224
225
60
  virtual int NumMasters() {
226
60
    return 1;
227
60
  }
228
229
0
  virtual Status InitClient() {
230
0
    client_ = VERIFY_RESULT(YBClientBuilder()
231
0
        .add_master_server_addr(yb::ToString(cluster_->mini_master()->bound_rpc_addr()))
232
0
        .Build());
233
0
    return Status::OK();
234
0
  }
235
236
0
  string GetFirstTabletId(YBTable* table) {
237
0
    GetTableLocationsRequestPB req;
238
0
    GetTableLocationsResponsePB resp;
239
0
    table->name().SetIntoTableIdentifierPB(req.mutable_table());
240
0
    CHECK_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
241
0
    CHECK_GT(resp.tablet_locations_size(), 0);
242
0
    return resp.tablet_locations(0).tablet_id();
243
0
  }
244
245
0
  void CheckNoRpcOverflow() {
246
0
    for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
247
0
      MiniTabletServer* server = cluster_->mini_tablet_server(i);
248
0
      if (server->is_started()) {
249
0
        ASSERT_EQ(0, server->server()->rpc_server()->
250
0
            TEST_service_pool("yb.tserver.TabletServerService")->
251
0
            RpcsQueueOverflowMetric()->value());
252
0
      }
253
0
    }
254
0
  }
255
256
0
  YBSessionPtr CreateSession(YBClient* client = nullptr) {
257
0
    if (client == nullptr) {
258
0
      client = client_.get();
259
0
    }
260
0
    std::shared_ptr<YBSession> session = client->NewSession();
261
0
    session->SetTimeout(10s);
262
0
    return session;
263
0
  }
264
265
  // Inserts 'num_rows' test rows using 'client'
266
0
  void InsertTestRows(YBClient* client, const TableHandle& table, int num_rows, int first_row = 0) {
267
0
    auto session = CreateSession(client);
268
0
    for (int i = first_row; i < num_rows + first_row; i++) {
269
0
      session->Apply(BuildTestRow(table, i));
270
0
    }
271
0
    FlushSessionOrDie(session);
272
0
    ASSERT_NO_FATALS(CheckNoRpcOverflow());
273
0
  }
274
275
  // Inserts 'num_rows' using the default client.
276
0
  void InsertTestRows(const TableHandle& table, int num_rows, int first_row = 0) {
277
0
    InsertTestRows(client_.get(), table, num_rows, first_row);
278
0
  }
279
280
0
  void UpdateTestRows(const TableHandle& table, int lo, int hi) {
281
0
    auto session = CreateSession();
282
0
    for (int i = lo; i < hi; i++) {
283
0
      session->Apply(UpdateTestRow(table, i));
284
0
    }
285
0
    FlushSessionOrDie(session);
286
0
    ASSERT_NO_FATALS(CheckNoRpcOverflow());
287
0
  }
288
289
0
  void DeleteTestRows(const TableHandle& table, int lo, int hi) {
290
0
    auto session = CreateSession();
291
0
    for (int i = lo; i < hi; i++) {
292
0
      session->Apply(DeleteTestRow(table, i));
293
0
    }
294
0
    FlushSessionOrDie(session);
295
0
    ASSERT_NO_FATALS(CheckNoRpcOverflow());
296
0
  }
297
298
0
  shared_ptr<YBqlWriteOp> BuildTestRow(const TableHandle& table, int index) {
299
0
    auto insert = table.NewInsertOp();
300
0
    auto req = insert->mutable_request();
301
0
    QLAddInt32HashValue(req, index);
302
0
    const auto& columns = table.schema().columns();
303
0
    table.AddInt32ColumnValue(req, columns[1].name(), index * 2);
304
0
    table.AddStringColumnValue(req, columns[2].name(), StringPrintf("hello %d", index));
305
0
    table.AddInt32ColumnValue(req, columns[3].name(), index * 3);
306
0
    return insert;
307
0
  }
308
309
0
  shared_ptr<YBqlWriteOp> UpdateTestRow(const TableHandle& table, int index) {
310
0
    auto update = table.NewUpdateOp();
311
0
    auto req = update->mutable_request();
312
0
    QLAddInt32HashValue(req, index);
313
0
    const auto& columns = table.schema().columns();
314
0
    table.AddInt32ColumnValue(req, columns[1].name(), index * 2 + 1);
315
0
    table.AddStringColumnValue(req, columns[2].name(), StringPrintf("hello again %d", index));
316
0
    return update;
317
0
  }
318
319
0
  shared_ptr<YBqlWriteOp> DeleteTestRow(const TableHandle& table, int index) {
320
0
    auto del = table.NewDeleteOp();
321
0
    QLAddInt32HashValue(del->mutable_request(), index);
322
0
    return del;
323
0
  }
324
325
0
  void DoTestScanWithoutPredicates() {
326
0
    client::TableIteratorOptions options;
327
0
    options.columns = std::vector<std::string>{"key"};
328
0
    LOG_TIMING(INFO, "Scanning with no predicates") {
329
0
      uint64_t sum = 0;
330
0
      for (const auto& row : client::TableRange(client_table_, options)) {
331
0
        sum += row.column(0).int32_value();
332
0
      }
333
      // The sum should be the sum of the arithmetic series from
334
      // 0..FLAGS_test_scan_num_rows-1
335
0
      uint64_t expected = FLAGS_test_scan_num_rows *
336
0
          (0 + (FLAGS_test_scan_num_rows - 1)) / 2;
337
0
      ASSERT_EQ(expected, sum);
338
0
    }
339
0
  }
340
341
0
  void DoTestScanWithStringPredicate() {
342
0
    TableIteratorOptions options;
343
0
    options.filter = FilterBetween("hello 2"s, Inclusive::kFalse,
344
0
                                   "hello 3"s, Inclusive::kFalse,
345
0
                                   "string_val");
346
347
0
    bool found = false;
348
0
    LOG_TIMING(INFO, "Scanning with string predicate") {
349
0
      for (const auto& row : TableRange(client_table_, options)) {
350
0
        found = true;
351
0
        Slice slice(row.column(2).string_value());
352
0
        if (!slice.starts_with("hello 2") && !slice.starts_with("hello 3")) {
353
0
          FAIL() << row.ToString();
354
0
        }
355
0
      }
356
0
    }
357
0
    ASSERT_TRUE(found);
358
0
  }
359
360
0
  void DoTestScanWithKeyPredicate() {
361
0
    auto op = client_table_.NewReadOp();
362
0
    auto req = op->mutable_request();
363
364
0
    auto* const condition = req->mutable_where_expr()->mutable_condition();
365
0
    condition->set_op(QL_OP_AND);
366
0
    client_table_.AddInt32Condition(condition, "key", QL_OP_GREATER_THAN_EQUAL, 5);
367
0
    client_table_.AddInt32Condition(condition, "key", QL_OP_LESS_THAN_EQUAL, 10);
368
0
    client_table_.AddColumns({"key"}, req);
369
0
    auto session = client_->NewSession();
370
0
    session->SetTimeout(60s);
371
0
    ASSERT_OK(session->ApplyAndFlush(op));
372
0
    ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
373
0
    auto rowblock = ql::RowsResult(op.get()).GetRowBlock();
374
0
    for (const auto& row : rowblock->rows()) {
375
0
      int32_t key = row.column(0).int32_value();
376
0
      ASSERT_GE(key, 5);
377
0
      ASSERT_LE(key, 10);
378
0
    }
379
0
  }
380
381
  // Creates a table with RF=FLAGS_replication_factor, split into tablets based on 'split_rows'
382
  // (or single tablet if 'split_rows' is empty).
383
  void CreateTable(const YBTableName& table_name_orig,
384
                   int num_tablets,
385
0
                   TableHandle* table) {
386
0
    size_t num_replicas = FLAGS_replication_factor;
387
    // The implementation allows table name without a keyspace.
388
0
    YBTableName table_name(table_name_orig.namespace_type(), table_name_orig.has_namespace() ?
389
0
        table_name_orig.namespace_name() : kKeyspaceName, table_name_orig.table_name());
390
391
0
    bool added_replicas = false;
392
    // Add more tablet servers to satisfy all replicas, if necessary.
393
0
    while (cluster_->num_tablet_servers() < num_replicas) {
394
0
      ASSERT_OK(cluster_->AddTabletServer());
395
0
      added_replicas = true;
396
0
    }
397
398
0
    if (added_replicas) {
399
0
      ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas));
400
0
    }
401
402
0
    ASSERT_OK(table->Create(table_name, num_tablets, schema_, client_.get()));
403
0
  }
404
405
  // Kills a tablet server.
406
  // Boolean flags control whether to restart the tserver, and if so, whether to wait for it to
407
  // finish bootstrapping.
408
0
  Status KillTServerImpl(const string& uuid, const bool restart, const bool wait_started) {
409
0
    bool ts_found = false;
410
0
    for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
411
0
      MiniTabletServer* ts = cluster_->mini_tablet_server(i);
412
0
      if (ts->server()->instance_pb().permanent_uuid() == uuid) {
413
0
        if (restart) {
414
0
          LOG(INFO) << "Restarting TS at " << ts->bound_rpc_addr();
415
0
              RETURN_NOT_OK(ts->Restart());
416
0
          if (wait_started) {
417
0
            LOG(INFO) << "Waiting for TS " << ts->bound_rpc_addr() << " to finish bootstrapping";
418
0
                RETURN_NOT_OK(ts->WaitStarted());
419
0
          }
420
0
        } else {
421
0
          LOG(INFO) << "Killing TS " << uuid << " at " << ts->bound_rpc_addr();
422
0
          ts->Shutdown();
423
0
        }
424
0
        ts_found = true;
425
0
        break;
426
0
      }
427
0
    }
428
0
    if (!ts_found) {
429
0
      return STATUS(InvalidArgument, strings::Substitute("Could not find tablet server $1", uuid));
430
0
    }
431
432
0
    return Status::OK();
433
0
  }
434
435
0
  Status RestartTServerAndWait(const string& uuid) {
436
0
    return KillTServerImpl(uuid, true, true);
437
0
  }
438
439
0
  Status RestartTServerAsync(const string& uuid) {
440
0
    return KillTServerImpl(uuid, true, false);
441
0
  }
442
443
0
  Status KillTServer(const string& uuid) {
444
0
    return KillTServerImpl(uuid, false, false);
445
0
  }
446
447
  void DoApplyWithoutFlushTest(int sleep_micros);
448
449
0
  Result<std::unique_ptr<rpc::Messenger>> CreateMessenger(const std::string& name) {
450
0
    return rpc::MessengerBuilder(name).Build();
451
0
  }
452
453
  void VerifyKeyRangeFiltering(const std::vector<string>& sorted_partitions,
454
                               const std::vector<internal::RemoteTabletPtr>& tablets,
455
0
                               const string& start_key, const string& end_key) {
456
0
    auto start_idx = FindPartitionStartIndex(sorted_partitions, start_key);
457
0
    auto end_idx = FindPartitionStartIndexExclusiveBound(sorted_partitions, end_key);
458
0
    auto filtered_tablets =
459
0
        ASSERT_RESULT(FilterTabletsByHashPartitionKeyRange(tablets, start_key, end_key));
460
0
    std::vector<string> filtered_partitions;
461
0
    std::transform(filtered_tablets.begin(), filtered_tablets.end(),
462
0
                   std::back_inserter(filtered_partitions),
463
0
                   [](const auto& tablet) { return tablet->partition().partition_key_start(); });
464
0
    std::sort(filtered_partitions.begin(), filtered_partitions.end());
465
466
0
    ASSERT_EQ(filtered_partitions,
467
0
              std::vector<string>(&sorted_partitions[start_idx], &sorted_partitions[end_idx + 1]));
468
0
  }
469
470
  size_t FindPartitionStartIndexExclusiveBound(
471
    const std::vector<std::string>& partitions,
472
0
    const std::string& partition_key) {
473
0
    if (partition_key.empty()) {
474
0
      return partitions.size() - 1;
475
0
    }
476
477
0
    auto it = std::lower_bound(partitions.begin(), partitions.end(), partition_key);
478
0
    if (it == partitions.end() || *it >= partition_key) {
479
0
      if (it == partitions.begin()) {
480
0
        return 0;
481
0
      }
482
0
      --it;
483
0
    }
484
0
    return it - partitions.begin();
485
0
  }
486
487
  enum WhichServerToKill {
488
    DEAD_MASTER,
489
    DEAD_TSERVER
490
  };
491
  void DoTestWriteWithDeadServer(WhichServerToKill which);
492
493
  YBSchema schema_;
494
495
  std::unique_ptr<MiniCluster> cluster_;
496
  std::unique_ptr<YBClient> client_;
497
  TableHandle client_table_;
498
  TableHandle client_table2_;
499
  TableHandle client_table3_;
500
};
501
502
503
const YBTableName ClientTest::kTableName(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb");
504
const YBTableName ClientTest::kTable2Name(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb2");
505
const YBTableName ClientTest::kTable3Name(YQL_DATABASE_CQL, kKeyspaceName, "client-testtb3");
506
507
namespace {
508
509
0
TableFilter MakeFilter(int32_t lower_bound, int32_t upper_bound, std::string column = "key") {
510
0
  if (lower_bound != kNoBound) {
511
0
    if (upper_bound != kNoBound) {
512
0
      return FilterBetween(lower_bound, Inclusive::kTrue, upper_bound, Inclusive::kTrue,
513
0
                           std::move(column));
514
0
    } else {
515
0
      return FilterGreater(lower_bound, Inclusive::kTrue, std::move(column));
516
0
    }
517
0
  }
518
0
  if (upper_bound != kNoBound) {
519
0
    return FilterLess(upper_bound, Inclusive::kTrue, std::move(column));
520
0
  }
521
0
  return TableFilter();
522
0
}
523
524
size_t CountRowsFromClient(const TableHandle& table, YBConsistencyLevel consistency,
525
0
                        int32_t lower_bound, int32_t upper_bound) {
526
0
  TableIteratorOptions options;
527
0
  options.consistency = consistency;
528
0
  options.columns = std::vector<std::string>{"key"};
529
0
  options.filter = MakeFilter(lower_bound, upper_bound);
530
0
  return boost::size(TableRange(table, options));
531
0
}
532
533
0
size_t CountRowsFromClient(const TableHandle& table, int32_t lower_bound, int32_t upper_bound) {
534
0
  return CountRowsFromClient(table, YBConsistencyLevel::STRONG, lower_bound, upper_bound);
535
0
}
536
537
0
size_t CountRowsFromClient(const TableHandle& table) {
538
0
  return CountRowsFromClient(table, kNoBound, kNoBound);
539
0
}
540
541
// Count the rows of a table, checking that the operation succeeds.
542
//
543
// Must be public to use as a thread closure.
544
0
void CheckRowCount(const TableHandle& table) {
545
0
  CountRowsFromClient(table);
546
0
}
547
548
} // namespace
549
550
constexpr int kLookupWaitTimeSecs = 30;
551
constexpr int kNumTabletsPerTable = 8;
552
constexpr int kNumIterations = 1000;
553
554
class ClientTestForceMasterLookup :
555
    public ClientTest, public ::testing::WithParamInterface<bool /* force_master_lookup */> {
556
 public:
557
2
  void SetUp() override {
558
2
    ClientTest::SetUp();
559
    // Do we want to force going to the master instead of using cache.
560
2
    SetAtomicFlag(GetParam(), &FLAGS_TEST_force_master_lookup_all_tablets);
561
2
    SetAtomicFlag(0.5, &FLAGS_TEST_simulate_lookup_timeout_probability);
562
2
  }
563
564
565
0
  void PerformManyLookups(const std::shared_ptr<YBTable>& table, bool point_lookup) {
566
0
    for (int i = 0; i < kNumIterations; i++) {
567
0
      if (point_lookup) {
568
0
          auto key_rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table).get());
569
0
          ASSERT_NOTNULL(key_rt);
570
0
      } else {
571
0
        auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture(
572
0
            table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get());
573
0
        ASSERT_EQ(tablets.size(), kNumTabletsPerTable);
574
0
      }
575
0
    }
576
0
  }
577
};
578
579
INSTANTIATE_TEST_CASE_P(ForceMasterLookup, ClientTestForceMasterLookup, ::testing::Bool());
580
581
0
TEST_P(ClientTestForceMasterLookup, TestConcurrentLookups) {
582
0
  ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_));
583
584
0
  std::shared_ptr<YBTable> table;
585
0
  ASSERT_OK(client_->OpenTable(kTable3Name, &table));
586
587
0
  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->
588
0
            WaitUntilCatalogManagerIsLeaderAndReadyForTests());
589
590
0
  auto t1 = std::thread([&]() { ASSERT_NO_FATALS(
591
0
      PerformManyLookups(table, true /* point_lookup */)); });
592
0
  auto t2 = std::thread([&]() { ASSERT_NO_FATALS(
593
0
      PerformManyLookups(table, false /* point_lookup */)); });
594
595
0
  t1.join();
596
0
  t2.join();
597
0
}
598
599
0
TEST_F(ClientTest, TestLookupAllTablets) {
600
0
  ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_));
601
602
0
  std::shared_ptr<YBTable> table;
603
0
  ASSERT_OK(client_->OpenTable(kTable3Name, &table));
604
605
0
  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->
606
0
            WaitUntilCatalogManagerIsLeaderAndReadyForTests());
607
608
0
  auto future = client_->LookupAllTabletsFuture(
609
0
      table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs));
610
611
0
  auto tablets = ASSERT_RESULT(future.get());
612
0
  ASSERT_EQ(tablets.size(), 8);
613
0
}
614
615
0
TEST_F(ClientTest, TestPointThenRangeLookup) {
616
0
  ASSERT_NO_FATALS(CreateTable(kTable3Name, kNumTabletsPerTable, &client_table3_));
617
618
0
  std::shared_ptr<YBTable> table;
619
0
  ASSERT_OK(client_->OpenTable(kTable3Name, &table));
620
621
0
  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->
622
0
            WaitUntilCatalogManagerIsLeaderAndReadyForTests());
623
624
0
  auto key_rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table).get());
625
0
  ASSERT_NOTNULL(key_rt);
626
627
0
  auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture(
628
0
      table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get());
629
630
0
  ASSERT_EQ(tablets.size(), kNumTabletsPerTable);
631
0
}
632
633
0
TEST_F(ClientTest, TestKeyRangeFiltering) {
634
0
  ASSERT_NO_FATALS(CreateTable(kTable3Name, 8, &client_table3_));
635
636
0
  std::shared_ptr<YBTable> table;
637
0
  ASSERT_OK(client_->OpenTable(kTable3Name, &table));
638
639
0
  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->master()->
640
0
            WaitUntilCatalogManagerIsLeaderAndReadyForTests());
641
642
0
  auto tablets = ASSERT_RESULT(client_->LookupAllTabletsFuture(
643
0
      table, CoarseMonoClock::Now() + MonoDelta::FromSeconds(kLookupWaitTimeSecs)).get());
644
  // First, verify, that using empty bounds on both sides returns all tablets.
645
0
  auto filtered_tablets =
646
0
      ASSERT_RESULT(FilterTabletsByHashPartitionKeyRange(tablets, std::string(), std::string()));
647
0
  ASSERT_EQ(kNumTabletsPerTable, filtered_tablets.size());
648
649
0
  std::vector<std::string> partition_starts;
650
0
  for (const auto& tablet : tablets) {
651
0
    partition_starts.push_back(tablet->partition().partition_key_start());
652
0
  }
653
0
  std::sort(partition_starts.begin(), partition_starts.end());
654
655
0
  auto start_key = partition_starts[0];
656
0
  auto end_key = partition_starts[2];
657
0
  ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets, start_key, end_key));
658
659
0
  start_key = partition_starts[5];
660
0
  end_key = partition_starts[7];
661
0
  ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets, start_key, end_key));
662
663
0
  auto fixed_key = PartitionSchema::EncodeMultiColumnHashValue(10);
664
0
  ASSERT_NOK(FilterTabletsByHashPartitionKeyRange(tablets, fixed_key, fixed_key));
665
666
0
  for (int i = 0; i < kNumIterations; i++) {
667
0
    auto start_idx = RandomUniformInt(0, PartitionSchema::kMaxPartitionKey - 1);
668
0
    auto end_idx = RandomUniformInt(start_idx + 1, PartitionSchema::kMaxPartitionKey);
669
0
    ASSERT_NO_FATALS(VerifyKeyRangeFiltering(partition_starts, tablets,
670
0
                     PartitionSchema::EncodeMultiColumnHashValue(start_idx),
671
0
                     PartitionSchema::EncodeMultiColumnHashValue(end_idx)));
672
0
  }
673
0
}
674
675
0
TEST_F(ClientTest, TestListTables) {
676
0
  auto tables = ASSERT_RESULT(client_->ListTables());
677
0
  std::sort(tables.begin(), tables.end(), [](const YBTableName& n1, const YBTableName& n2) {
678
0
    return n1.ToString() < n2.ToString();
679
0
  });
680
0
  ASSERT_EQ(2 + master::kNumSystemTablesWithTxn, tables.size());
681
0
  ASSERT_EQ(kTableName, tables[0]) << "Tables:" << AsString(tables);
682
0
  ASSERT_EQ(kTable2Name, tables[1]) << "Tables:" << AsString(tables);
683
0
  tables.clear();
684
0
  tables = ASSERT_RESULT(client_->ListTables("testtb2"));
685
0
  ASSERT_EQ(1, tables.size());
686
0
  ASSERT_EQ(kTable2Name, tables[0]) << "Tables:" << AsString(tables);
687
0
}
688
689
0
TEST_F(ClientTest, TestListTabletServers) {
690
0
  auto tss = ASSERT_RESULT(client_->ListTabletServers());
691
0
  ASSERT_EQ(3, tss.size());
692
0
  set<string> actual_ts_uuids;
693
0
  set<string> actual_ts_hostnames;
694
0
  set<string> expected_ts_uuids;
695
0
  set<string> expected_ts_hostnames;
696
0
  for (size_t i = 0; i < tss.size(); ++i) {
697
0
    auto server = cluster_->mini_tablet_server(i)->server();
698
0
    expected_ts_uuids.insert(server->instance_pb().permanent_uuid());
699
0
    actual_ts_uuids.insert(tss[i].uuid);
700
0
    expected_ts_hostnames.insert(server->options().broadcast_addresses[0].host());
701
0
    actual_ts_hostnames.insert(tss[i].hostname);
702
0
  }
703
0
  ASSERT_EQ(expected_ts_uuids, actual_ts_uuids);
704
0
  ASSERT_EQ(expected_ts_hostnames, actual_ts_hostnames);
705
0
}
706
707
0
bool TableNotFound(const Status& status) {
708
0
  return status.IsNotFound()
709
0
         && (master::MasterError(status) == master::MasterErrorPB::OBJECT_NOT_FOUND);
710
0
}
711
712
TEST_F(ClientTest, TestBadTable) {
713
  shared_ptr<YBTable> t;
714
  Status s = client_->OpenTable(
715
      YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "xxx-does-not-exist"), &t);
716
  ASSERT_TRUE(TableNotFound(s)) << s;
717
}
718
719
// Test that, if the master is down, we experience a network error talking
720
// to it (no "find the new leader master" since there's only one master).
721
0
TEST_F(ClientTest, TestMasterDown) {
722
0
  DontVerifyClusterBeforeNextTearDown();
723
0
  cluster_->mini_master()->Shutdown();
724
0
  shared_ptr<YBTable> t;
725
0
  client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
726
0
  Status s = client_->OpenTable(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "other-tablet"), &t);
727
0
  ASSERT_TRUE(s.IsTimedOut());
728
0
}
729
730
// TODO scan with predicates is not supported.
731
0
TEST_F(ClientTest, TestScan) {
732
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
733
734
0
  ASSERT_EQ(FLAGS_test_scan_num_rows, CountRowsFromClient(client_table_));
735
736
  // Scan after insert
737
0
  DoTestScanWithoutPredicates();
738
0
  DoTestScanWithStringPredicate();
739
0
  DoTestScanWithKeyPredicate();
740
741
  // Scan after update
742
0
  UpdateTestRows(client_table_, 0, FLAGS_test_scan_num_rows);
743
0
  DoTestScanWithKeyPredicate();
744
745
  // Scan after delete half
746
0
  DeleteTestRows(client_table_, 0, FLAGS_test_scan_num_rows / 2);
747
0
  DoTestScanWithKeyPredicate();
748
749
  // Scan after delete all
750
0
  DeleteTestRows(client_table_, FLAGS_test_scan_num_rows / 2 + 1, FLAGS_test_scan_num_rows);
751
0
  DoTestScanWithKeyPredicate();
752
753
  // Scan after re-insert
754
0
  InsertTestRows(client_table_, 1);
755
0
  DoTestScanWithKeyPredicate();
756
0
}
757
758
0
void CheckCounts(const TableHandle& table, const std::vector<int>& expected) {
759
0
  std::vector<std::pair<int, int>> bounds = {
760
0
    { kNoBound, kNoBound },
761
0
    { kNoBound, 15 },
762
0
    { 27, kNoBound },
763
0
    { 0, 15 },
764
0
    { 0, 10 },
765
0
    { 0, 20 },
766
0
    { 0, 30 },
767
0
    { 14, 30 },
768
0
    { 30, 30 },
769
0
    { 50, kNoBound },
770
0
  };
771
0
  ASSERT_EQ(bounds.size(), expected.size());
772
0
  for (size_t i = 0; i != bounds.size(); ++i) {
773
0
    ASSERT_EQ(expected[i], CountRowsFromClient(table, bounds[i].first, bounds[i].second));
774
0
  }
775
  // Run through various scans.
776
0
}
777
778
0
TEST_F(ClientTest, TestScanMultiTablet) {
779
  // 5 tablets, each with 10 rows worth of space.
780
0
  TableHandle table;
781
0
  ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "TestScanMultiTablet"), 5, &table));
782
783
  // Insert rows with keys 12, 13, 15, 17, 22, 23, 25, 27...47 into each
784
  // tablet, except the first which is empty.
785
0
    auto session = CreateSession();
786
0
  for (int i = 1; i < 5; i++) {
787
0
    session->Apply(BuildTestRow(table, 2 + (i * 10)));
788
0
    session->Apply(BuildTestRow(table, 3 + (i * 10)));
789
0
    session->Apply(BuildTestRow(table, 5 + (i * 10)));
790
0
    session->Apply(BuildTestRow(table, 7 + (i * 10)));
791
0
  }
792
0
  FlushSessionOrDie(session);
793
794
  // Run through various scans.
795
0
  CheckCounts(table, { 16, 3, 9, 3, 0, 4, 8, 6, 0, 0 });
796
797
  // Update every other row
798
0
  for (int i = 1; i < 5; ++i) {
799
0
    session->Apply(UpdateTestRow(table, 2 + i * 10));
800
0
    session->Apply(UpdateTestRow(table, 5 + i * 10));
801
0
  }
802
0
  FlushSessionOrDie(session);
803
804
  // Check all counts the same (make sure updates don't change # of rows)
805
0
  CheckCounts(table, { 16, 3, 9, 3, 0, 4, 8, 6, 0, 0 });
806
807
  // Delete half the rows
808
0
  for (int i = 1; i < 5; ++i) {
809
0
    session->Apply(DeleteTestRow(table, 5 + i*10));
810
0
    session->Apply(DeleteTestRow(table, 7 + i*10));
811
0
  }
812
0
  FlushSessionOrDie(session);
813
814
  // Check counts changed accordingly
815
0
  CheckCounts(table, { 8, 2, 4, 2, 0, 2, 4, 2, 0, 0 });
816
817
  // Delete rest of rows
818
0
  for (int i = 1; i < 5; ++i) {
819
0
    session->Apply(DeleteTestRow(table, 2 + i*10));
820
0
    session->Apply(DeleteTestRow(table, 3 + i*10));
821
0
  }
822
0
  FlushSessionOrDie(session);
823
824
  // Check counts changed accordingly
825
0
  CheckCounts(table, { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 });
826
0
}
827
828
0
TEST_F(ClientTest, TestScanEmptyTable) {
829
0
  TableIteratorOptions options;
830
0
  options.columns = std::vector<std::string>();
831
0
  ASSERT_EQ(boost::size(TableRange(client_table_, options)), 0);
832
0
}
833
834
// Test scanning with an empty projection. This should yield an empty
835
// row block with the proper number of rows filled in. Impala issues
836
// scans like this in order to implement COUNT(*).
837
0
TEST_F(ClientTest, TestScanEmptyProjection) {
838
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
839
0
  TableIteratorOptions options;
840
0
  options.columns = std::vector<std::string>();
841
0
  ASSERT_EQ(boost::size(TableRange(client_table_, options)), FLAGS_test_scan_num_rows);
842
0
}
843
844
// Test a scan where we have a predicate on a key column that is not
845
// in the projection.
846
0
TEST_F(ClientTest, TestScanPredicateKeyColNotProjected) {
847
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
848
849
0
  size_t nrows = 0;
850
0
  TableIteratorOptions options;
851
0
  options.columns = std::vector<std::string>{"key", "int_val"};
852
0
  options.filter = MakeFilter(5, 10);
853
0
  for (const auto& row : TableRange(client_table_, options)) {
854
0
    int32_t key = row.column(0).int32_value();
855
0
    int32_t val = row.column(1).int32_value();
856
0
    ASSERT_EQ(key * 2, val);
857
858
0
    ++nrows;
859
0
  }
860
861
0
  ASSERT_EQ(6, nrows);
862
0
}
863
864
// Test a scan where we have a predicate on a non-key column that is
865
// not in the projection.
866
0
TEST_F(ClientTest, TestScanPredicateNonKeyColNotProjected) {
867
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
868
869
0
  size_t nrows = 0;
870
0
  TableIteratorOptions options;
871
0
  options.columns = std::vector<std::string>{"key", "int_val"};
872
0
  options.filter = MakeFilter(10, 20, "int_val");
873
0
  TableRange range(client_table_, options);
874
0
  for (const auto& row : range) {
875
0
    int32_t key = row.column(0).int32_value();
876
0
    int32_t val = row.column(1).int32_value();
877
0
    ASSERT_EQ(key * 2, val);
878
879
0
    ++nrows;
880
0
  }
881
882
0
  ASSERT_EQ(nrows, 6);
883
0
}
884
885
0
TEST_F(ClientTest, TestGetTabletServerBlacklist) {
886
0
  TableHandle table;
887
0
  ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "blacklist"), kNumTablets, &table));
888
0
  InsertTestRows(table, 1, 0);
889
890
  // Look up the tablet and its replicas into the metadata cache.
891
  // We have to loop since some replicas may have been created slowly.
892
0
  scoped_refptr<internal::RemoteTablet> rt;
893
0
  while (true) {
894
0
    rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get());
895
0
    ASSERT_TRUE(rt.get() != nullptr);
896
0
    vector<internal::RemoteTabletServer*> tservers;
897
0
    rt->GetRemoteTabletServers(&tservers);
898
0
    if (tservers.size() == 3) {
899
0
      break;
900
0
    }
901
0
    rt->MarkStale();
902
0
    SleepFor(MonoDelta::FromMilliseconds(10));
903
0
  }
904
905
  // Get the Leader.
906
0
  internal::RemoteTabletServer *rts;
907
0
  set<string> blacklist;
908
0
  vector<internal::RemoteTabletServer*> candidates;
909
0
  vector<internal::RemoteTabletServer*> tservers;
910
0
  ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
911
0
                                            YBClient::LEADER_ONLY,
912
0
                                            blacklist, &candidates, &rts));
913
0
  tservers.push_back(rts);
914
  // Blacklist the leader, should not work.
915
0
  blacklist.insert(rts->permanent_uuid());
916
0
  {
917
0
    Status s = client_->data_->GetTabletServer(client_.get(), rt,
918
0
                                               YBClient::LEADER_ONLY,
919
0
                                               blacklist, &candidates, &rts);
920
0
    ASSERT_TRUE(s.IsServiceUnavailable());
921
0
  }
922
  // Keep blacklisting replicas until we run out.
923
0
  ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
924
0
                                            YBClient::CLOSEST_REPLICA,
925
0
                                            blacklist, &candidates, &rts));
926
0
  tservers.push_back(rts);
927
0
  blacklist.insert(rts->permanent_uuid());
928
0
  ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
929
0
                                            YBClient::FIRST_REPLICA,
930
0
                                            blacklist, &candidates, &rts));
931
0
  tservers.push_back(rts);
932
0
  blacklist.insert(rts->permanent_uuid());
933
934
  // Make sure none of the three modes work when all nodes are blacklisted.
935
0
  vector<YBClient::ReplicaSelection> selections;
936
0
  selections.push_back(YBClient::LEADER_ONLY);
937
0
  selections.push_back(YBClient::CLOSEST_REPLICA);
938
0
  selections.push_back(YBClient::FIRST_REPLICA);
939
0
  for (YBClient::ReplicaSelection selection : selections) {
940
0
    Status s = client_->data_->GetTabletServer(client_.get(), rt, selection,
941
0
                                               blacklist, &candidates, &rts);
942
0
    ASSERT_TRUE(s.IsServiceUnavailable());
943
0
  }
944
945
  // Make sure none of the modes work when all nodes are dead.
946
0
  for (internal::RemoteTabletServer* rt : tservers) {
947
0
    client_->data_->meta_cache_->MarkTSFailed(rt, STATUS(NetworkError, "test"));
948
0
  }
949
0
  blacklist.clear();
950
0
  for (YBClient::ReplicaSelection selection : selections) {
951
0
    Status s = client_->data_->GetTabletServer(client_.get(), rt,
952
0
                                               selection,
953
0
                                               blacklist, &candidates, &rts);
954
0
    ASSERT_TRUE(s.IsServiceUnavailable());
955
0
  }
956
0
}
957
958
TEST_F(ClientTest, TestScanWithEncodedRangePredicate) {
959
  TableHandle table;
960
  ASSERT_NO_FATALS(CreateTable(YBTableName(YQL_DATABASE_CQL, "split-table"),
961
                               kNumTablets,
962
                               &table));
963
964
  ASSERT_NO_FATALS(InsertTestRows(table, 100));
965
966
  TableRange all_range(table, {});
967
  auto all_rows = ScanToStrings(all_range);
968
  ASSERT_EQ(100, all_rows.size());
969
970
  // Test a double-sided range within first tablet
971
  {
972
    TableIteratorOptions options;
973
    options.filter = FilterBetween(5, Inclusive::kTrue, 8, Inclusive::kFalse);
974
    auto rows = ScanToStrings(TableRange(table, options));
975
    ASSERT_EQ(8 - 5, rows.size());
976
    EXPECT_EQ(all_rows[5], rows.front());
977
    EXPECT_EQ(all_rows[7], rows.back());
978
  }
979
980
  // Test a double-sided range spanning tablets
981
  {
982
    TableIteratorOptions options;
983
    options.filter = FilterBetween(5, Inclusive::kTrue, 15, Inclusive::kFalse);
984
    auto rows = ScanToStrings(TableRange(table, options));
985
    ASSERT_EQ(15 - 5, rows.size());
986
    EXPECT_EQ(all_rows[5], rows.front());
987
    EXPECT_EQ(all_rows[14], rows.back());
988
  }
989
990
  // Test a double-sided range within second tablet
991
  {
992
    TableIteratorOptions options;
993
    options.filter = FilterBetween(15, Inclusive::kTrue, 20, Inclusive::kFalse);
994
    auto rows = ScanToStrings(TableRange(table, options));
995
    ASSERT_EQ(20 - 15, rows.size());
996
    EXPECT_EQ(all_rows[15], rows.front());
997
    EXPECT_EQ(all_rows[19], rows.back());
998
  }
999
1000
  // Test a lower-bound only range.
1001
  {
1002
    TableIteratorOptions options;
1003
    options.filter = FilterGreater(5, Inclusive::kTrue);
1004
    auto rows = ScanToStrings(TableRange(table, options));
1005
    ASSERT_EQ(95, rows.size());
1006
    EXPECT_EQ(all_rows[5], rows.front());
1007
    EXPECT_EQ(all_rows[99], rows.back());
1008
  }
1009
1010
  // Test an upper-bound only range in first tablet.
1011
  {
1012
    TableIteratorOptions options;
1013
    options.filter = FilterLess(5, Inclusive::kFalse);
1014
    auto rows = ScanToStrings(TableRange(table, options));
1015
    ASSERT_EQ(5, rows.size());
1016
    EXPECT_EQ(all_rows[0], rows.front());
1017
    EXPECT_EQ(all_rows[4], rows.back());
1018
  }
1019
1020
  // Test an upper-bound only range in second tablet.
1021
  {
1022
    TableIteratorOptions options;
1023
    options.filter = FilterLess(15, Inclusive::kFalse);
1024
    auto rows = ScanToStrings(TableRange(table, options));
1025
    ASSERT_EQ(15, rows.size());
1026
    EXPECT_EQ(all_rows[0], rows.front());
1027
    EXPECT_EQ(all_rows[14], rows.back());
1028
  }
1029
}
1030
1031
0
static YBError* GetSingleErrorFromFlushStatus(const FlushStatus& flush_status) {
1032
0
  CHECK_EQ(1, flush_status.errors.size());
1033
0
  return flush_status.errors.front().get();
1034
0
}
1035
1036
// Simplest case of inserting through the client API: a single row
1037
// with manual batching.
1038
// TODO Actually we need to check that hash columns present during insert. But it is not done yet.
1039
0
TEST_F(ClientTest, DISABLED_TestInsertSingleRowManualBatch) {
1040
0
  auto session = CreateSession();
1041
0
  ASSERT_FALSE(session->TEST_HasPendingOperations());
1042
1043
0
  auto insert = client_table_.NewInsertOp();
1044
  // Try inserting without specifying a key: should fail.
1045
0
  client_table_.AddInt32ColumnValue(insert->mutable_request(), "int_val", 54321);
1046
0
  client_table_.AddStringColumnValue(insert->mutable_request(), "string_val", "hello world");
1047
0
  ASSERT_OK(session->ApplyAndFlush(insert));
1048
0
  ASSERT_EQ(QLResponsePB::YQL_STATUS_RUNTIME_ERROR, insert->response().status());
1049
1050
  // Retry
1051
0
  QLAddInt32HashValue(insert->mutable_request(), 12345);
1052
0
  session->Apply(insert);
1053
0
  ASSERT_TRUE(session->TEST_HasPendingOperations()) << "Should be pending until we Flush";
1054
1055
0
  FlushSessionOrDie(session, { insert });
1056
0
}
1057
1058
namespace {
1059
1060
void ApplyInsertToSession(YBSession* session,
1061
                                    const TableHandle& table,
1062
                                    int row_key,
1063
                                    int int_val,
1064
                                    const char* string_val,
1065
0
                                    std::shared_ptr<YBqlOp>* op = nullptr) {
1066
0
  auto insert = table.NewInsertOp();
1067
0
  QLAddInt32HashValue(insert->mutable_request(), row_key);
1068
0
  table.AddInt32ColumnValue(insert->mutable_request(), "int_val", int_val);
1069
0
  table.AddStringColumnValue(insert->mutable_request(), "string_val", string_val);
1070
0
  if (op) {
1071
0
    *op = insert;
1072
0
  }
1073
0
  session->Apply(insert);
1074
0
}
1075
1076
void ApplyUpdateToSession(YBSession* session,
1077
                                    const TableHandle& table,
1078
                                    int row_key,
1079
0
                                    int int_val) {
1080
0
  auto update = table.NewUpdateOp();
1081
0
  QLAddInt32HashValue(update->mutable_request(), row_key);
1082
0
  table.AddInt32ColumnValue(update->mutable_request(), "int_val", int_val);
1083
0
  session->Apply(update);
1084
0
}
1085
1086
void ApplyDeleteToSession(YBSession* session,
1087
                                    const TableHandle& table,
1088
0
                                    int row_key) {
1089
0
  auto del = table.NewDeleteOp();
1090
0
  QLAddInt32HashValue(del->mutable_request(), row_key);
1091
0
  session->Apply(del);
1092
0
}
1093
1094
} // namespace
1095
1096
0
TEST_F(ClientTest, TestWriteTimeout) {
1097
0
  auto session = CreateSession();
1098
1099
0
  LOG(INFO) << "Time out the lookup on the master side";
1100
0
  {
1101
0
    google::FlagSaver saver;
1102
0
    FLAGS_master_inject_latency_on_tablet_lookups_ms = 110;
1103
0
    session->SetTimeout(100ms);
1104
0
    ApplyInsertToSession(session.get(), client_table_, 1, 1, "row");
1105
0
    const auto flush_status = session->FlushAndGetOpsErrors();
1106
0
    ASSERT_TRUE(flush_status.status.IsIOError())
1107
0
        << "unexpected status: " << flush_status.status.ToString();
1108
0
    auto error = GetSingleErrorFromFlushStatus(flush_status);
1109
0
    ASSERT_TRUE(error->status().IsTimedOut()) << error->status().ToString();
1110
0
    ASSERT_TRUE(std::regex_match(
1111
0
        error->status().ToString(),
1112
0
        std::regex(".*GetTableLocations \\{.*\\} failed: RPC timed out after deadline expired.*")))
1113
0
        << error->status().ToString();
1114
0
  }
1115
1116
0
  LOG(INFO) << "Time out the actual write on the tablet server";
1117
0
  {
1118
0
    google::FlagSaver saver;
1119
0
    SetAtomicFlag(true, &FLAGS_log_inject_latency);
1120
0
    SetAtomicFlag(110, &FLAGS_log_inject_latency_ms_mean);
1121
0
    SetAtomicFlag(0, &FLAGS_log_inject_latency_ms_stddev);
1122
1123
0
    ApplyInsertToSession(session.get(), client_table_, 1, 1, "row");
1124
0
    const auto flush_status = session->FlushAndGetOpsErrors();
1125
0
    ASSERT_TRUE(flush_status.status.IsIOError()) << AsString(flush_status.status.ToString());
1126
0
    auto error = GetSingleErrorFromFlushStatus(flush_status);
1127
0
    ASSERT_TRUE(error->status().IsTimedOut()) << error->status().ToString();
1128
0
  }
1129
0
}
1130
1131
// Test which does an async flush and then drops the reference
1132
// to the Session. This should still call the callback.
1133
0
TEST_F(ClientTest, TestAsyncFlushResponseAfterSessionDropped) {
1134
0
  auto session = CreateSession();
1135
0
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "row");
1136
0
  auto flush_future = session->FlushFuture();
1137
0
  session.reset();
1138
0
  ASSERT_OK(flush_future.get().status);
1139
1140
  // Try again, this time should not have an error response (to re-insert the same row).
1141
0
  session = CreateSession();
1142
0
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "row");
1143
0
  ASSERT_EQ(1, session->TEST_CountBufferedOperations());
1144
0
  ASSERT_TRUE(session->HasNotFlushedOperations());
1145
0
  flush_future = session->FlushFuture();
1146
0
  ASSERT_EQ(0, session->TEST_CountBufferedOperations());
1147
0
  ASSERT_FALSE(session->HasNotFlushedOperations());
1148
0
  session.reset();
1149
0
  ASSERT_OK(flush_future.get().status);
1150
0
}
1151
1152
0
TEST_F(ClientTest, TestSessionClose) {
1153
0
  auto session = CreateSession();
1154
0
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "row");
1155
  // Closing the session now should return Status::IllegalState since we
1156
  // have a pending operation.
1157
0
  ASSERT_TRUE(session->Close().IsIllegalState());
1158
1159
0
  ASSERT_OK(session->Flush());
1160
1161
0
  ASSERT_OK(session->Close());
1162
0
}
1163
1164
// Test which sends multiple batches through the same session, each of which
1165
// contains multiple rows spread across multiple tablets.
1166
0
TEST_F(ClientTest, TestMultipleMultiRowManualBatches) {
1167
0
  auto session = CreateSession();
1168
1169
0
  const int kNumBatches = 5;
1170
0
  const int kRowsPerBatch = 10;
1171
1172
0
  int row_key = 0;
1173
1174
0
  for (int batch_num = 0; batch_num < kNumBatches; batch_num++) {
1175
0
    for (int i = 0; i < kRowsPerBatch; i++) {
1176
0
      ApplyInsertToSession(
1177
0
          session.get(),
1178
0
          (row_key % 2 == 0) ? client_table_ : client_table2_,
1179
0
          row_key, row_key * 10, "hello world");
1180
0
      row_key++;
1181
0
    }
1182
0
    ASSERT_TRUE(session->TEST_HasPendingOperations()) << "Should be pending until we Flush";
1183
0
    FlushSessionOrDie(session);
1184
0
    ASSERT_FALSE(session->TEST_HasPendingOperations())
1185
0
        << "Should have no more pending ops after flush";
1186
0
  }
1187
1188
0
  const int kNumRowsPerTablet = kNumBatches * kRowsPerBatch / 2;
1189
0
  ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table_));
1190
0
  ASSERT_EQ(kNumRowsPerTablet, CountRowsFromClient(client_table2_));
1191
1192
  // Verify the data looks right.
1193
0
  auto rows = ScanTableToStrings(client_table_);
1194
0
  std::sort(rows.begin(), rows.end());
1195
0
  ASSERT_EQ(kNumRowsPerTablet, rows.size());
1196
0
  ASSERT_EQ("{ int32:0, int32:0, string:\"hello world\", null }", rows[0]);
1197
0
}
1198
1199
// Test a batch where one of the inserted rows succeeds and duplicates succeed too.
1200
TEST_F(ClientTest, TestBatchWithDuplicates) {
1201
  auto session = CreateSession();
1202
1203
  // Insert a row with key "1"
1204
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row");
1205
  FlushSessionOrDie(session);
1206
1207
  // Now make a batch that has key "1" along with
1208
  // key "2" which will succeed. Flushing should not return an error.
1209
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "Attempted dup");
1210
  ApplyInsertToSession(session.get(), client_table_, 2, 1, "Should succeed");
1211
  Status s = session->Flush();
1212
  ASSERT_TRUE(s.ok());
1213
1214
  // Verify that the other row was successfully inserted
1215
  auto rows = ScanTableToStrings(client_table_);
1216
  ASSERT_EQ(2, rows.size());
1217
  std::sort(rows.begin(), rows.end());
1218
  ASSERT_EQ("{ int32:1, int32:1, string:\"Attempted dup\", null }", rows[0]);
1219
  ASSERT_EQ("{ int32:2, int32:1, string:\"Should succeed\", null }", rows[1]);
1220
}
1221
1222
// Test flushing an empty batch (should be a no-op).
1223
0
TEST_F(ClientTest, TestEmptyBatch) {
1224
0
  auto session = CreateSession();
1225
0
  FlushSessionOrDie(session);
1226
0
}
1227
1228
0
void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) {
1229
0
  DontVerifyClusterBeforeNextTearDown();
1230
0
  auto session = CreateSession();
1231
0
  session->SetTimeout(1s);
1232
1233
  // Shut down the server.
1234
0
  switch (which) {
1235
0
    case DEAD_MASTER:
1236
0
      cluster_->mini_master()->Shutdown();
1237
0
      break;
1238
0
    case DEAD_TSERVER:
1239
0
      for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
1240
0
        cluster_->mini_tablet_server(i)->Shutdown();
1241
0
      }
1242
0
      break;
1243
0
  }
1244
1245
  // Try a write.
1246
0
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "x");
1247
0
  const auto flush_status = session->FlushAndGetOpsErrors();
1248
0
  ASSERT_TRUE(flush_status.status.IsIOError()) << flush_status.status.ToString();
1249
1250
0
  auto error = GetSingleErrorFromFlushStatus(flush_status);
1251
0
  switch (which) {
1252
0
    case DEAD_MASTER:
1253
      // Only one master, so no retry for finding the new leader master.
1254
0
      ASSERT_TRUE(error->status().IsTimedOut());
1255
0
      break;
1256
0
    case DEAD_TSERVER:
1257
0
      ASSERT_TRUE(error->status().IsTimedOut());
1258
0
      auto pos = error->status().ToString().find("Connection refused");
1259
0
      if (pos == std::string::npos) {
1260
0
        pos = error->status().ToString().find("Broken pipe");
1261
0
      }
1262
0
      ASSERT_NE(std::string::npos, pos);
1263
0
      break;
1264
0
  }
1265
1266
0
  ASSERT_STR_CONTAINS(error->failed_op().ToString(), "QL_WRITE");
1267
0
}
1268
1269
// Test error handling cases where the master is down (tablet resolution fails)
1270
0
TEST_F(ClientTest, TestWriteWithDeadMaster) {
1271
0
  client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
1272
0
  DoTestWriteWithDeadServer(DEAD_MASTER);
1273
0
}
1274
1275
// Test error handling when the TS is down (actual write fails its RPC)
1276
0
TEST_F(ClientTest, TestWriteWithDeadTabletServer) {
1277
0
  DoTestWriteWithDeadServer(DEAD_TSERVER);
1278
0
}
1279
1280
0
void ClientTest::DoApplyWithoutFlushTest(int sleep_micros) {
1281
0
  auto session = CreateSession();
1282
0
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "x");
1283
0
  SleepFor(MonoDelta::FromMicroseconds(sleep_micros));
1284
0
  session.reset(); // should not crash!
1285
1286
  // Should have no rows.
1287
0
  auto rows = ScanTableToStrings(client_table_);
1288
0
  ASSERT_EQ(0, rows.size());
1289
0
}
1290
1291
1292
// Applies some updates to the session, and then drops the reference to the
1293
// Session before flushing. Makes sure that the tablet resolution callbacks
1294
// properly deal with the session disappearing underneath.
1295
//
1296
// This test doesn't sleep between applying the operations and dropping the
1297
// reference, in hopes that the reference will be dropped while DNS is still
1298
// in-flight, etc.
1299
0
TEST_F(ClientTest, TestApplyToSessionWithoutFlushing_OpsInFlight) {
1300
0
  DoApplyWithoutFlushTest(0);
1301
0
}
1302
1303
// Same as the above, but sleeps a little bit after applying the operations,
1304
// so that the operations are already in the per-TS-buffer.
1305
0
TEST_F(ClientTest, TestApplyToSessionWithoutFlushing_OpsBuffered) {
1306
0
  DoApplyWithoutFlushTest(10000);
1307
0
}
1308
1309
// Apply a large amount of data without calling Flush(), and ensure
1310
// that we get an error on Apply() rather than sending a too-large
1311
// RPC to the server.
1312
0
TEST_F(ClientTest, DISABLED_TestApplyTooMuchWithoutFlushing) {
1313
  // Applying a bunch of small rows without a flush should result
1314
  // in an error.
1315
0
  {
1316
0
    bool got_expected_error = false;
1317
0
    auto session = CreateSession();
1318
0
    for (int i = 0; i < 1000000; i++) {
1319
0
      ApplyInsertToSession(session.get(), client_table_, 1, 1, "x");
1320
0
    }
1321
0
    ASSERT_TRUE(got_expected_error);
1322
0
  }
1323
1324
  // Writing a single very large row should also result in an error.
1325
0
  {
1326
0
    string huge_string(10 * 1024 * 1024, 'x');
1327
1328
0
    shared_ptr<YBSession> session = client_->NewSession();
1329
0
    ApplyInsertToSession(session.get(), client_table_, 1, 1, huge_string.c_str());
1330
0
  }
1331
0
}
1332
1333
// Test that update updates and delete deletes with expected use
1334
TEST_F(ClientTest, TestMutationsWork) {
1335
  auto session = CreateSession();
1336
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row");
1337
  FlushSessionOrDie(session);
1338
1339
  ApplyUpdateToSession(session.get(), client_table_, 1, 2);
1340
  FlushSessionOrDie(session);
1341
  auto rows = ScanTableToStrings(client_table_);
1342
  ASSERT_EQ(1, rows.size());
1343
  ASSERT_EQ("{ int32:1, int32:2, string:\"original row\", null }", rows[0]);
1344
  rows.clear();
1345
1346
  ApplyDeleteToSession(session.get(), client_table_, 1);
1347
  FlushSessionOrDie(session);
1348
  ScanTableToStrings(client_table_, &rows);
1349
  ASSERT_EQ(0, rows.size());
1350
}
1351
1352
TEST_F(ClientTest, TestMutateDeletedRow) {
1353
  auto session = CreateSession();
1354
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "original row");
1355
  FlushSessionOrDie(session);
1356
  ApplyDeleteToSession(session.get(), client_table_, 1);
1357
  FlushSessionOrDie(session);
1358
  auto rows = ScanTableToStrings(client_table_);
1359
  ASSERT_EQ(0, rows.size());
1360
1361
  // Attempt update deleted row
1362
  ApplyUpdateToSession(session.get(), client_table_, 1, 2);
1363
  Status s = session->Flush();
1364
  ASSERT_TRUE(s.ok());
1365
  ScanTableToStrings(client_table_, &rows);
1366
  ASSERT_EQ(1, rows.size());
1367
1368
  // Attempt delete deleted row
1369
  ApplyDeleteToSession(session.get(), client_table_, 1);
1370
  s = session->Flush();
1371
  ASSERT_TRUE(s.ok());
1372
  ScanTableToStrings(client_table_, &rows);
1373
  ASSERT_EQ(0, rows.size());
1374
}
1375
1376
TEST_F(ClientTest, TestMutateNonexistentRow) {
1377
  auto session = CreateSession();
1378
1379
  // Attempt update nonexistent row
1380
  ApplyUpdateToSession(session.get(), client_table_, 1, 2);
1381
  Status s = session->Flush();
1382
  ASSERT_TRUE(s.ok());
1383
  auto rows = ScanTableToStrings(client_table_);
1384
  ASSERT_EQ(1, rows.size());
1385
1386
  // Attempt delete nonexistent row
1387
  ApplyDeleteToSession(session.get(), client_table_, 1);
1388
  s = session->Flush();
1389
  ASSERT_TRUE(s.ok());
1390
  ScanTableToStrings(client_table_, &rows);
1391
  ASSERT_EQ(0, rows.size());
1392
}
1393
1394
// Do a write with a bad schema on the client side. This should make the Prepare
1395
// phase of the write fail, which will result in an error on the RPC response.
1396
0
TEST_F(ClientTest, TestWriteWithBadSchema) {
1397
  // Remove the 'int_val' column.
1398
  // Now the schema on the client is "old"
1399
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1400
0
  ASSERT_OK(table_alterer->DropColumn("int_val")->Alter());
1401
1402
  // Try to do a write with the bad schema.
1403
0
  auto session = CreateSession();
1404
0
  std::shared_ptr<YBqlOp> op;
1405
0
  ApplyInsertToSession(session.get(), client_table_, 12345, 12345, "x", &op);
1406
0
  ASSERT_OK(session->Flush());
1407
0
  ASSERT_EQ(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH, op->response().status());
1408
0
}
1409
1410
0
TEST_F(ClientTest, TestBasicAlterOperations) {
1411
  // test that having no steps throws an error
1412
0
  {
1413
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1414
0
    Status s = table_alterer->Alter();
1415
0
    ASSERT_TRUE(s.IsInvalidArgument());
1416
0
    ASSERT_STR_CONTAINS(s.ToString(), "No alter steps provided");
1417
0
  }
1418
1419
  // test that remove key should throws an error
1420
0
  {
1421
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1422
0
    Status s = table_alterer
1423
0
      ->DropColumn("key")
1424
0
      ->Alter();
1425
0
    ASSERT_TRUE(s.IsInvalidArgument());
1426
0
    ASSERT_STR_CONTAINS(s.ToString(), "cannot remove a key column");
1427
0
  }
1428
1429
  // test that renaming to an already-existing name throws an error
1430
0
  {
1431
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1432
0
    table_alterer->AlterColumn("int_val")->RenameTo("string_val");
1433
0
    Status s = table_alterer->Alter();
1434
0
    ASSERT_TRUE(s.IsAlreadyPresent());
1435
0
    ASSERT_STR_CONTAINS(s.ToString(), "The column already exists: string_val");
1436
0
  }
1437
1438
  // Need a tablet peer for the next set of tests.
1439
0
  string tablet_id = GetFirstTabletId(client_table_.get());
1440
0
  std::shared_ptr<TabletPeer> tablet_peer;
1441
1442
0
  for (auto& ts : cluster_->mini_tablet_servers()) {
1443
0
    ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablet_id, &tablet_peer));
1444
0
    if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
1445
0
      break;
1446
0
    }
1447
0
  }
1448
1449
0
  {
1450
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1451
0
    table_alterer->DropColumn("int_val")
1452
0
      ->AddColumn("new_col")->Type(INT32);
1453
0
    ASSERT_OK(table_alterer->Alter());
1454
    // TODO(nspiegelberg): The below assert is flakey because of KUDU-1539.
1455
0
    ASSERT_EQ(1, tablet_peer->tablet()->metadata()->schema_version());
1456
0
  }
1457
1458
0
  {
1459
0
    const YBTableName kRenamedTableName(YQL_DATABASE_CQL, kKeyspaceName, "RenamedTable");
1460
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
1461
0
    ASSERT_OK(table_alterer
1462
0
              ->RenameTo(kRenamedTableName)
1463
0
              ->Alter());
1464
    // TODO(nspiegelberg): The below assert is flakey because of KUDU-1539.
1465
0
    ASSERT_EQ(2, tablet_peer->tablet()->metadata()->schema_version());
1466
0
    ASSERT_EQ(kRenamedTableName.table_name(), tablet_peer->tablet()->metadata()->table_name());
1467
1468
0
    const auto tables = ASSERT_RESULT(client_->ListTables());
1469
0
    ASSERT_TRUE(::util::gtl::contains(tables.begin(), tables.end(), kRenamedTableName));
1470
0
    ASSERT_FALSE(::util::gtl::contains(tables.begin(), tables.end(), kTableName));
1471
0
  }
1472
0
}
1473
1474
0
TEST_F(ClientTest, TestDeleteTable) {
1475
  // Open the table before deleting it.
1476
0
  ASSERT_OK(client_table_.Open(kTableName, client_.get()));
1477
1478
  // Insert a few rows, and scan them back. This is to populate the MetaCache.
1479
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, 10));
1480
0
  auto rows = ScanTableToStrings(client_table_);
1481
0
  ASSERT_EQ(10, rows.size());
1482
1483
  // Remove the table
1484
  // NOTE that it returns when the operation is completed on the master side
1485
0
  string tablet_id = GetFirstTabletId(client_table_.get());
1486
0
  ASSERT_OK(client_->DeleteTable(kTableName));
1487
0
  const auto tables = ASSERT_RESULT(client_->ListTables());
1488
0
  ASSERT_FALSE(::util::gtl::contains(tables.begin(), tables.end(), kTableName));
1489
1490
  // Wait until the table is removed from the TS
1491
0
  int wait_time = 1000;
1492
0
  bool tablet_found = true;
1493
0
  for (int i = 0; i < 80 && tablet_found; ++i) {
1494
0
    std::shared_ptr<TabletPeer> tablet_peer;
1495
0
    tablet_found = cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(
1496
0
                      tablet_id, &tablet_peer);
1497
0
    SleepFor(MonoDelta::FromMicroseconds(wait_time));
1498
0
    wait_time = std::min(wait_time * 5 / 4, 1000000);
1499
0
  }
1500
0
  ASSERT_FALSE(tablet_found);
1501
1502
  // Try to open the deleted table
1503
0
  Status s = client_table_.Open(kTableName, client_.get());
1504
0
  ASSERT_TRUE(TableNotFound(s)) << s;
1505
1506
  // Create a new table with the same name. This is to ensure that the client
1507
  // doesn't cache anything inappropriately by table name (see KUDU-1055).
1508
0
  ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_));
1509
1510
  // Should be able to insert successfully into the new table.
1511
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, 10));
1512
0
}
1513
1514
0
TEST_F(ClientTest, TestGetTableSchema) {
1515
0
  YBSchema schema;
1516
0
  PartitionSchema partition_schema;
1517
1518
  // Verify the schema for the current table
1519
0
  ASSERT_OK(client_->GetTableSchema(kTableName, &schema, &partition_schema));
1520
0
  ASSERT_TRUE(schema_.Equals(schema));
1521
1522
  // Verify that a get schema request for a missing table throws not found
1523
0
  Status s = client_->GetTableSchema(
1524
0
      YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "MissingTableName"), &schema, &partition_schema);
1525
0
  ASSERT_TRUE(TableNotFound(s)) << s;
1526
0
}
1527
1528
0
TEST_F(ClientTest, TestGetTableSchemaByIdAsync) {
1529
0
  Synchronizer sync;
1530
0
  auto table_info = std::make_shared<YBTableInfo>();
1531
0
  ASSERT_OK(client_->GetTableSchemaById(
1532
0
      client_table_.table()->id(), table_info, sync.AsStatusCallback()));
1533
0
  ASSERT_OK(sync.Wait());
1534
0
  ASSERT_TRUE(schema_.Equals(table_info->schema));
1535
0
}
1536
1537
0
TEST_F(ClientTest, TestGetTableSchemaByIdMissingTable) {
1538
  // Verify that a get schema request for a missing table throws not found.
1539
0
  Synchronizer sync;
1540
0
  auto table_info = std::make_shared<YBTableInfo>();
1541
0
  ASSERT_OK(client_->GetTableSchemaById("MissingTableId", table_info, sync.AsStatusCallback()));
1542
0
  Status s = sync.Wait();
1543
0
  ASSERT_TRUE(TableNotFound(s)) << s;
1544
0
}
1545
1546
0
TEST_F(ClientTest, TestCreateCDCStreamAsync) {
1547
0
  std::promise<Result<CDCStreamId>> promise;
1548
0
  std::unordered_map<std::string, std::string> options;
1549
0
  client_->CreateCDCStream(
1550
0
      client_table_.table()->id(), options,
1551
0
      [&promise](const auto& stream) { promise.set_value(stream); });
1552
0
  auto stream = promise.get_future().get();
1553
0
  ASSERT_OK(stream);
1554
0
  ASSERT_FALSE(stream->empty());
1555
0
}
1556
1557
TEST_F(ClientTest, TestCreateCDCStreamMissingTable) {
1558
  std::promise<Result<CDCStreamId>> promise;
1559
  std::unordered_map<std::string, std::string> options;
1560
  client_->CreateCDCStream(
1561
      "MissingTableId", options,
1562
0
      [&promise](const auto& stream) { promise.set_value(stream); });
1563
  auto stream = promise.get_future().get();
1564
  ASSERT_NOK(stream);
1565
  ASSERT_TRUE(TableNotFound(stream.status())) << stream.status();
1566
}
1567
1568
0
TEST_F(ClientTest, TestDeleteCDCStreamAsync) {
1569
0
  std::unordered_map<std::string, std::string> options;
1570
0
  auto result = client_->CreateCDCStream(client_table_.table()->id(), options);
1571
0
  ASSERT_TRUE(result.ok());
1572
1573
  // Delete the created CDC stream.
1574
0
  Synchronizer sync;
1575
0
  client_->DeleteCDCStream(*result, sync.AsStatusCallback());
1576
0
  ASSERT_OK(sync.Wait());
1577
0
}
1578
1579
0
TEST_F(ClientTest, TestDeleteCDCStreamMissingId) {
1580
  // Try to delete a non-existent CDC stream.
1581
0
  Synchronizer sync;
1582
0
  client_->DeleteCDCStream("MissingStreamId", sync.AsStatusCallback());
1583
0
  Status s = sync.Wait();
1584
0
  ASSERT_TRUE(TableNotFound(s)) << s;
1585
0
}
1586
1587
0
TEST_F(ClientTest, TestStaleLocations) {
1588
0
  string tablet_id = GetFirstTabletId(client_table2_.get());
1589
1590
  // The Tablet is up and running the location should not be stale
1591
0
  master::TabletLocationsPB locs_pb;
1592
0
  ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(
1593
0
                  tablet_id, &locs_pb));
1594
0
  ASSERT_FALSE(locs_pb.stale());
1595
1596
  // On Master restart and no tablet report we expect the locations to be stale
1597
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
1598
0
    cluster_->mini_tablet_server(i)->Shutdown();
1599
0
  }
1600
0
  ASSERT_OK(cluster_->mini_master()->Restart());
1601
0
  ASSERT_OK(cluster_->mini_master()->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());
1602
0
  locs_pb.Clear();
1603
0
  ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb));
1604
0
  ASSERT_TRUE(locs_pb.stale());
1605
1606
  // Restart the TS and Wait for the tablets to be reported to the master.
1607
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
1608
0
    ASSERT_OK(cluster_->mini_tablet_server(i)->Start());
1609
0
  }
1610
0
  ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers()));
1611
0
  locs_pb.Clear();
1612
0
  ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb));
1613
1614
  // It may take a while to bootstrap the tablet and send the location report
1615
  // so spin until we get a non-stale location.
1616
0
  int wait_time = 1000;
1617
0
  for (int i = 0; i < 80; ++i) {
1618
0
    locs_pb.Clear();
1619
0
    ASSERT_OK(cluster_->mini_master()->catalog_manager().GetTabletLocations(tablet_id, &locs_pb));
1620
0
    if (!locs_pb.stale()) {
1621
0
      break;
1622
0
    }
1623
0
    SleepFor(MonoDelta::FromMicroseconds(wait_time));
1624
0
    wait_time = std::min(wait_time * 5 / 4, 1000000);
1625
0
  }
1626
0
  ASSERT_FALSE(locs_pb.stale());
1627
0
}
1628
1629
// Test creating and accessing a table which has multiple tablets,
1630
// each of which is replicated.
1631
//
1632
// TODO: this should probably be the default for _all_ of the tests
1633
// in this file. However, some things like alter table are not yet
1634
// working on replicated tables - see KUDU-304
1635
0
TEST_F(ClientTest, TestReplicatedMultiTabletTable) {
1636
0
  const YBTableName kReplicatedTable(YQL_DATABASE_CQL, "replicated");
1637
0
  const int kNumRowsToWrite = 100;
1638
1639
0
  TableHandle table;
1640
0
  ASSERT_NO_FATALS(CreateTable(kReplicatedTable,
1641
0
                               kNumTablets,
1642
0
                               &table));
1643
1644
  // Should have no rows to begin with.
1645
0
  ASSERT_EQ(0, CountRowsFromClient(table));
1646
1647
  // Insert some data.
1648
0
  ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite));
1649
1650
  // Should now see the data.
1651
0
  ASSERT_EQ(kNumRowsToWrite, CountRowsFromClient(table));
1652
1653
  // TODO: once leader re-election is in, should somehow force a re-election
1654
  // and ensure that the client handles refreshing the leader.
1655
0
}
1656
1657
0
TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) {
1658
0
  const YBTableName kReplicatedTable(YQL_DATABASE_CQL, "replicated_failover_on_reads");
1659
0
  const int kNumRowsToWrite = 100;
1660
0
  const int kNumTries = 100;
1661
1662
0
  TableHandle table;
1663
0
  ASSERT_NO_FATALS(CreateTable(kReplicatedTable,
1664
0
                               kNumTablets,
1665
0
                               &table));
1666
1667
  // Insert some data.
1668
0
  ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite));
1669
1670
  // Find the leader of the first tablet.
1671
0
  auto remote_tablet = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get());
1672
0
  internal::RemoteTabletServer *remote_tablet_server = remote_tablet->LeaderTServer();
1673
1674
  // Kill the leader of the first tablet.
1675
0
  ASSERT_OK(KillTServer(remote_tablet_server->permanent_uuid()));
1676
1677
  // We wait until we fail over to the new leader(s).
1678
0
  int tries = 0;
1679
0
  for (;;) {
1680
0
    tries++;
1681
0
    auto num_rows = CountRowsFromClient(table);
1682
0
    if (num_rows == kNumRowsToWrite) {
1683
0
      LOG(INFO) << "Found expected number of rows: " << num_rows;
1684
0
      break;
1685
0
    } else {
1686
0
      LOG(INFO) << "Only found " << num_rows << " rows on try "
1687
0
                << tries << ", retrying";
1688
0
      ASSERT_LE(tries, kNumTries);
1689
0
      SleepFor(MonoDelta::FromMilliseconds(10 * tries)); // sleep a bit more with each attempt.
1690
0
    }
1691
0
  }
1692
0
}
1693
1694
// This test that we can keep writing to a tablet when the leader
1695
// tablet dies.
1696
// This currently forces leader promotion through RPC and creates
1697
// a new client afterwards.
1698
// TODO Remove the leader promotion part when we have automated
1699
// leader election.
1700
0
TEST_F(ClientTest, TestReplicatedTabletWritesAndAltersWithLeaderElection) {
1701
0
  const YBTableName kReplicatedTable(YQL_DATABASE_CQL, kKeyspaceName,
1702
0
     "replicated_failover_on_writes");
1703
0
  const int kNumRowsToWrite = 100;
1704
1705
0
  TableHandle table;
1706
0
  ASSERT_NO_FATALS(CreateTable(kReplicatedTable,
1707
0
                               1,
1708
0
                               &table));
1709
1710
  // Insert some data.
1711
0
  ASSERT_NO_FATALS(InsertTestRows(table, kNumRowsToWrite));
1712
1713
  // TODO: we have to sleep here to make sure that the leader has time to
1714
  // propagate the writes to the followers. We can remove this once the
1715
  // followers run a leader election on their own and handle advancing
1716
  // the commit index.
1717
0
  SleepFor(MonoDelta::FromMilliseconds(1500));
1718
1719
  // Find the leader replica
1720
0
  auto remote_tablet = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), table.table()).get());
1721
0
  internal::RemoteTabletServer *remote_tablet_server;
1722
0
  set<string> blacklist;
1723
0
  vector<internal::RemoteTabletServer*> candidates;
1724
0
  ASSERT_OK(client_->data_->GetTabletServer(client_.get(),
1725
0
                                            remote_tablet,
1726
0
                                            YBClient::LEADER_ONLY,
1727
0
                                            blacklist,
1728
0
                                            &candidates,
1729
0
                                            &remote_tablet_server));
1730
1731
0
  string killed_uuid = remote_tablet_server->permanent_uuid();
1732
  // Kill the tserver that is serving the leader tablet.
1733
0
  ASSERT_OK(KillTServer(killed_uuid));
1734
1735
  // Since we waited before, hopefully all replicas will be up to date
1736
  // and we can just promote another replica.
1737
0
  auto client_messenger = rpc::CreateAutoShutdownMessengerHolder(
1738
0
      ASSERT_RESULT(CreateMessenger("client")));
1739
0
  ssize_t new_leader_idx = -1;
1740
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
1741
0
    MiniTabletServer* ts = cluster_->mini_tablet_server(i);
1742
0
    LOG(INFO) << "GOT TS " << i << " WITH UUID ???";
1743
0
    if (ts->is_started()) {
1744
0
      const string& uuid = ts->server()->instance_pb().permanent_uuid();
1745
0
      LOG(INFO) << uuid;
1746
0
      if (uuid != killed_uuid) {
1747
0
        new_leader_idx = i;
1748
0
        break;
1749
0
      }
1750
0
    }
1751
0
  }
1752
0
  ASSERT_NE(-1, new_leader_idx);
1753
1754
0
  MiniTabletServer* new_leader = cluster_->mini_tablet_server(new_leader_idx);
1755
0
  ASSERT_TRUE(new_leader != nullptr);
1756
0
  rpc::ProxyCache proxy_cache(client_messenger.get());
1757
0
  consensus::ConsensusServiceProxy new_leader_proxy(
1758
0
      &proxy_cache, HostPort::FromBoundEndpoint(new_leader->bound_rpc_addr()));
1759
1760
0
  consensus::RunLeaderElectionRequestPB req;
1761
0
  consensus::RunLeaderElectionResponsePB resp;
1762
0
  rpc::RpcController controller;
1763
1764
0
  LOG(INFO) << "Promoting server at index " << new_leader_idx << " listening at "
1765
0
            << new_leader->bound_rpc_addr() << " ...";
1766
0
  req.set_dest_uuid(new_leader->server()->fs_manager()->uuid());
1767
0
  req.set_tablet_id(remote_tablet->tablet_id());
1768
0
  ASSERT_OK(new_leader_proxy.RunLeaderElection(req, &resp, &controller));
1769
0
  ASSERT_FALSE(resp.has_error()) << "Got error. Response: " << resp.ShortDebugString();
1770
1771
0
  LOG(INFO) << "Inserting additional rows...";
1772
0
  ASSERT_NO_FATALS(InsertTestRows(table,
1773
0
                                  kNumRowsToWrite,
1774
0
                                  kNumRowsToWrite));
1775
1776
  // TODO: we have to sleep here to make sure that the leader has time to
1777
  // propagate the writes to the followers. We can remove this once the
1778
  // followers run a leader election on their own and handle advancing
1779
  // the commit index.
1780
0
  SleepFor(MonoDelta::FromMilliseconds(1500));
1781
1782
0
  LOG(INFO) << "Counting rows...";
1783
0
  ASSERT_EQ(2 * kNumRowsToWrite, CountRowsFromClient(table,
1784
0
                                                     YBConsistencyLevel::CONSISTENT_PREFIX,
1785
0
                                                     kNoBound, kNoBound));
1786
1787
  // Test altering the table metadata and ensure that meta operations are resilient as well.
1788
0
  {
1789
0
    std::shared_ptr<TabletPeer> tablet_peer;
1790
0
    ASSERT_TRUE(new_leader->server()->tablet_manager()->LookupTablet(remote_tablet->tablet_id(),
1791
0
        &tablet_peer));
1792
0
    auto old_version = tablet_peer->tablet()->metadata()->schema_version();
1793
0
    std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kReplicatedTable));
1794
0
    table_alterer->AddColumn("new_col")->Type(INT32);
1795
0
    ASSERT_OK(table_alterer->Alter());
1796
0
    ASSERT_EQ(old_version + 1, tablet_peer->tablet()->metadata()->schema_version());
1797
0
  }
1798
0
}
1799
1800
namespace {
1801
1802
0
void CheckCorrectness(const TableHandle& table, int expected[], int nrows) {
1803
0
  int readrows = 0;
1804
1805
0
  for (const auto& row : TableRange(table)) {
1806
0
    ASSERT_LE(readrows, nrows);
1807
0
    int32_t key = row.column(0).int32_value();
1808
0
    ASSERT_NE(key, -1) << "Deleted key found in table in table " << key;
1809
0
    ASSERT_EQ(expected[key], row.column(1).int32_value())
1810
0
        << "Incorrect int value for key " <<  key;
1811
0
    ASSERT_EQ(row.column(2).string_value(), "")
1812
0
        << "Incorrect string value for key " << key;
1813
0
    ++readrows;
1814
0
  }
1815
0
  ASSERT_EQ(readrows, nrows);
1816
0
}
1817
1818
} // anonymous namespace
1819
1820
// Randomized mutations accuracy testing
1821
0
TEST_F(ClientTest, TestRandomWriteOperation) {
1822
0
  auto session = CreateSession();
1823
0
  int row[FLAGS_test_scan_num_rows]; // -1 indicates empty
1824
0
  int nrows;
1825
1826
  // First half-fill
1827
0
  for (int i = 0; i < FLAGS_test_scan_num_rows/2; ++i) {
1828
0
    ApplyInsertToSession(session.get(), client_table_, i, i, "");
1829
0
    row[i] = i;
1830
0
  }
1831
0
  for (int i = FLAGS_test_scan_num_rows/2; i < FLAGS_test_scan_num_rows; ++i) {
1832
0
    row[i] = -1;
1833
0
  }
1834
0
  nrows = FLAGS_test_scan_num_rows/2;
1835
1836
  // Randomized testing
1837
0
  LOG(INFO) << "Randomized mutations testing.";
1838
0
  unsigned int seed = SeedRandom();
1839
0
  for (int i = 0; i <= 1000; ++i) {
1840
    // Test correctness every so often
1841
0
    if (i % 50 == 0) {
1842
0
      LOG(INFO) << "Correctness test " << i;
1843
0
      FlushSessionOrDie(session);
1844
0
      ASSERT_NO_FATALS(CheckCorrectness(client_table_, row, nrows));
1845
0
      LOG(INFO) << "...complete";
1846
0
    }
1847
1848
0
    int change = rand_r(&seed) % FLAGS_test_scan_num_rows;
1849
    // Insert if empty
1850
0
    if (row[change] == -1) {
1851
0
      ApplyInsertToSession(session.get(), client_table_, change, change, "");
1852
0
      row[change] = change;
1853
0
      ++nrows;
1854
0
      VLOG(1) << "Insert " << change;
1855
0
    } else {
1856
      // Update or delete otherwise
1857
0
      int update = rand_r(&seed) & 1;
1858
0
      if (update) {
1859
0
        ApplyUpdateToSession(session.get(), client_table_, change, ++row[change]);
1860
0
        VLOG(1) << "Update " << change;
1861
0
      } else {
1862
0
        ApplyDeleteToSession(session.get(), client_table_, change);
1863
0
        row[change] = -1;
1864
0
        --nrows;
1865
0
        VLOG(1) << "Delete " << change;
1866
0
      }
1867
0
    }
1868
0
  }
1869
1870
  // And one more time for the last batch.
1871
0
  FlushSessionOrDie(session);
1872
0
  ASSERT_NO_FATALS(CheckCorrectness(client_table_, row, nrows));
1873
0
}
1874
1875
// Test whether a batch can handle several mutations in a batch
1876
TEST_F(ClientTest, TestSeveralRowMutatesPerBatch) {
1877
  auto session = CreateSession();
1878
1879
  // Test insert/update
1880
  LOG(INFO) << "Testing insert/update in same batch, key " << 1 << ".";
1881
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "");
1882
  ApplyUpdateToSession(session.get(), client_table_, 1, 2);
1883
  FlushSessionOrDie(session);
1884
  auto rows = ScanTableToStrings(client_table_);
1885
  ASSERT_EQ(1, rows.size());
1886
  ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]);
1887
  rows.clear();
1888
1889
1890
  LOG(INFO) << "Testing insert/delete in same batch, key " << 2 << ".";
1891
  // Test insert/delete
1892
  ApplyInsertToSession(session.get(), client_table_, 2, 1, "");
1893
  ApplyDeleteToSession(session.get(), client_table_, 2);
1894
  FlushSessionOrDie(session);
1895
  ScanTableToStrings(client_table_, &rows);
1896
  ASSERT_EQ(1, rows.size());
1897
  ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]);
1898
  rows.clear();
1899
1900
  // Test update/delete
1901
  LOG(INFO) << "Testing update/delete in same batch, key " << 1 << ".";
1902
  ApplyUpdateToSession(session.get(), client_table_, 1, 1);
1903
  ApplyDeleteToSession(session.get(), client_table_, 1);
1904
  FlushSessionOrDie(session);
1905
  ScanTableToStrings(client_table_, &rows);
1906
  ASSERT_EQ(0, rows.size());
1907
1908
  // Test delete/insert (insert a row first)
1909
  LOG(INFO) << "Inserting row for delete/insert test, key " << 1 << ".";
1910
  ApplyInsertToSession(session.get(), client_table_, 1, 1, "");
1911
  FlushSessionOrDie(session);
1912
  ScanTableToStrings(client_table_, &rows);
1913
  ASSERT_EQ(1, rows.size());
1914
  ASSERT_EQ("{ int32:1, int32:1, string:\"\", null }", rows[0]);
1915
  rows.clear();
1916
  LOG(INFO) << "Testing delete/insert in same batch, key " << 1 << ".";
1917
  ApplyDeleteToSession(session.get(), client_table_, 1);
1918
  ApplyInsertToSession(session.get(), client_table_, 1, 2, "");
1919
  FlushSessionOrDie(session);
1920
  ScanTableToStrings(client_table_, &rows);
1921
  ASSERT_EQ(1, rows.size());
1922
  ASSERT_EQ("{ int32:1, int32:2, string:\"\", null }", rows[0]);
1923
  rows.clear();
1924
}
1925
1926
// Tests that master permits are properly released after a whole bunch of
1927
// rows are inserted.
1928
0
TEST_F(ClientTest, TestMasterLookupPermits) {
1929
0
  int initial_value = client_->data_->meta_cache_->master_lookup_sem_.GetValue();
1930
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
1931
0
  ASSERT_EQ(initial_value,
1932
0
            client_->data_->meta_cache_->master_lookup_sem_.GetValue());
1933
0
}
1934
1935
// Define callback for deadlock simulation, as well as various helper methods.
1936
namespace {
1937
1938
class DeadlockSimulationCallback {
1939
 public:
1940
0
  explicit DeadlockSimulationCallback(Atomic32* i) : i_(i) {}
1941
1942
0
  void operator()(FlushStatus* flush_status) const {
1943
0
    CHECK_OK(flush_status->status);
1944
0
    NoBarrier_AtomicIncrement(i_, 1);
1945
0
  }
1946
 private:
1947
  Atomic32* const i_;
1948
};
1949
1950
// Returns col1 value of first row.
1951
0
int32_t ReadFirstRowKeyFirstCol(const TableHandle& tbl) {
1952
0
  TableRange range(tbl);
1953
1954
0
  auto it = range.begin();
1955
0
  EXPECT_NE(it, range.end());
1956
0
  return it->column(1).int32_value();
1957
0
}
1958
1959
// Checks that all rows have value equal to expected, return number of rows.
1960
int CheckRowsEqual(const TableHandle& tbl, int32_t expected) {
1961
  int cnt = 0;
1962
  for (const auto& row : TableRange(tbl)) {
1963
    EXPECT_EQ(row.column(1).int32_value(), expected);
1964
    EXPECT_EQ(row.column(2).string_value(), "");
1965
    EXPECT_EQ(row.column(3).int32_value(), 12345);
1966
    ++cnt;
1967
  }
1968
  return cnt;
1969
}
1970
1971
// Return a session "loaded" with updates. Sets the session timeout
1972
// to the parameter value. Larger timeouts decrease false positives.
1973
shared_ptr<YBSession> LoadedSession(YBClient* client,
1974
                                    const TableHandle& tbl,
1975
0
                                    bool fwd, int max, MonoDelta timeout) {
1976
0
  shared_ptr<YBSession> session = client->NewSession();
1977
0
  session->SetTimeout(timeout);
1978
0
  for (int i = 0; i < max; ++i) {
1979
0
    int key = fwd ? i : max - i;
1980
0
    ApplyUpdateToSession(session.get(), tbl, key, fwd);
1981
0
  }
1982
0
  return session;
1983
0
}
1984
1985
} // anonymous namespace
1986
1987
// Starts many clients which update a table in parallel.
1988
// Half of the clients update rows in ascending order while the other
1989
// half update rows in descending order.
1990
// This ensures that we don't hit a deadlock in such a situation.
1991
0
TEST_F(ClientTest, TestDeadlockSimulation) {
1992
0
  if (!AllowSlowTests()) {
1993
0
    LOG(WARNING) << "TestDeadlockSimulation disabled since slow.";
1994
0
    return;
1995
0
  }
1996
1997
  // Make reverse client who will make batches that update rows
1998
  // in reverse order. Separate client used so rpc calls come in at same time.
1999
0
  auto rev_client = ASSERT_RESULT(YBClientBuilder()
2000
0
      .add_master_server_addr(ToString(cluster_->mini_master()->bound_rpc_addr()))
2001
0
      .Build());
2002
0
  TableHandle rev_table;
2003
0
  ASSERT_OK(rev_table.Open(kTableName, client_.get()));
2004
2005
  // Load up some rows
2006
0
  const int kNumRows = 300;
2007
0
  const auto kTimeout = 60s;
2008
0
  auto session = CreateSession();
2009
0
  for (int i = 0; i < kNumRows; ++i)
2010
0
    ApplyInsertToSession(session.get(), client_table_, i, i,  "");
2011
0
  FlushSessionOrDie(session);
2012
2013
  // Check both clients see rows
2014
0
  auto fwd = CountRowsFromClient(client_table_);
2015
0
  ASSERT_EQ(kNumRows, fwd);
2016
0
  auto rev = CountRowsFromClient(rev_table);
2017
0
  ASSERT_EQ(kNumRows, rev);
2018
2019
  // Generate sessions
2020
0
  const int kNumSessions = 100;
2021
0
  shared_ptr<YBSession> fwd_sessions[kNumSessions];
2022
0
  shared_ptr<YBSession> rev_sessions[kNumSessions];
2023
0
  for (int i = 0; i < kNumSessions; ++i) {
2024
0
    fwd_sessions[i] = LoadedSession(client_.get(), client_table_, true, kNumRows, kTimeout);
2025
0
    rev_sessions[i] = LoadedSession(rev_client.get(), rev_table, true, kNumRows, kTimeout);
2026
0
  }
2027
2028
  // Run async calls - one thread updates sequentially, another in reverse.
2029
0
  Atomic32 ctr1, ctr2;
2030
0
  NoBarrier_Store(&ctr1, 0);
2031
0
  NoBarrier_Store(&ctr2, 0);
2032
0
  for (int i = 0; i < kNumSessions; ++i) {
2033
    // The callbacks are freed after they are invoked.
2034
0
    fwd_sessions[i]->FlushAsync(DeadlockSimulationCallback(&ctr1));
2035
0
    rev_sessions[i]->FlushAsync(DeadlockSimulationCallback(&ctr2));
2036
0
  }
2037
2038
  // Spin while waiting for ops to complete.
2039
0
  int lctr1, lctr2, prev1 = 0, prev2 = 0;
2040
0
  do {
2041
0
    lctr1 = NoBarrier_Load(&ctr1);
2042
0
    lctr2 = NoBarrier_Load(&ctr2);
2043
    // Display progress in 10% increments.
2044
0
    if (prev1 == 0 || lctr1 + lctr2 - prev1 - prev2 > kNumSessions / 10) {
2045
0
      LOG(INFO) << "# updates: " << lctr1 << " fwd, " << lctr2 << " rev";
2046
0
      prev1 = lctr1;
2047
0
      prev2 = lctr2;
2048
0
    }
2049
0
    SleepFor(MonoDelta::FromMilliseconds(100));
2050
0
  } while (lctr1 != kNumSessions|| lctr2 != kNumSessions);
2051
0
  int32_t expected = ReadFirstRowKeyFirstCol(client_table_);
2052
2053
  // Check transaction from forward client.
2054
0
  fwd = CheckRowsEqual(client_table_, expected);
2055
0
  ASSERT_EQ(fwd, kNumRows);
2056
2057
  // Check from reverse client side.
2058
0
  rev = CheckRowsEqual(rev_table, expected);
2059
0
  ASSERT_EQ(rev, kNumRows);
2060
0
}
2061
2062
0
TEST_F(ClientTest, TestCreateDuplicateTable) {
2063
0
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
2064
0
  ASSERT_TRUE(table_creator->table_name(kTableName)
2065
0
              .schema(&schema_)
2066
0
              .Create().IsAlreadyPresent());
2067
0
}
2068
2069
0
TEST_F(ClientTest, CreateTableWithoutTservers) {
2070
0
  DoTearDown();
2071
2072
0
  YBMiniClusterTestBase::SetUp();
2073
2074
0
  MiniClusterOptions options;
2075
0
  options.num_tablet_servers = 0;
2076
  // Start minicluster with only master (to simulate tserver not yet heartbeating).
2077
0
  cluster_.reset(new MiniCluster(options));
2078
0
  ASSERT_OK(cluster_->Start());
2079
2080
  // Connect to the cluster.
2081
0
  client_ = ASSERT_RESULT(YBClientBuilder()
2082
0
      .add_master_server_addr(yb::ToString(cluster_->mini_master()->bound_rpc_addr()))
2083
0
      .Build());
2084
2085
0
  std::unique_ptr<client::YBTableCreator> table_creator(client_->NewTableCreator());
2086
0
  Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar"))
2087
0
      .schema(&schema_)
2088
0
      .Create();
2089
0
  ASSERT_TRUE(s.IsInvalidArgument());
2090
0
  ASSERT_STR_CONTAINS(s.ToString(), "num_tablets should be greater than 0.");
2091
0
}
2092
2093
0
TEST_F(ClientTest, TestCreateTableWithTooManyTablets) {
2094
0
  FLAGS_max_create_tablets_per_ts = 1;
2095
0
  auto many_tablets = FLAGS_replication_factor + 1;
2096
2097
0
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
2098
0
  Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar"))
2099
0
      .schema(&schema_)
2100
0
      .num_tablets(many_tablets)
2101
0
      .Create();
2102
0
  ASSERT_TRUE(s.IsInvalidArgument());
2103
0
  ASSERT_STR_CONTAINS(
2104
0
      s.ToString(),
2105
0
      strings::Substitute(
2106
0
          "The requested number of tablets ($0) is over the permitted maximum ($1)", many_tablets,
2107
0
          FLAGS_replication_factor));
2108
0
}
2109
2110
// TODO(bogdan): Disabled until ENG-2687
2111
0
TEST_F(ClientTest, DISABLED_TestCreateTableWithTooManyReplicas) {
2112
0
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
2113
0
  Status s = table_creator->table_name(YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "foobar"))
2114
0
      .schema(&schema_)
2115
0
      .num_tablets(2)
2116
0
      .Create();
2117
0
  ASSERT_TRUE(s.IsInvalidArgument());
2118
0
  ASSERT_STR_CONTAINS(s.ToString(),
2119
0
                      "Not enough live tablet servers to create table with the requested "
2120
0
                      "replication factor 3. 1 tablet servers are alive");
2121
0
}
2122
2123
// Test that scanners will retry after receiving ERROR_SERVER_TOO_BUSY from an
2124
// overloaded tablet server. Regression test for KUDU-1079.
2125
0
TEST_F(ClientTest, TestServerTooBusyRetry) {
2126
0
  ASSERT_NO_FATALS(InsertTestRows(client_table_, FLAGS_test_scan_num_rows));
2127
2128
  // Introduce latency in each scan to increase the likelihood of
2129
  // ERROR_SERVER_TOO_BUSY.
2130
0
  FLAGS_TEST_scanner_inject_latency_on_each_batch_ms = 10;
2131
2132
  // Reduce the service queue length of each tablet server in order to increase
2133
  // the likelihood of ERROR_SERVER_TOO_BUSY.
2134
0
  FLAGS_tablet_server_svc_queue_length = 1;
2135
  // Set the backoff limits to be small for this test, so that we finish in a reasonable
2136
  // amount of time.
2137
0
  FLAGS_min_backoff_ms_exponent = 0;
2138
0
  FLAGS_max_backoff_ms_exponent = 3;
2139
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
2140
0
    MiniTabletServer* ts = cluster_->mini_tablet_server(i);
2141
0
    ASSERT_OK(ts->Restart());
2142
0
    ASSERT_OK(ts->WaitStarted());
2143
0
  }
2144
2145
0
  TestThreadHolder thread_holder;
2146
0
  std::mutex idle_threads_mutex;
2147
0
  std::vector<CountDownLatch*> idle_threads;
2148
0
  std::atomic<int> running_threads{0};
2149
2150
0
  while (!thread_holder.stop_flag().load()) {
2151
0
    CountDownLatch* latch;
2152
0
    {
2153
0
      std::lock_guard<std::mutex> lock(idle_threads_mutex);
2154
0
      if (!idle_threads.empty()) {
2155
0
        latch = idle_threads.back();
2156
0
        idle_threads.pop_back();
2157
0
      } else {
2158
0
        latch = nullptr;
2159
0
      }
2160
0
    }
2161
0
    if (latch) {
2162
0
      latch->CountDown();
2163
0
    } else {
2164
0
      auto num_threads = ++running_threads;
2165
0
      LOG(INFO) << "Start " << num_threads << " thread";
2166
0
      thread_holder.AddThreadFunctor([this, &idle_threads, &idle_threads_mutex,
2167
0
                                      &stop = thread_holder.stop_flag(), &running_threads]() {
2168
0
        CountDownLatch latch(1);
2169
0
        while (!stop.load()) {
2170
0
          CheckRowCount(client_table_);
2171
0
          latch.Reset(1);
2172
0
          {
2173
0
            std::lock_guard<std::mutex> lock(idle_threads_mutex);
2174
0
            idle_threads.push_back(&latch);
2175
0
          }
2176
0
          latch.Wait();
2177
0
        }
2178
0
        --running_threads;
2179
0
      });
2180
0
      std::this_thread::sleep_for(10ms);
2181
0
    }
2182
2183
0
    for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
2184
0
      scoped_refptr<Counter> counter = METRIC_rpcs_queue_overflow.Instantiate(
2185
0
          cluster_->mini_tablet_server(i)->server()->metric_entity());
2186
0
      if (counter->value() > 0) {
2187
0
        thread_holder.stop_flag().store(true, std::memory_order_release);
2188
0
        break;
2189
0
      }
2190
0
    }
2191
0
  }
2192
2193
0
  while (running_threads.load() > 0) {
2194
0
    LOG(INFO) << "Left to stop " << running_threads.load() << " threads";
2195
0
    {
2196
0
      std::lock_guard<std::mutex> lock(idle_threads_mutex);
2197
0
      while (!idle_threads.empty()) {
2198
0
        idle_threads.back()->CountDown(1);
2199
0
        idle_threads.pop_back();
2200
0
      }
2201
0
    }
2202
0
    std::this_thread::sleep_for(10ms);
2203
0
  }
2204
0
  thread_holder.JoinAll();
2205
0
}
2206
2207
0
TEST_F(ClientTest, TestReadFromFollower) {
2208
  // Create table and write some rows.
2209
0
  const YBTableName kReadFromFollowerTable(YQL_DATABASE_CQL, "TestReadFromFollower");
2210
0
  TableHandle table;
2211
0
  ASSERT_NO_FATALS(CreateTable(kReadFromFollowerTable, 1, &table));
2212
0
  ASSERT_NO_FATALS(InsertTestRows(table, FLAGS_test_scan_num_rows));
2213
2214
  // Find the followers.
2215
0
  GetTableLocationsRequestPB req;
2216
0
  GetTableLocationsResponsePB resp;
2217
0
  table->name().SetIntoTableIdentifierPB(req.mutable_table());
2218
0
  CHECK_OK(cluster_->mini_master()->catalog_manager().GetTableLocations(&req, &resp));
2219
0
  ASSERT_EQ(1, resp.tablet_locations_size());
2220
0
  ASSERT_EQ(3, resp.tablet_locations(0).replicas_size());
2221
0
  const string& tablet_id = resp.tablet_locations(0).tablet_id();
2222
2223
0
  vector<master::TSInfoPB> followers;
2224
0
  for (const auto& replica : resp.tablet_locations(0).replicas()) {
2225
0
    if (replica.role() == PeerRole::FOLLOWER) {
2226
0
      followers.push_back(replica.ts_info());
2227
0
    }
2228
0
  }
2229
0
  ASSERT_EQ(cluster_->num_tablet_servers() - 1, followers.size());
2230
2231
0
  auto client_messenger =
2232
0
      CreateAutoShutdownMessengerHolder(ASSERT_RESULT(CreateMessenger("client")));
2233
0
  rpc::ProxyCache proxy_cache(client_messenger.get());
2234
0
  for (const master::TSInfoPB& ts_info : followers) {
2235
    // Try to read from followers.
2236
0
    auto tserver_proxy = std::make_unique<tserver::TabletServerServiceProxy>(
2237
0
        &proxy_cache, HostPortFromPB(ts_info.private_rpc_addresses(0)));
2238
2239
0
    std::unique_ptr<QLRowBlock> row_block;
2240
0
    ASSERT_OK(WaitFor([&]() -> bool {
2241
      // Setup read request.
2242
0
      tserver::ReadRequestPB req;
2243
0
      tserver::ReadResponsePB resp;
2244
0
      rpc::RpcController controller;
2245
0
      req.set_tablet_id(tablet_id);
2246
0
      req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
2247
0
      QLReadRequestPB *ql_read = req.mutable_ql_batch()->Add();
2248
0
      std::shared_ptr<std::vector<ColumnSchema>> selected_cols =
2249
0
          std::make_shared<std::vector<ColumnSchema>>(schema_.columns());
2250
0
      QLRSRowDescPB *rsrow_desc = ql_read->mutable_rsrow_desc();
2251
0
      for (size_t i = 0; i < schema_.num_columns(); i++) {
2252
0
        ql_read->add_selected_exprs()->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i));
2253
0
        ql_read->mutable_column_refs()->add_ids(narrow_cast<int32_t>(kFirstColumnId + i));
2254
2255
0
        QLRSColDescPB *rscol_desc = rsrow_desc->add_rscol_descs();
2256
0
        rscol_desc->set_name((*selected_cols)[i].name());
2257
0
        (*selected_cols)[i].type()->ToQLTypePB(rscol_desc->mutable_ql_type());
2258
0
      }
2259
2260
0
      EXPECT_OK(tserver_proxy->Read(req, &resp, &controller));
2261
2262
      // Verify response.
2263
0
      EXPECT_FALSE(resp.has_error());
2264
0
      EXPECT_EQ(1, resp.ql_batch_size());
2265
0
      const QLResponsePB &ql_resp = resp.ql_batch(0);
2266
0
      EXPECT_EQ(QLResponsePB_QLStatus_YQL_STATUS_OK, ql_resp.status());
2267
0
      EXPECT_TRUE(ql_resp.has_rows_data_sidecar());
2268
2269
0
      EXPECT_TRUE(controller.finished());
2270
0
      Slice rows_data = EXPECT_RESULT(controller.GetSidecar(ql_resp.rows_data_sidecar()));
2271
0
      ql::RowsResult rows_result(kReadFromFollowerTable, selected_cols, rows_data.ToBuffer());
2272
0
      row_block = rows_result.GetRowBlock();
2273
0
      return implicit_cast<size_t>(FLAGS_test_scan_num_rows) == row_block->row_count();
2274
0
    }, MonoDelta::FromSeconds(30), "Waiting for replication to followers"));
2275
2276
0
    std::vector<bool> seen_key(row_block->row_count());
2277
0
    for (size_t i = 0; i < row_block->row_count(); i++) {
2278
0
      const QLRow& row = row_block->row(i);
2279
0
      auto key = row.column(0).int32_value();
2280
0
      ASSERT_LT(key, seen_key.size());
2281
0
      ASSERT_FALSE(seen_key[key]);
2282
0
      seen_key[key] = true;
2283
0
      ASSERT_EQ(key * 2, row.column(1).int32_value());
2284
0
      ASSERT_EQ(StringPrintf("hello %d", key), row.column(2).string_value());
2285
0
      ASSERT_EQ(key * 3, row.column(3).int32_value());
2286
0
    }
2287
0
  }
2288
0
}
2289
2290
0
TEST_F(ClientTest, Capability) {
2291
0
  constexpr CapabilityId kFakeCapability = 0x9c40e9a7;
2292
2293
0
  auto rt = ASSERT_RESULT(LookupFirstTabletFuture(client_.get(), client_table_.table()).get());
2294
0
  ASSERT_TRUE(rt.get() != nullptr);
2295
0
  auto tservers = rt->GetRemoteTabletServers();
2296
0
  ASSERT_EQ(tservers.size(), 3);
2297
0
  for (const auto& replica : tservers) {
2298
    // Capability is related to executable, so it should be present since we run mini cluster for
2299
    // this test.
2300
0
    ASSERT_TRUE(replica->HasCapability(CAPABILITY_ClientTest));
2301
2302
    // Check that fake capability is not reported.
2303
0
    ASSERT_FALSE(replica->HasCapability(kFakeCapability));
2304
2305
    // This capability is defined on the TServer, passed to the Master on registration,
2306
    // then propagated to the YBClient.  Ensure that this runtime pipeline holds.
2307
0
    ASSERT_TRUE(replica->HasCapability(CAPABILITY_TabletReportLimit));
2308
0
  }
2309
0
}
2310
2311
0
TEST_F(ClientTest, TestCreateTableWithRangePartition) {
2312
0
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
2313
0
  const std::string kPgsqlTableName = "pgsqlrangepartitionedtable";
2314
0
  const std::string kPgsqlTableId = "pgsqlrangepartitionedtableid";
2315
0
  const size_t kColIdx = 1;
2316
0
  const int64_t kKeyValue = 48238;
2317
0
  auto pgsql_table_name = YBTableName(
2318
0
      YQL_DATABASE_PGSQL, kPgsqlKeyspaceID, kPgsqlKeyspaceName, kPgsqlTableName);
2319
2320
0
  auto yql_table_name = YBTableName(YQL_DATABASE_CQL, kKeyspaceName, "yqlrangepartitionedtable");
2321
2322
0
  YBSchemaBuilder schemaBuilder;
2323
0
  schemaBuilder.AddColumn("key")->PrimaryKey()->Type(yb::STRING)->NotNull();
2324
0
  schemaBuilder.AddColumn("value")->Type(yb::INT64)->NotNull();
2325
0
  YBSchema schema;
2326
0
  EXPECT_OK(client_->CreateNamespaceIfNotExists(kPgsqlKeyspaceName,
2327
0
                                                YQLDatabase::YQL_DATABASE_PGSQL,
2328
0
                                                "" /* creator_role_name */,
2329
0
                                                kPgsqlKeyspaceID));
2330
  // Create a PGSQL table using range partition.
2331
0
  EXPECT_OK(schemaBuilder.Build(&schema));
2332
0
  Status s = table_creator->table_name(pgsql_table_name)
2333
0
      .table_id(kPgsqlTableId)
2334
0
      .schema(&schema)
2335
0
      .set_range_partition_columns({"key"})
2336
0
      .table_type(YBTableType::PGSQL_TABLE_TYPE)
2337
0
      .num_tablets(1)
2338
0
      .Create();
2339
0
  EXPECT_OK(s);
2340
2341
  // Write to the PGSQL table.
2342
0
  shared_ptr<YBTable> pgsq_table;
2343
0
  EXPECT_OK(client_->OpenTable(kPgsqlTableId , &pgsq_table));
2344
0
  std::shared_ptr<YBPgsqlWriteOp> pgsql_write_op(client::YBPgsqlWriteOp::NewInsert(pgsq_table));
2345
0
  PgsqlWriteRequestPB* psql_write_request = pgsql_write_op->mutable_request();
2346
2347
0
  psql_write_request->add_range_column_values()->mutable_value()->set_string_value("pgsql_key1");
2348
0
  PgsqlColumnValuePB* pgsql_column = psql_write_request->add_column_values();
2349
  // 1 is the index for column value.
2350
2351
0
  pgsql_column->set_column_id(pgsq_table->schema().ColumnId(kColIdx));
2352
0
  pgsql_column->mutable_expr()->mutable_value()->set_int64_value(kKeyValue);
2353
0
  std::shared_ptr<YBSession> session = CreateSession(client_.get());
2354
0
  session->Apply(pgsql_write_op);
2355
2356
  // Create a YQL table using range partition.
2357
0
  s = table_creator->table_name(yql_table_name)
2358
0
      .schema(&schema)
2359
0
      .set_range_partition_columns({"key"})
2360
0
      .table_type(YBTableType::YQL_TABLE_TYPE)
2361
0
      .num_tablets(1)
2362
0
      .Create();
2363
0
  EXPECT_OK(s);
2364
2365
  // Write to the YQL table.
2366
0
  client::TableHandle table;
2367
0
  EXPECT_OK(table.Open(yql_table_name, client_.get()));
2368
0
  std::shared_ptr<YBqlWriteOp> write_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
2369
0
  QLWriteRequestPB* const req = write_op->mutable_request();
2370
0
  req->add_range_column_values()->mutable_value()->set_string_value("key1");
2371
0
  QLColumnValuePB* column = req->add_column_values();
2372
  // 1 is the index for column value.
2373
0
  column->set_column_id(pgsq_table->schema().ColumnId(kColIdx));
2374
0
  column->mutable_expr()->mutable_value()->set_int64_value(kKeyValue);
2375
0
  session->Apply(write_op);
2376
0
}
2377
2378
// TODO(jason): enable the test in clang when we use clang version at least 9 (otherwise, there is a
2379
// compilation error: P0428R2).
2380
#if !defined(__clang__)
2381
TEST_F(ClientTest, FlushTable) {
2382
  const tablet::Tablet* tablet;
2383
  constexpr int kTimeoutSecs = 30;
2384
  int current_row = 0;
2385
2386
  {
2387
    std::shared_ptr<TabletPeer> tablet_peer;
2388
    string tablet_id = GetFirstTabletId(client_table2_.get());
2389
    for (auto& ts : cluster_->mini_tablet_servers()) {
2390
      ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablet_id, &tablet_peer));
2391
      if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) {
2392
        break;
2393
      }
2394
    }
2395
    tablet = tablet_peer->tablet();
2396
  }
2397
2398
  auto test_good_flush_and_compact = ([&]<class T>(T table_id_or_name) {
2399
    int initial_num_sst_files = tablet->GetCurrentVersionNumSSTFiles();
2400
2401
    // Test flush table.
2402
    InsertTestRows(client_table2_, 1, current_row++);
2403
    ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files);
2404
    ASSERT_OK(client_->FlushTables(
2405
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */));
2406
    ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files + 1);
2407
2408
    // Insert and flush more rows.
2409
    InsertTestRows(client_table2_, 1, current_row++);
2410
    ASSERT_OK(client_->FlushTables(
2411
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */));
2412
    InsertTestRows(client_table2_, 1, current_row++);
2413
    ASSERT_OK(client_->FlushTables(
2414
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */));
2415
2416
    // Test compact table.
2417
    ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), initial_num_sst_files + 3);
2418
    ASSERT_OK(client_->FlushTables(
2419
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, true /* is_compaction */));
2420
    ASSERT_EQ(tablet->GetCurrentVersionNumSSTFiles(), 1);
2421
  });
2422
2423
  test_good_flush_and_compact(client_table2_.table()->id());
2424
  test_good_flush_and_compact(client_table2_.table()->name());
2425
2426
  auto test_bad_flush_and_compact = ([&]<class T>(T table_id_or_name) {
2427
    // Test flush table.
2428
    ASSERT_NOK(client_->FlushTables(
2429
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, false /* is_compaction */));
2430
    // Test compact table.
2431
    ASSERT_NOK(client_->FlushTables(
2432
        {table_id_or_name}, /* add_indexes */ false, kTimeoutSecs, true /* is_compaction */));
2433
  });
2434
2435
  test_bad_flush_and_compact("bad table id");
2436
  test_bad_flush_and_compact(YBTableName(
2437
      YQLDatabase::YQL_DATABASE_CQL,
2438
      "bad namespace name",
2439
      "bad table name"));
2440
}
2441
#endif  // !defined(__clang__)
2442
2443
0
TEST_F(ClientTest, GetNamespaceInfo) {
2444
0
  GetNamespaceInfoResponsePB resp;
2445
2446
  // Setup.
2447
0
  ASSERT_OK(client_->CreateNamespace(kPgsqlKeyspaceName,
2448
0
                                     YQLDatabase::YQL_DATABASE_PGSQL,
2449
0
                                     "" /* creator_role_name */,
2450
0
                                     kPgsqlKeyspaceID,
2451
0
                                     "" /* source_namespace_id */,
2452
0
                                     boost::none /* next_pg_oid */,
2453
0
                                     nullptr /* txn */,
2454
0
                                     true /* colocated */));
2455
2456
  // CQL non-colocated.
2457
0
  ASSERT_OK(client_->GetNamespaceInfo(
2458
0
        "" /* namespace_id */, kKeyspaceName, YQL_DATABASE_CQL, &resp));
2459
0
  ASSERT_EQ(resp.namespace_().name(), kKeyspaceName);
2460
0
  ASSERT_EQ(resp.namespace_().database_type(), YQL_DATABASE_CQL);
2461
0
  ASSERT_FALSE(resp.colocated());
2462
2463
  // SQL colocated.
2464
0
  ASSERT_OK(client_->GetNamespaceInfo(
2465
0
        kPgsqlKeyspaceID, "" /* namespace_name */, YQL_DATABASE_PGSQL, &resp));
2466
0
  ASSERT_EQ(resp.namespace_().id(), kPgsqlKeyspaceID);
2467
0
  ASSERT_EQ(resp.namespace_().name(), kPgsqlKeyspaceName);
2468
0
  ASSERT_EQ(resp.namespace_().database_type(), YQL_DATABASE_PGSQL);
2469
0
  ASSERT_TRUE(resp.colocated());
2470
0
}
2471
2472
0
TEST_F(ClientTest, BadMasterAddress) {
2473
0
  auto messenger = ASSERT_RESULT(CreateMessenger("test-messenger"));
2474
0
  auto host = "should.not.resolve";
2475
2476
  // Put host entry in cache.
2477
0
  ASSERT_NOK(messenger->resolver().Resolve(host));
2478
2479
0
  {
2480
0
    struct TestServerOptions : public server::ServerBaseOptions {
2481
0
      TestServerOptions() : server::ServerBaseOptions(1) {}
2482
0
    };
2483
0
    TestServerOptions opts;
2484
0
    auto master_addr = std::make_shared<server::MasterAddresses>();
2485
    // Put several hosts, so resolve would take place.
2486
0
    master_addr->push_back({HostPort(host, 1)});
2487
0
    master_addr->push_back({HostPort(host, 2)});
2488
0
    opts.SetMasterAddresses(master_addr);
2489
2490
0
    AsyncClientInitialiser async_init(
2491
0
        "test-client", /* num_reactors= */ 1, /* timeout_seconds= */ 1, "UUID", &opts,
2492
0
        /* metric_entity= */ nullptr, /* parent_mem_tracker= */ nullptr, messenger.get());
2493
0
    async_init.Start();
2494
0
    async_init.get_client_future().wait_for(1s);
2495
0
  }
2496
2497
0
  messenger->Shutdown();
2498
0
}
2499
2500
0
TEST_F(ClientTest, RefreshPartitions) {
2501
0
  const auto kLookupTimeout = 10s;
2502
0
  const auto kNumLookupThreads = 2;
2503
0
  const auto kNumLookups = 100;
2504
2505
0
  std::atomic<bool> stop_requested{false};
2506
0
  std::atomic<size_t> num_lookups_called{0};
2507
0
  std::atomic<size_t> num_lookups_done{0};
2508
2509
0
  const auto callback = [&num_lookups_done](
2510
0
                            size_t lookup_idx, const Result<internal::RemoteTabletPtr>& tablet) {
2511
0
    const auto prefix = Format("Lookup $0 got ", lookup_idx);
2512
0
    if (tablet.ok()) {
2513
0
      LOG(INFO) << prefix << "tablet: " << (*tablet)->tablet_id();
2514
0
    } else {
2515
0
      LOG(INFO) << prefix << "error: " << AsString(tablet.status());
2516
0
    }
2517
0
    num_lookups_done.fetch_add(1);
2518
0
  };
2519
2520
0
  const auto lookup_func = [&]() {
2521
0
    while(!stop_requested) {
2522
0
      const auto hash_code = RandomUniformInt<uint16_t>(0, PartitionSchema::kMaxPartitionKey);
2523
0
      const auto partition_key = PartitionSchema::EncodeMultiColumnHashValue(hash_code);
2524
0
      client_->LookupTabletByKey(
2525
0
          client_table_.table(), partition_key, CoarseMonoClock::now() + kLookupTimeout,
2526
0
          std::bind(callback, num_lookups_called.fetch_add(1), std::placeholders::_1));
2527
0
    }
2528
0
  };
2529
2530
0
  const auto marker_func = [&]() {
2531
0
    while(!stop_requested) {
2532
0
      const auto table = client_table_.table();
2533
0
      table->MarkPartitionsAsStale();
2534
2535
0
      Synchronizer synchronizer;
2536
0
      table->RefreshPartitions(client_table_.client(), synchronizer.AsStdStatusCallback());
2537
0
      const auto status = synchronizer.Wait();
2538
0
      if (!status.ok()) {
2539
0
        LOG(INFO) << status;
2540
0
      }
2541
0
    }
2542
0
  };
2543
2544
0
  LOG(INFO) << "Starting threads";
2545
2546
0
  std::vector<std::thread> threads;
2547
0
  for (int i = 0; i < kNumLookupThreads; ++i) {
2548
0
    threads.push_back(std::thread(lookup_func));
2549
0
  }
2550
0
  threads.push_back(std::thread(marker_func));
2551
2552
0
  ASSERT_OK(LoggedWaitFor([&num_lookups_called]{
2553
0
    return num_lookups_called >= kNumLookups;
2554
0
  }, 120s, "Tablet lookup calls"));
2555
2556
0
  LOG(INFO) << "Stopping threads";
2557
0
  stop_requested = true;
2558
0
  for (auto& thread : threads) {
2559
0
    thread.join();
2560
0
  }
2561
0
  LOG(INFO) << "Stopped threads";
2562
2563
0
  ASSERT_OK(LoggedWaitFor([&num_lookups_done, num_lookups = num_lookups_called.load()] {
2564
0
    return num_lookups_done >= num_lookups;
2565
0
  }, kLookupTimeout, "Tablet lookup responses"));
2566
2567
0
  LOG(INFO) << "num_lookups_done: " << num_lookups_done;
2568
0
}
2569
2570
// There should be only one lookup RPC asking for colocated tables tablet locations.
2571
// When we ask for tablet lookup for other tables colocated with the first one we asked, MetaCache
2572
// should be able to respond without sending RPCs to master again.
2573
0
TEST_F(ClientTest, ColocatedTablesLookupTablet) {
2574
0
  const auto kTabletLookupTimeout = 10s;
2575
0
  const auto kNumTables = 10;
2576
2577
0
  YBTableName common_table_name(
2578
0
      YQLDatabase::YQL_DATABASE_PGSQL, kPgsqlKeyspaceID, kPgsqlKeyspaceName, "table_name");
2579
0
  ASSERT_OK(client_->CreateNamespace(
2580
0
      common_table_name.namespace_name(),
2581
0
      common_table_name.namespace_type(),
2582
0
      /* creator_role_name =*/ "",
2583
0
      common_table_name.namespace_id(),
2584
0
      /* source_namespace_id =*/ "",
2585
0
      /* next_pg_oid =*/ boost::none,
2586
0
      /* txn =*/ nullptr,
2587
0
      /* colocated =*/ true));
2588
2589
0
  YBSchemaBuilder schemaBuilder;
2590
0
  schemaBuilder.AddColumn("key")->PrimaryKey()->Type(yb::INT64);
2591
0
  schemaBuilder.AddColumn("value")->Type(yb::INT64);
2592
0
  YBSchema schema;
2593
0
  ASSERT_OK(schemaBuilder.Build(&schema));
2594
2595
0
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
2596
0
  std::vector<YBTableName> table_names;
2597
0
  for (auto i = 0; i < kNumTables; ++i) {
2598
0
    const auto name = Format("table_$0", i);
2599
0
    auto table_name = common_table_name;
2600
    // Autogenerated ids will fail the IsPgsqlId() CHECKs so we need to generate oids.
2601
0
    table_name.set_table_id(GetPgsqlTableId(i, i));
2602
0
    table_name.set_table_name(name);
2603
0
    ASSERT_OK(table_creator->table_name(table_name)
2604
0
                  .table_id(table_name.table_id())
2605
0
                  .schema(&schema)
2606
0
                  .set_range_partition_columns({"key"})
2607
0
                  .table_type(YBTableType::PGSQL_TABLE_TYPE)
2608
0
                  .num_tablets(1)
2609
0
                  .Create());
2610
0
    table_names.push_back(table_name);
2611
0
  }
2612
2613
0
  const auto lookup_serial_start = client::internal::TEST_GetLookupSerial();
2614
2615
0
  TabletId colocated_tablet_id;
2616
0
  for (const auto& table_name : table_names) {
2617
0
    auto table = ASSERT_RESULT(client_->OpenTable(table_name));
2618
0
    auto tablet = ASSERT_RESULT(client_->LookupTabletByKeyFuture(
2619
0
        table, /* partition_key =*/ "",
2620
0
        CoarseMonoClock::now() + kTabletLookupTimeout).get());
2621
0
    const auto tablet_id = tablet->tablet_id();
2622
0
    if (colocated_tablet_id.empty()) {
2623
0
      colocated_tablet_id = tablet_id;
2624
0
    } else {
2625
0
      ASSERT_EQ(tablet_id, colocated_tablet_id);
2626
0
    }
2627
0
  }
2628
2629
0
  const auto lookup_serial_stop = client::internal::TEST_GetLookupSerial();
2630
0
  ASSERT_EQ(lookup_serial_stop, lookup_serial_start + 1);
2631
0
}
2632
2633
class ClientTestWithHashAndRangePk : public ClientTest {
2634
 public:
2635
1
  void SetUp() override {
2636
1
    YBSchemaBuilder b;
2637
1
    b.AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull();
2638
1
    b.AddColumn("r")->Type(INT32)->PrimaryKey()->NotNull();
2639
1
    b.AddColumn("v")->Type(INT32);
2640
2641
1
    CHECK_OK(b.Build(&schema_));
2642
2643
1
    ClientTest::SetUp();
2644
1
  }
2645
2646
0
  shared_ptr<YBqlWriteOp> BuildTestRow(const TableHandle& table, int h, int r, int v) {
2647
0
    auto insert = table.NewInsertOp();
2648
0
    auto req = insert->mutable_request();
2649
0
    QLAddInt32HashValue(req, h);
2650
0
    const auto& columns = table.schema().columns();
2651
0
    table.AddInt32ColumnValue(req, columns[1].name(), r);
2652
0
    table.AddInt32ColumnValue(req, columns[2].name(), v);
2653
0
    return insert;
2654
0
  }
2655
};
2656
2657
// We concurrently execute batches of insert operations, each batch targeting the same hash
2658
// partition key. Concurrently we emulate table partition list version increase and meta cache
2659
// invalidation.
2660
// This tests https://github.com/yugabyte/yugabyte-db/issues/9806 with a scenario
2661
// when the batcher is emptied due to part of tablet lookups failing, but callback was not called.
2662
0
TEST_F_EX(ClientTest, EmptiedBatcherFlush, ClientTestWithHashAndRangePk) {
2663
0
  constexpr auto kNumRowsPerBatch = 100;
2664
0
  constexpr auto kWriters = 4;
2665
0
  const auto kTotalNumBatches = 50;
2666
0
  const auto kFlushTimeout = 10s * kTimeMultiplier;
2667
2668
0
  TestThreadHolder thread_holder;
2669
0
  std::atomic<int> next_batch_hash_key{10000};
2670
0
  const auto stop_at_batch_hash_key = next_batch_hash_key.load() + kTotalNumBatches;
2671
2672
0
  for (int i = 0; i != kWriters; ++i) {
2673
0
    thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag(), &next_batch_hash_key,
2674
0
                                    num_rows_per_batch = kNumRowsPerBatch, stop_at_batch_hash_key,
2675
0
                                    kFlushTimeout] {
2676
0
      SetFlagOnExit set_flag_on_exit(&stop);
2677
2678
0
      while (!stop.load(std::memory_order_acquire)) {
2679
0
        auto batch_hash_key = next_batch_hash_key.fetch_add(1);
2680
0
        if (batch_hash_key >= stop_at_batch_hash_key) {
2681
0
          break;
2682
0
        }
2683
0
        auto session = CreateSession(client_.get());
2684
0
        for (int r = 0; r < num_rows_per_batch; r++) {
2685
0
          session->Apply(BuildTestRow(client_table_, batch_hash_key, r, 0));
2686
0
        }
2687
0
        auto flush_future = session->FlushFuture();
2688
0
        ASSERT_EQ(flush_future.wait_for(kFlushTimeout), std::future_status::ready)
2689
0
            << "batch_hash_key: " << batch_hash_key;
2690
0
        const auto& flush_status = flush_future.get();
2691
0
        if (!flush_status.status.ok() || !flush_status.errors.empty()) {
2692
0
          LogSessionErrorsAndDie(flush_status);
2693
0
        }
2694
0
      }
2695
0
    });
2696
0
  }
2697
2698
0
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
2699
0
    SetFlagOnExit set_flag_on_exit(&stop);
2700
2701
0
    const auto table = client_table_.table();
2702
2703
0
    while (!stop.load(std::memory_order_acquire)) {
2704
0
      ASSERT_OK(cluster_->mini_master()->catalog_manager()
2705
0
          .TEST_IncrementTablePartitionListVersion(table->id()));
2706
0
      table->MarkPartitionsAsStale();
2707
0
      SleepFor(10ms);
2708
0
    }
2709
0
  });
2710
2711
0
  thread_holder.JoinAll();
2712
0
}
2713
2714
class ClientTestWithThreeMasters : public ClientTest {
2715
 protected:
2716
1
  int NumMasters() override {
2717
1
    return 3;
2718
1
  }
2719
2720
0
  Status InitClient() override {
2721
    // Connect to the cluster using hostnames.
2722
0
    string master_addrs;
2723
0
    for (int i = 1; i <= NumMasters(); ++i) {
2724
      // TEST_RpcAddress is 1-indexed, but mini_master is 0-indexed.
2725
0
      master_addrs += server::TEST_RpcAddress(i, server::Private::kFalse) +
2726
0
                      ":" + yb::ToString(cluster_->mini_master(i - 1)->bound_rpc_addr().port());
2727
0
      if (i < NumMasters()) {
2728
0
        master_addrs += ",";
2729
0
      }
2730
0
    }
2731
2732
0
    client_ = VERIFY_RESULT(YBClientBuilder()
2733
0
        .add_master_server_addr(master_addrs)
2734
0
        .Build());
2735
2736
0
    return Status::OK();
2737
0
  }
2738
};
2739
2740
0
TEST_F_EX(ClientTest, IsMultiMasterWithFailingHostnameResolution, ClientTestWithThreeMasters) {
2741
0
  google::FlagSaver flag_saver;
2742
  // TEST_RpcAddress is 1-indexed.
2743
0
  string hostname = server::TEST_RpcAddress(cluster_->LeaderMasterIdx() + 1,
2744
0
                                            server::Private::kFalse);
2745
2746
  // Shutdown the master leader, and wait for new leader to get elected.
2747
0
  ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Shutdown();
2748
0
  ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
2749
2750
  // Fail resolution of the old leader master's hostname.
2751
0
  FLAGS_TEST_fail_to_fast_resolve_address = hostname;
2752
0
  LOG(INFO) << "Setting FLAGS_TEST_fail_to_fast_resolve_address to: "
2753
0
            << FLAGS_TEST_fail_to_fast_resolve_address;
2754
2755
  // Make a client request to the leader master, since that master is no longer the leader, we will
2756
  // check that we have a MultiMaster setup. That check should not fail even though one of the
2757
  // master addresses currently doesn't resolve. Thus, we should be able to find the new master
2758
  // leader and complete the request.
2759
0
  ASSERT_RESULT(client_->ListTables());
2760
0
}
2761
2762
}  // namespace client
2763
}  // namespace yb