YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tools/yb-bulk_load-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <string>
15
#include <thread>
16
17
#include <boost/algorithm/string.hpp>
18
#include <gtest/gtest.h>
19
20
#include "yb/client/client.h"
21
#include "yb/client/schema.h"
22
#include "yb/client/table.h"
23
#include "yb/client/table_creator.h"
24
#include "yb/client/table_handle.h"
25
26
#include "yb/common/hybrid_time.h"
27
#include "yb/common/jsonb.h"
28
#include "yb/common/partition.h"
29
#include "yb/common/ql_type.h"
30
#include "yb/common/ql_value.h"
31
#include "yb/common/schema.h"
32
#include "yb/common/wire_protocol.h"
33
34
#include "yb/gutil/casts.h"
35
36
#include "yb/integration-tests/mini_cluster.h"
37
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
38
39
#include "yb/master/master_client.proxy.h"
40
#include "yb/master/mini_master.h"
41
42
#include "yb/rpc/messenger.h"
43
#include "yb/rpc/proxy.h"
44
#include "yb/rpc/rpc_controller.h"
45
46
#include "yb/tools/bulk_load_utils.h"
47
#include "yb/tools/yb-generate_partitions.h"
48
49
#include "yb/tserver/tserver_service.proxy.h"
50
51
#include "yb/util/path_util.h"
52
#include "yb/util/random.h"
53
#include "yb/util/result.h"
54
#include "yb/util/status_log.h"
55
#include "yb/util/subprocess.h"
56
#include "yb/util/tsan_util.h"
57
58
#include "yb/yql/cql/ql/util/statement_result.h"
59
60
DECLARE_uint64(initial_seqno);
61
DECLARE_uint64(bulk_load_num_files_per_tablet);
62
DECLARE_bool(enable_load_balancing);
63
DECLARE_int32(replication_factor);
64
65
using namespace std::literals;
66
67
namespace yb {
68
namespace tools {
69
70
using client::YBClient;
71
using client::YBClientBuilder;
72
using client::YBSchema;
73
using client::YBSchemaBuilder;
74
using client::YBTableCreator;
75
using client::YBTableName;
76
using client::YBTable;
77
using common::Jsonb;
78
79
static const char* const kPartitionToolName = "yb-generate_partitions_main";
80
static const char* const kBulkLoadToolName = "yb-bulk_load";
81
static const char* const kNamespace = "bulk_load_test_namespace";
82
static const char* const kTableName = "my_table";
83
// Lower number of runs for tsan due to low perf.
84
static constexpr int32_t kNumIterations = NonTsanVsTsan(10000, 30);
85
static constexpr int32_t kNumTablets = NonTsanVsTsan(3, 3);
86
static constexpr int32_t kNumTabletServers = 1;
87
static constexpr int32_t kV2Value = 12345;
88
static constexpr size_t kV2Index = 5;
89
static constexpr uint64_t kNumFilesPerTablet = 5;
90
91
class YBBulkLoadTest : public YBMiniClusterTestBase<MiniCluster> {
 public:
  YBBulkLoadTest() : random_(0) {
  }

  // Starts a mini cluster with kNumTabletServers tablet servers, creates the test namespace and a
  // table with kNumTablets tablets, opens it, and initializes partition_generator_ against it.
  void SetUp() override {
    YBMiniClusterTestBase::SetUp();
    MiniClusterOptions opts;

    opts.num_tablet_servers = kNumTabletServers;

    // Use a high enough initial sequence number.
    FLAGS_initial_seqno = 1 << 20;

    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    // Three hash key columns, one range key column and five value columns (v5 is nullable JSONB).
    YBSchemaBuilder b;
    b.AddColumn("hash_key")->Type(INT64)->NotNull()->HashPrimaryKey();
    b.AddColumn("hash_key_timestamp")->Type(TIMESTAMP)->NotNull()->HashPrimaryKey();
    b.AddColumn("hash_key_string")->Type(STRING)->NotNull()->HashPrimaryKey();
    b.AddColumn("range_key")->Type(TIMESTAMP)->NotNull()->PrimaryKey();
    b.AddColumn("v1")->Type(STRING)->NotNull();
    b.AddColumn("v2")->Type(INT32)->NotNull();
    b.AddColumn("v3")->Type(FLOAT)->NotNull();
    b.AddColumn("v4")->Type(DOUBLE)->NotNull();
    b.AddColumn("v5")->Type(JSONB)->Nullable();
    ASSERT_OK(b.Build(&schema_));

    client_ = ASSERT_RESULT(cluster_->CreateClient());
    client_messenger_ = ASSERT_RESULT(rpc::MessengerBuilder("Client").Build());
    rpc::ProxyCache proxy_cache(client_messenger_.get());
    proxy_ = std::make_unique<master::MasterClientProxy>(
        &proxy_cache, ASSERT_RESULT(cluster_->GetLeaderMasterBoundRpcAddr()));

    // Create the namespace.
    ASSERT_OK(client_->CreateNamespace(kNamespace));

    // Create the table.
    table_name_.reset(new YBTableName(YQL_DATABASE_CQL, kNamespace, kTableName));
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
    ASSERT_OK(table_creator->table_name(*table_name_)
          .table_type(client::YBTableType::YQL_TABLE_TYPE)
          .schema(&schema_)
          .num_tablets(kNumTablets)
          .wait(true)
          .Create());

    ASSERT_OK(client_->OpenTable(*table_name_, &table_));

    for (size_t i = 0; i < cluster_->num_masters(); i++) {
      master_addresses_.push_back(cluster_->mini_master(i)->bound_rpc_addr_str());
    }

    master_addresses_comma_separated_ = boost::algorithm::join(master_addresses_, ",");
    partition_generator_.reset(new YBPartitionGenerator(*table_name_, master_addresses_));
    ASSERT_OK(partition_generator_->Init());
  }

  // Shuts down the client messenger, drops the client and stops the cluster. The null checks
  // matter because teardown can run after SetUp() returned early on a failed assertion, before
  // these members were assigned.
  void DoTearDown() override {
    if (client_messenger_) {
      client_messenger_->Shutdown();
    }
    client_.reset();
    if (cluster_) {
      cluster_->Shutdown();
    }
  }

  // Launches `exe_path` with `argv`, piping the child's stdin and stdout. On success `*out` is a
  // stream writing to the child's stdin, `*in` is a stream reading the child's stdout, and
  // `*process` owns the child.
  CHECKED_STATUS StartProcessAndGetStreams(const string& exe_path, const vector<string>& argv,
                                           FILE** out, FILE** in,
                                           std::unique_ptr<Subprocess>* process) {
    process->reset(new Subprocess(exe_path, argv));
    (*process)->PipeParentStdout();
    RETURN_NOT_OK((*process)->Start());

    // Check the FILE* that fdopen() returned; the FILE** out-parameters themselves are never null.
    *out = fdopen((*process)->ReleaseChildStdinFd(), "w");
    PCHECK(*out);
    *in = fdopen((*process)->from_child_stdout_fd(), "r");
    PCHECK(*in);
    return Status::OK();
  }

  // Closes both streams (sending EOF to the child) and asserts that the child exits with status 0.
  void CloseStreamsAndWaitForProcess(FILE* out, FILE* in, Subprocess* const process) {
    ASSERT_EQ(0, fclose(out));
    ASSERT_EQ(0, fclose(in));

    int wait_status = 0;
    ASSERT_OK(process->Wait(&wait_status));
    ASSERT_TRUE(WIFEXITED(wait_status));
    ASSERT_EQ(0, WEXITSTATUS(wait_status));
  }

  // Fills `req` with a point read for `row`, a CSV line in the GenerateRow() format. The hash code
  // and hash column values come from the row's three hash key fields, the range key becomes an
  // equality condition, and every table column is selected.
  CHECKED_STATUS CreateQLReadRequest(const string& row, QLReadRequestPB* req) {
    req->set_client(YQL_CLIENT_CQL);
    string tablet_id;
    string partition_key;
    CsvTokenizer tokenizer = Tokenize(row);
    RETURN_NOT_OK(partition_generator_->LookupTabletIdWithTokenizer(
        tokenizer, {}, &tablet_id, &partition_key));
    uint16_t hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
    req->set_hash_code(hash_code);
    req->set_max_hash_code(hash_code);

    auto it = tokenizer.begin();
    // Set hash columns.
    // hash_key.
    QLExpressionPB* hashed_column = req->add_hashed_column_values();
    hashed_column->mutable_value()->set_int64_value(std::stol(*it++));

    // hash_key_timestamp.
    auto ts = TimestampFromString(*it++);
    RETURN_NOT_OK(ts);
    hashed_column = req->add_hashed_column_values();
    hashed_column->mutable_value()->set_timestamp_value(ts->ToInt64());

    // hash_key_string.
    hashed_column = req->add_hashed_column_values();
    hashed_column->mutable_value()->set_string_value(*it++);

    // Set range column (range_key is the fourth column).
    QLConditionPB* condition = req->mutable_where_expr()->mutable_condition();
    condition->set_op(QLOperator::QL_OP_EQUAL);
    condition->add_operands()->set_column_id(kFirstColumnId + 3);
    RETURN_NOT_OK(ts = TimestampFromString(*it++));
    condition->add_operands()->mutable_value()->set_timestamp_value(ts->ToInt64());

    // Set all column ids.
    const auto& internal_schema = table_->InternalSchema();
    QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc();
    for (size_t i = 0; i < internal_schema.num_columns(); i++) {
      req->mutable_column_refs()->add_ids(narrow_cast<int32_t>(kFirstColumnId + i));
      req->add_selected_exprs()->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i));

      const ColumnSchema& col = internal_schema.column(i);
      QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs();
      rscol_desc->set_name(col.name());
      col.type()->ToQLTypePB(rscol_desc->mutable_ql_type());
    }
    return Status::OK();
  }

  // Asserts that every column of `ql_row` matches the corresponding field of the CSV `row`.
  void ValidateRow(const string& row, const QLRow& ql_row) {
    // Get individual columns.
    CsvTokenizer tokenizer = Tokenize(row);
    auto it = tokenizer.begin();
    ASSERT_EQ(std::stol(*it++), ql_row.column(0).int64_value());
    auto ts = TimestampFromString(*it++);
    ASSERT_OK(ts);
    ASSERT_EQ(*ts, ql_row.column(1).timestamp_value());
    ASSERT_EQ(*it++, ql_row.column(2).string_value());
    ASSERT_OK(ts = TimestampFromString(*it++));
    ASSERT_EQ(*ts, ql_row.column(3).timestamp_value());
    ASSERT_EQ(*it++, ql_row.column(4).string_value());
    ASSERT_EQ(std::stoi(*it++), ql_row.column(5).int32_value());
    ASSERT_FLOAT_EQ(std::stof(*it++), ql_row.column(6).float_value());
    // v4 is a DOUBLE column, so parse it as a double rather than a long double.
    ASSERT_DOUBLE_EQ(std::stod(*it++), ql_row.column(7).double_value());
    string token_str = *it++;
    if (IsNull(token_str)) {
      ASSERT_TRUE(ql_row.column(8).IsNull());
    } else {
      Jsonb jsonb_from_token;
      // A test assertion rather than CHECK_OK, so a bad token fails this test instead of aborting.
      ASSERT_OK(jsonb_from_token.FromString(token_str));
      ASSERT_EQ(jsonb_from_token.SerializedJsonb(), ql_row.column(8).jsonb_value());
    }
  }

 protected:

  // Background writer: RunLoad() sends single-row writes with random keys to one tablet through
  // `tserver_proxy` until StopLoad() is called. RPC timeouts are tolerated; other errors fail.
  class LoadGenerator {
   public:
    LoadGenerator(const string& tablet_id, const client::TableHandle* table,
                  tserver::TabletServerServiceProxy* tserver_proxy)
      : tablet_id_(tablet_id),
        table_(table),
        tserver_proxy_(tserver_proxy) {
    }

    void RunLoad() {
      while (running_.load()) {
        ASSERT_NO_FATALS(WriteRow(tablet_id_));
      }
    }

    void StopLoad() {
      running_.store(false);
    }

    void WriteRow(const string& tablet_id) {
      tserver::WriteRequestPB req;
      tserver::WriteResponsePB resp;
      rpc::RpcController controller;
      controller.set_timeout(15s);

      req.set_tablet_id(tablet_id);
      QLWriteRequestPB* ql_write = req.add_ql_write_batch();
      QLAddInt64HashValue(ql_write, random_.Next64());
      QLAddTimestampHashValue(ql_write, random_.Next32());
      QLAddStringHashValue(ql_write, std::to_string(random_.Next32()));
      QLSetHashCode(ql_write);

      QLAddTimestampRangeValue(ql_write, random_.Next64());

      table_->AddStringColumnValue(ql_write, "v1", "");
      table_->AddInt32ColumnValue(ql_write, "v2", 0);
      table_->AddFloatColumnValue(ql_write, "v3", 0);
      table_->AddDoubleColumnValue(ql_write, "v4", 0);
      table_->AddJsonbColumnValue(ql_write, "v5", "{ \"a\" : \"foo\" , \"b\" : \"bar\" }");

      auto status = tserver_proxy_->Write(req, &resp, &controller);
      ASSERT_TRUE(status.ok() || status.IsTimedOut()) << "Bad status: " << status;
      ASSERT_FALSE(resp.has_error()) << "Resp: " << resp.ShortDebugString();
    }

   private:
    string tablet_id_;
    const client::TableHandle* table_;
    tserver::TabletServerServiceProxy* tserver_proxy_;

    std::atomic<bool> running_{true};
    Random random_{0};
  };

  // Returns a random CSV row for the test table. Even indexes use a "Y-M-D h:m:s" hash timestamp
  // and a quoted JSON value for v5; odd indexes use an integer timestamp and the null marker.
  string GenerateRow(int index) {
    // Build the row and lookup table_id
    string timestamp_string;
    string json;
    if (index % 2 == 0) {
      // Use string format.
      int year = 1970 + random_.Next32() % 2000;
      int month = 1 + random_.Next32() % 12;
      int day = 1 + random_.Next32() % 28;
      int hour = random_.Next32() % 24;
      int minute = random_.Next32() % 60;
      int second = random_.Next32() % 60;
      timestamp_string = strings::Substitute("$0-$1-$2 $3:$4:$5", year, month, day, hour, minute,
                                             second);
      json = "\"{\\\"a\\\":\\\"foo\\\",\\\"b\\\":\\\"bar\\\"}\"";
    } else {
      timestamp_string = std::to_string(static_cast<int64_t>(random_.Next32()));
      json = "\\\\n"; // represents null value.
    }

    string row = strings::Substitute(
        "$0,$1,$2,2017-06-17 14:47:00,\"abc,xyz\",$3,3.14,4.1,$4",
        static_cast<int64_t>(random_.Next32()), timestamp_string, random_.Next32(), kV2Value, json);
    VLOG(1) << "Generated row: " << row;
    return row;
  }

  // Asks the master for the location of `tablet_id`, asserts exactly one location is returned,
  // and stores it in `*tablet_location`.
  void VerifyTabletId(const string& tablet_id, master::TabletLocationsPB* tablet_location) {
    // Verify we have the appropriate tablet_id.
    master::GetTabletLocationsRequestPB req;
    req.add_tablet_ids(tablet_id);
    master::GetTabletLocationsResponsePB resp;
    rpc::RpcController controller;
    controller.set_timeout(60s);
    ASSERT_OK(proxy_->GetTabletLocations(req, &resp, &controller));
    ASSERT_FALSE(resp.has_error());
    ASSERT_EQ(1, resp.tablet_locations_size());
    *tablet_location = resp.tablet_locations(0);
    VLOG(1) << "Got tablet info: " << tablet_location->DebugString();
  }

  // Asserts that `tablet_id` is known to the master and that `partition_key` falls inside that
  // tablet's [partition_key_start, partition_key_end) range. An empty end key is unbounded.
  void VerifyTabletIdPartitionKey(const string& tablet_id, const string& partition_key) {
    master::TabletLocationsPB tablet_location;
    // Stop if the lookup failed; otherwise the range checks below would run against an empty
    // TabletLocationsPB and could pass vacuously.
    ASSERT_NO_FATALS(VerifyTabletId(tablet_id, &tablet_location));
    const auto& partition = tablet_location.partition();
    ASSERT_GE(partition_key, partition.partition_key_start());
    const auto& partition_key_end = partition.partition_key_end();
    if (!partition_key_end.empty()) {
      ASSERT_LT(partition_key, partition_key_end);
    }
  }

  // Sends `req` (expected to carry exactly one QL read) through `tserver_proxy` and decodes the
  // returned rows sidecar into `*rowblock` using the fixture's schema.
  void PerformRead(const tserver::ReadRequestPB& req,
                   tserver::TabletServerServiceProxy* tserver_proxy,
                   std::unique_ptr<QLRowBlock>* rowblock) {
    tserver::ReadResponsePB resp;
    rpc::RpcController controller;
    controller.set_timeout(15s);
    ASSERT_OK(tserver_proxy->Read(req, &resp, &controller));
    ASSERT_FALSE(resp.has_error());
    ASSERT_EQ(1, resp.ql_batch_size());
    QLResponsePB ql_resp = resp.ql_batch(0);
    ASSERT_EQ(QLResponsePB_QLStatus_YQL_STATUS_OK, ql_resp.status())
        << "Response: " << ql_resp.ShortDebugString();
    ASSERT_TRUE(ql_resp.has_rows_data_sidecar());

    // Retrieve row.
    ASSERT_TRUE(controller.finished());
    Slice rows_data = ASSERT_RESULT(controller.GetSidecar(ql_resp.rows_data_sidecar()));
    std::shared_ptr<std::vector<ColumnSchema>>
      columns = std::make_shared<std::vector<ColumnSchema>>(schema_.columns());
    yb::ql::RowsResult rows_result(*table_name_, columns, rows_data.ToBuffer());
    *rowblock = rows_result.GetRowBlock();
  }

  std::unique_ptr<YBClient> client_;
  YBSchema schema_;
  std::unique_ptr<YBTableName> table_name_;
  std::shared_ptr<YBTable> table_;
  std::unique_ptr<master::MasterClientProxy> proxy_;
  std::unique_ptr<rpc::Messenger> client_messenger_;
  std::unique_ptr<YBPartitionGenerator> partition_generator_;
  std::vector<std::string> master_addresses_;
  std::string master_addresses_comma_separated_;
  Random random_;
};
399
400
class YBBulkLoadTestWithoutRebalancing : public YBBulkLoadTest {
401
 public:
402
0
  void SetUp() override {
403
0
    FLAGS_enable_load_balancing = false;
404
0
    YBBulkLoadTest::SetUp();
405
0
  }
406
};
407
408
409
0
// For many generated rows, checks that the partition generator maps each row to a tablet whose
// partition range contains the computed partition key.
TEST_F(YBBulkLoadTest, VerifyPartitions) {
  for (int i = 0; i < kNumIterations; i++) {
    string tablet_id;
    string partition_key;
    ASSERT_OK(partition_generator_->LookupTabletId(GenerateRow(i), &tablet_id, &partition_key));
    VLOG(1) << "Got tablet id: " << tablet_id << ", partition key: " << partition_key;

    // Stop at the first failing row instead of running every remaining iteration.
    ASSERT_NO_FATALS(VerifyTabletIdPartitionKey(tablet_id, partition_key));
  }
}
419
420
0
// Checks that wrapping a row with extra leading/trailing fields and passing the corresponding
// skipped-column set yields the same tablet id and partition key as the bare row.
TEST_F(YBBulkLoadTest, VerifyPartitionsWithIgnoredColumns) {
  const std::set<int> skipped_cols = tools::SkippedColumns("0,9");
  for (int i = 0; i < kNumIterations; i++) {
    string tablet_id;
    string partition_key;
    string row = GenerateRow(i);
    ASSERT_OK(partition_generator_->LookupTabletId(row, &tablet_id, &partition_key));
    VLOG(1) << "Got tablet id: " << tablet_id << ", partition key: " << partition_key;

    // Stop at the first failing row instead of running every remaining iteration.
    ASSERT_NO_FATALS(VerifyTabletIdPartitionKey(tablet_id, partition_key));

    // "foo" becomes column 0, which is in skipped_cols.
    string tablet_id2;
    string partition_key2;
    string row_with_extras = "foo," + row + ",bar";
    ASSERT_OK(partition_generator_->LookupTabletId(
        row_with_extras, skipped_cols, &tablet_id2, &partition_key2));
    ASSERT_EQ(tablet_id, tablet_id2);
    ASSERT_EQ(partition_key, partition_key2);
  }
}
440
441
// Exercises Tokenize() with different separator and quote characters, including escaped quotes
// inside a JSON field and the "\n" null marker.
TEST_F(YBBulkLoadTest, TestTokenizer) {
  // Asserts that `tokenizer` produces exactly the tokens in `expected`, in order.
  const auto expect_tokens = [](const CsvTokenizer& tokenizer,
                                const std::vector<std::string>& expected) {
    auto it = tokenizer.begin();
    for (const std::string& want : expected) {
      ASSERT_EQ(*it++, want);
    }
    ASSERT_EQ(it, tokenizer.end());
  };

  {
    // JSON needs to be enclosed in quotes, so that the internal commas are not treated as different
    // columns. Need to escape the quotes within to ensure that they are not eaten up.
    const string str = "1,2017-06-17 14:47:00,\"abc,;xyz\","
                       "\"{\\\"a\\\":\\\"foo\\\",\\\"b\\\":\\\"bar\\\"}\"";
    ASSERT_NO_FATALS(expect_tokens(Tokenize(str),
        {"1", "2017-06-17 14:47:00", "abc,;xyz", "{\"a\":\"foo\",\"b\":\"bar\"}"}));
  }

  {
    // Separating fields with ';'. No need to enclose JSON in quotes. Internal quotes still need
    // to be escaped to prevent being consumed.
    const string str = "1;2017-06-17 14:47:00;\"abc,;xyz\";"
                       "{\\\"a\\\":\\\"foo\\\",\\\"b\\\":\\\"bar\\\"}";
    ASSERT_NO_FATALS(expect_tokens(Tokenize(str, ';', '\"'),
        {"1", "2017-06-17 14:47:00", "abc,;xyz", "{\"a\":\"foo\",\"b\":\"bar\"}"}));
  }

  {
    // No need to escape quotes because the quote character is \'
    const string str = "1,2017-06-17 14:47:00,'abc,;xyz',"
                       "'{\"a\":\"foo\",\"b\":\"bar\"}'";
    ASSERT_NO_FATALS(expect_tokens(Tokenize(str, ',', '\''),
        {"1", "2017-06-17 14:47:00", "abc,;xyz", "{\"a\":\"foo\",\"b\":\"bar\"}"}));
  }

  {
    // Same quote character; the last field is the escaped null marker.
    const string str = "1,2017-06-17 14:47:00,'abc,;xyz',"
                       "\\\\n";
    ASSERT_NO_FATALS(expect_tokens(Tokenize(str, ',', '\''),
        {"1", "2017-06-17 14:47:00", "abc,;xyz", "\\n"}));
  }
}
495
496
0
// Rows that must be rejected by the partition generator: too few hash columns, null primary key
// values, and values that do not parse as the column type.
TEST_F(YBBulkLoadTest, InvalidLines) {
  const std::vector<const char*> invalid_rows = {
    // Not enough hash columns.
    "1",
    // Null primary keys.
    "1,\\n",
    "1,null",
    "1,NULL",
    // Invalid types.
    "abc,123",
    "123,abc",
    "123.1,123",
    "123,123.2",
  };

  string tablet_id;
  string partition_key;
  for (const char* invalid_row : invalid_rows) {
    SCOPED_TRACE(invalid_row);
    ASSERT_NOK(partition_generator_->LookupTabletId(invalid_row, &tablet_id, &partition_key));
  }
}
513
514
0
// End-to-end test: generated rows are piped through the partition tool, the sorted mapper output
// is fed to the bulk load tool, and the produced per-tablet files are imported into each tablet's
// leader while a concurrent write load runs. Every row is then read back and validated.
TEST_F_EX(YBBulkLoadTest, TestCLITool, YBBulkLoadTestWithoutRebalancing) {
  string exe_path = GetToolPath(kPartitionToolName);
  vector<string> argv = {kPartitionToolName, "-master_addresses", master_addresses_comma_separated_,
      "-table_name", kTableName, "-namespace_name", kNamespace};
  FILE *out;
  FILE *in;
  std::unique_ptr<Subprocess> partition_process;
  ASSERT_OK(StartProcessAndGetStreams(exe_path, argv, &out, &in, &partition_process));

  // Write multiple lines.
  vector<string> generated_rows;
  vector<string> mapper_output;
  std::map<string, vector<string>> tabletid_to_line;
  for (int i = 0; i < kNumIterations; i++) {
    // Write the input line.
    string row = GenerateRow(i) + "\n";
    generated_rows.push_back(row);
    ASSERT_GT(fputs(row.c_str(), out), 0);
    ASSERT_EQ(0, fflush(out));

    // Read the output line, expected as "<tablet_id>\t<input row>".
    char buf[1024];
    ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
    mapper_output.emplace_back(buf);

    // Split based on tab.
    vector<string> tokens;
    boost::split(tokens, buf, boost::is_any_of("\t"));
    ASSERT_EQ(2, tokens.size());
    const string& tablet_id = tokens[0];
    ASSERT_EQ(generated_rows[i], tokens[1]);
    ASSERT_EQ('\n', tokens[1].back());
    boost::trim_right(tokens[1]); // remove the trailing '\n'
    tabletid_to_line[tablet_id].push_back(tokens[1]);

    // Verify tablet id and original line.
    master::TabletLocationsPB tablet_location;
    ASSERT_NO_FATALS(VerifyTabletId(tablet_id, &tablet_location));
  }

  ASSERT_NO_FATALS(CloseStreamsAndWaitForProcess(out, in, partition_process.get()));

  // Now lets sort the output and pipe it to the bulk load tool.
  std::sort(mapper_output.begin(), mapper_output.end());

  // Start the bulk load tool.
  string test_dir;
  Env* env = Env::Default();
  ASSERT_OK(env->GetTestDirectory(&test_dir));
  string bulk_load_data = JoinPathSegments(test_dir, "bulk_load_data");
  if (env->FileExists(bulk_load_data)) {
    ASSERT_OK(env->DeleteRecursively(bulk_load_data));
  }
  ASSERT_OK(env->CreateDir(bulk_load_data));

  string bulk_load_exec = GetToolPath(kBulkLoadToolName);
  // -row_batch_size and -flush_batch_for_tests used to ensure we have multiple flushed files per
  // tablet which ensures we would compact some files.
  vector<string> bulk_load_argv = {
      kBulkLoadToolName,
      "-master_addresses", master_addresses_comma_separated_,
      "-table_name", kTableName,
      "-namespace_name", kNamespace,
      "-base_dir", bulk_load_data,
      "-initial_seqno", "0",
      "-row_batch_size", std::to_string(kNumIterations/kNumTablets/10),
      "-bulk_load_num_files_per_tablet", std::to_string(kNumFilesPerTablet),
      "-flush_batch_for_tests"
  };

  std::unique_ptr<Subprocess> bulk_load_process;
  ASSERT_OK(StartProcessAndGetStreams(bulk_load_exec, bulk_load_argv, &out, &in,
                &bulk_load_process));

  for (const string& mapper_line : mapper_output) {
    // Write the input line.
    ASSERT_GT(fprintf(out, "%s", mapper_line.c_str()), 0);
    ASSERT_EQ(0, fflush(out));
  }

  ASSERT_NO_FATALS(CloseStreamsAndWaitForProcess(out, in, bulk_load_process.get()));

  // Verify we have all tablet ids in the bulk load directory.
  master::GetTableLocationsRequestPB locations_req;
  master::GetTableLocationsResponsePB locations_resp;
  rpc::RpcController locations_controller;
  locations_controller.set_timeout(60s);

  locations_req.mutable_table()->set_table_name(table_name_->table_name());
  locations_req.mutable_table()->mutable_namespace_()->set_name(kNamespace);
  locations_req.set_max_returned_locations(kNumTablets);
  ASSERT_OK(proxy_->GetTableLocations(locations_req, &locations_resp, &locations_controller));
  ASSERT_FALSE(locations_resp.has_error());
  ASSERT_EQ(kNumTablets, locations_resp.tablet_locations_size());
  client::TableHandle table;
  ASSERT_OK(table.Open(*table_name_, client_.get()));

  // Stops the load generator and joins its thread when it goes out of scope, including when a
  // failed ASSERT_* returns early: destroying a still-joinable std::thread calls std::terminate.
  struct LoadThreadJoiner {
    LoadGenerator* generator;
    std::thread* thread;
    ~LoadThreadJoiner() {
      generator->StopLoad();
      if (thread->joinable()) {
        thread->join();
      }
    }
  };

  for (const master::TabletLocationsPB& tablet_location : locations_resp.tablet_locations()) {
    const string& tablet_id = tablet_location.tablet_id();
    string tablet_path = JoinPathSegments(bulk_load_data, tablet_id);
    ASSERT_TRUE(env->FileExists(tablet_path));

    // Verify at most 'bulk_load_num_files_per_tablet' files.
    vector<string> tablet_files;
    ASSERT_OK(env->GetChildren(tablet_path, &tablet_files));
    size_t num_files = 0;
    for (const string& tablet_file : tablet_files) {
      if (boost::algorithm::ends_with(tablet_file, ".sst")) {
        num_files++;
      }
    }
    ASSERT_GE(kNumFilesPerTablet, num_files);

    HostPort leader_tserver;
    bool found_leader = false;
    for (const master::TabletLocationsPB::ReplicaPB& replica : tablet_location.replicas()) {
      if (replica.role() == PeerRole::LEADER) {
        leader_tserver = HostPortFromPB(replica.ts_info().private_rpc_addresses(0));
        found_leader = true;
        break;
      }
    }
    ASSERT_TRUE(found_leader) << "No leader replica reported for tablet " << tablet_id;

    rpc::ProxyCache proxy_cache(client_messenger_.get());
    auto tserver_proxy = std::make_unique<tserver::TabletServerServiceProxy>(&proxy_cache,
                                                                             leader_tserver);

    // Start the load generator to ensure we can import files with running load.
    LoadGenerator load_generator(tablet_id, &table, tserver_proxy.get());
    std::thread load_thread(&LoadGenerator::RunLoad, &load_generator);
    // Declared after the proxy, generator and thread so it is destroyed before them.
    LoadThreadJoiner load_thread_joiner{&load_generator, &load_thread};
    // Wait for load generator to generate some traffic.
    SleepFor(MonoDelta::FromSeconds(5));

    // Import the data into the tserver.
    tserver::ImportDataRequestPB import_req;
    import_req.set_tablet_id(tablet_id);
    import_req.set_source_dir(tablet_path);
    tserver::ImportDataResponsePB import_resp;
    rpc::RpcController import_controller;
    ASSERT_OK(tserver_proxy->ImportData(import_req, &import_resp, &import_controller));
    ASSERT_FALSE(import_resp.has_error()) << import_resp.DebugString();

    const vector<string>& tablet_rows = tabletid_to_line[tablet_id];
    for (const string& row : tablet_rows) {
      // Build read request.
      tserver::ReadRequestPB read_req;
      read_req.set_tablet_id(tablet_id);
      QLReadRequestPB* ql_req = read_req.mutable_ql_batch()->Add();
      ASSERT_OK(CreateQLReadRequest(row, ql_req));

      // A failed read leaves rowblock null, so stop before dereferencing it.
      std::unique_ptr<QLRowBlock> rowblock;
      ASSERT_NO_FATALS(PerformRead(read_req, tserver_proxy.get(), &rowblock));

      // Validate row.
      ASSERT_EQ(1, rowblock->row_count());
      const QLRow& ql_row = rowblock->row(0);
      ASSERT_EQ(schema_.num_columns(), ql_row.column_count());
      ASSERT_NO_FATALS(ValidateRow(row, ql_row));
    }

    // Perform a SELECT * and verify the number of rows present in the tablet is what we expected.
    tserver::ReadRequestPB select_req;
    select_req.set_tablet_id(tablet_id);
    QLReadRequestPB* ql_req = select_req.mutable_ql_batch()->Add();
    QLConditionPB* condition = ql_req->mutable_where_expr()->mutable_condition();
    condition->set_op(QLOperator::QL_OP_EQUAL);
    condition->add_operands()->set_column_id(kFirstColumnId + kV2Index);
    // kV2Value is common across all rows in the tablet and hence we use that value to verify the
    // expected number of rows. Note that since we have a parallel load tester running, we can't
    // validate the total number of rows in the DB.
    condition->add_operands()->mutable_value()->set_int32_value(kV2Value);

    // Set all column ids.
    const auto& internal_schema = table_->InternalSchema();
    QLRSRowDescPB* rsrow_desc = ql_req->mutable_rsrow_desc();
    for (size_t i = 0; i < internal_schema.num_columns(); i++) {
      ql_req->mutable_column_refs()->add_ids(narrow_cast<int32_t>(kFirstColumnId + i));
      ql_req->add_selected_exprs()->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i));

      const ColumnSchema& col = internal_schema.column(i);
      QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs();
      rscol_desc->set_name(col.name());
      col.type()->ToQLTypePB(rscol_desc->mutable_ql_type());
    }

    std::unique_ptr<QLRowBlock> rowblock;
    ASSERT_NO_FATALS(PerformRead(select_req, tserver_proxy.get(), &rowblock));
    ASSERT_EQ(tablet_rows.size(), rowblock->row_count());

    // load_thread_joiner stops the load generator and joins its thread on scope exit.
  }
}
710
711
} // namespace tools
712
} // namespace yb