YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tools/yb-bulk_load.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <thread>

#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "yb/client/client.h"
#include "yb/client/table.h"

#include "yb/common/entity_ids.h"
#include "yb/common/hybrid_time.h"
#include "yb/common/jsonb.h"
#include "yb/common/ql_protocol.pb.h"
#include "yb/common/ql_value.h"
#include "yb/common/partition.h"
#include "yb/common/schema.h"
#include "yb/common/wire_protocol.h"

#include "yb/docdb/cql_operation.h"
#include "yb/docdb/doc_operation.h"

#include "yb/master/master_client.pb.h"
#include "yb/master/master_util.h"

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/options.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/rpc_controller.h"

#include "yb/tools/bulk_load_docdb_util.h"
#include "yb/tools/bulk_load_utils.h"
#include "yb/tools/yb-generate_partitions.h"

#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/env.h"
#include "yb/util/flags.h"
#include "yb/util/logging.h"
#include "yb/util/size_literals.h"
#include "yb/util/status.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/stol_utils.h"
#include "yb/util/subprocess.h"
#include "yb/util/threadpool.h"

using std::pair;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
using std::vector;
using yb::client::YBClient;
using yb::client::YBClientBuilder;
using yb::client::YBTable;
using yb::client::YBTableName;
using yb::docdb::DocWriteBatch;
using yb::docdb::InitMarkerBehavior;
using yb::operator"" _GB;

DEFINE_string(master_addresses, "", "Comma-separated list of YB Master server addresses");
DEFINE_string(table_name, "", "Name of the table to generate partitions for");
DEFINE_string(namespace_name, "", "Namespace of the table");
DEFINE_string(base_dir, "", "Base directory where we will store all the SSTable files");
DEFINE_int64(memtable_size_bytes, 1_GB, "Amount of bytes to use for the rocksdb memtable");
DEFINE_uint64(row_batch_size, 1000, "The number of rows to batch together in each rocksdb write");
DEFINE_bool(flush_batch_for_tests, false, "Option used only in tests to flush after each batch. "
    "Used to generate multiple SST files in conjunction with a small row_batch_size");
DEFINE_string(bulk_load_helper_script, "./bulk_load_helper.sh", "Relative path for bulk load helper"
              " script");
DEFINE_string(bulk_load_cleanup_script, "./bulk_load_cleanup.sh", "Relative path for bulk load "
              "cleanup script");
DEFINE_string(ssh_key_file, "", "SSH key to push SSTable files to production cluster");
DEFINE_bool(export_files, false, "Whether or not the files should be exported to a production "
            "cluster.");
DEFINE_int32(bulk_load_num_threads, 16, "Number of threads to use for bulk load");
DEFINE_int32(bulk_load_threadpool_queue_size, 10000,
             "Maximum number of entries to queue in the threadpool");
DEFINE_int32(bulk_load_num_memtables, 3, "Number of memtables to use for rocksdb");
DEFINE_int32(bulk_load_max_background_flushes, 2, "Number of flushes to perform in the background");
DEFINE_uint64(bulk_load_num_files_per_tablet, 5,
              "Compact each tablet's data so that it ends up with at most this many "
              "sst files");

DECLARE_string(skipped_cols);

namespace yb {
namespace tools {

namespace {

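// Threadpool task that converts a batch of CSV rows belonging to a single tablet into DocDB
// writes and applies them to the tablet's local RocksDB instance.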
class BulkLoadTask : public Runnable {
 public:
  BulkLoadTask(vector<pair<TabletId, string>> rows, BulkLoadDocDBUtil *db_fixture,
               const YBTable *table, YBPartitionGenerator *partition_generator);
  void Run();
 private:
  CHECKED_STATUS PopulateColumnValue(const string &column,
                                     const DataType data_type,
                                     QLExpressionPB *column_value);
  CHECKED_STATUS InsertRow(const string &row,
                           const Schema &schema,
                           const IndexMap& index_map,
                           BulkLoadDocDBUtil *const db_fixture,
                           docdb::DocWriteBatch *const doc_write_batch,
                           YBPartitionGenerator *const partition_generator);
  vector<pair<TabletId, string>> rows_;
  const std::set<int> skipped_cols_;
  BulkLoadDocDBUtil *const db_fixture_;
  const YBTable *const table_;
  YBPartitionGenerator *const partition_generator_;
};

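// Threadpool task that compacts a given batch of SST files using RocksDB's CompactFiles API.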
class CompactionTask: public Runnable {
 public:
  CompactionTask(const vector<string>& sst_filenames, BulkLoadDocDBUtil* db_fixture);
  void Run();
 private:
  vector<string> sst_filenames_;
  BulkLoadDocDBUtil *const db_fixture_;
};

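// Drives the bulk load: reads tab-separated (tablet_id, row) lines from stdin, batches the rows
// per tablet, converts them into SST files, and optionally exports those files to a production
// cluster.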
class BulkLoad {
 public:
  CHECKED_STATUS RunBulkLoad();

 private:
  CHECKED_STATUS InitYBBulkLoad();
  CHECKED_STATUS InitDBUtil(const TabletId &tablet_id);
  CHECKED_STATUS FinishTabletProcessing(const TabletId &tablet_id,
                                        vector<pair<TabletId, string>> rows);
  CHECKED_STATUS RetryableSubmit(vector<pair<TabletId, string>> rows);
  CHECKED_STATUS CompactFiles();

  std::unique_ptr<YBClient> client_;
  shared_ptr<YBTable> table_;
  unique_ptr<YBPartitionGenerator> partition_generator_;
  std::unique_ptr<ThreadPool> thread_pool_;
  unique_ptr<BulkLoadDocDBUtil> db_fixture_;
};

CompactionTask::CompactionTask(const vector<string>& sst_filenames, BulkLoadDocDBUtil* db_fixture)
    : sst_filenames_(sst_filenames),
      db_fixture_(db_fixture) {
}

void CompactionTask::Run() {
  if (sst_filenames_.size() == 1) {
    LOG(INFO) << "Skipping compaction since we have only a single file: " << sst_filenames_[0];
    return;
  }

  LOG(INFO) << "Compacting files: " << ToString(sst_filenames_);
  CHECK_OK(db_fixture_->rocksdb()->CompactFiles(rocksdb::CompactionOptions(),
                                                sst_filenames_,
                                                /* output_level */ 0));
}

BulkLoadTask::BulkLoadTask(vector<pair<TabletId, string>> rows,
                           BulkLoadDocDBUtil *db_fixture, const YBTable *table,
                           YBPartitionGenerator *partition_generator)
    : rows_(std::move(rows)),
      skipped_cols_(tools::SkippedColumns()),
      db_fixture_(db_fixture),
      table_(table),
      partition_generator_(partition_generator) {
}

void BulkLoadTask::Run() {
  DocWriteBatch doc_write_batch(docdb::DocDB::FromRegularUnbounded(db_fixture_->rocksdb()),
                                InitMarkerBehavior::kOptional);

  for (const auto &entry : rows_) {
    const string &row = entry.second;

    // Populate the row.
    CHECK_OK(InsertRow(row, table_->InternalSchema(), table_->index_map(), db_fixture_,
                       &doc_write_batch, partition_generator_));
  }

  // Flush the batch.
  CHECK_OK(db_fixture_->WriteToRocksDB(
      doc_write_batch, HybridTime::FromMicros(kYugaByteMicrosecondEpoch),
      /* decode_dockey */ false, /* increment_write_id */ false));

  if (FLAGS_flush_batch_for_tests) {
    CHECK_OK(db_fixture_->FlushRocksDbAndWait());
  }
}

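// Parses a single CSV field into a QLValuePB of the given data type. The YB_SET_INT_VALUE
// invocations below expand to the 'case' labels that handle the corresponding intN types.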
Status BulkLoadTask::PopulateColumnValue(const string &column,
                                         const DataType data_type,
                                         QLExpressionPB *column_value) {
  auto ql_valuepb = column_value->mutable_value();
  switch (data_type) {
    YB_SET_INT_VALUE(ql_valuepb, column, 8);
    YB_SET_INT_VALUE(ql_valuepb, column, 16);
    YB_SET_INT_VALUE(ql_valuepb, column, 32);
    YB_SET_INT_VALUE(ql_valuepb, column, 64);
    case DataType::FLOAT: {
      auto value = CheckedStold(column);
      RETURN_NOT_OK(value);
      ql_valuepb->set_float_value(*value);
      break;
    }
    case DataType::DOUBLE: {
      auto value = CheckedStold(column);
      RETURN_NOT_OK(value);
      ql_valuepb->set_double_value(*value);
      break;
    }
    case DataType::STRING: {
      ql_valuepb->set_string_value(column);
      break;
    }
    case DataType::JSONB: {
      common::Jsonb jsonb;
      RETURN_NOT_OK(jsonb.FromString(column));
      ql_valuepb->set_jsonb_value(jsonb.MoveSerializedJsonb());
      break;
    }
    case DataType::TIMESTAMP: {
      auto ts = TimestampFromString(column);
      RETURN_NOT_OK(ts);
      ql_valuepb->set_timestamp_value(ts->ToInt64());
      break;
    }
    case DataType::BINARY: {
      ql_valuepb->set_binary_value(column);
      break;
    }
    default:
      FATAL_INVALID_ENUM_VALUE(DataType, data_type);
  }
  return Status::OK();
}

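// Converts one CSV row into a QL_STMT_INSERT write request, resolves the row's tablet and hash
// code, and applies the operation to doc_write_batch.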
Status BulkLoadTask::InsertRow(const string &row,
                               const Schema &schema,
                               const IndexMap& index_map,
                               BulkLoadDocDBUtil *const db_fixture,
                               docdb::DocWriteBatch *const doc_write_batch,
                               YBPartitionGenerator *const partition_generator) {
  // Get individual columns.
  CsvTokenizer tokenizer = Tokenize(row);
  size_t ncolumns = std::distance(tokenizer.begin(), tokenizer.end());
  if (ncolumns != schema.num_columns()) {
    return STATUS_SUBSTITUTE(IllegalState, "row '$0' has $1 columns, need exactly $2", row,
                             ncolumns, schema.num_columns());
  }

  QLResponsePB resp;
  QLWriteRequestPB req;
  req.set_type(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT);
  req.set_client(YQL_CLIENT_CQL);

  int col_id = 0;
  auto it = tokenizer.begin();
  // Process the hash keys first.
  for (size_t i = 0; i < schema.num_key_columns(); it++, col_id++) {
    if (skipped_cols_.find(col_id) != skipped_cols_.end()) {
      continue;
    }
    if (IsNull(*it)) {
      return STATUS_SUBSTITUTE(IllegalState, "Primary key cannot be null: $0", *it);
    }

    QLExpressionPB *column_value = nullptr;
    if (schema.is_hash_key_column(i)) {
      column_value = req.add_hashed_column_values();
    } else {
      column_value = req.add_range_column_values();
    }

    RETURN_NOT_OK(PopulateColumnValue(*it, schema.column(i).type_info()->type, column_value));
    i++;  // Done here, not in the loop header, so skipped columns don't consume a schema index.
  }

  // Finally process the regular columns.
  for (auto i = schema.num_key_columns(); i < schema.num_columns(); it++, col_id++) {
    if (skipped_cols_.find(col_id) != skipped_cols_.end()) {
      continue;
    }
    QLColumnValuePB *column_value = req.add_column_values();
    column_value->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i));
    if (IsNull(*it)) {
      // Use empty value for null.
      column_value->mutable_expr()->mutable_value();
    } else {
      RETURN_NOT_OK(PopulateColumnValue(*it, schema.column(i).type_info()->type,
                                        column_value->mutable_expr()));
    }
    i++;  // Done here, not in the loop header, so skipped columns don't consume a schema index.
  }

  // Add the hash code to the operation.
  string tablet_id;
  string partition_key;
  RETURN_NOT_OK(partition_generator->LookupTabletIdWithTokenizer(
      tokenizer, skipped_cols_, &tablet_id, &partition_key));
  req.set_hash_code(PartitionSchema::DecodeMultiColumnHashValue(partition_key));

  // Finally apply the operation to the doc_write_batch.
  // TODO(dtxn) pass correct TransactionContext.
  // Comment from PritamD: Don't need cross shard transaction support in bulk load, but I guess
  // once we have secondary indexes we probably might need to ensure bulk load builds the indexes
  // as well.
  docdb::QLWriteOperation op(
      req, std::shared_ptr<const Schema>(&schema, [](const Schema*){}),
      index_map, nullptr /* unique_index_key_schema */, TransactionOperationContext());
  RETURN_NOT_OK(op.Init(&resp));
  RETURN_NOT_OK(op.Apply({
      doc_write_batch,
      CoarseTimePoint::max() /* deadline */,
      ReadHybridTime::SingleTime(HybridTime::FromMicros(kYugaByteMicrosecondEpoch))}));
  return Status::OK();
}


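// Wraps the rows in a BulkLoadTask and submits it to the threadpool, retrying while the
// submission fails with ServiceUnavailable (i.e. while the threadpool queue is full).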
Status BulkLoad::RetryableSubmit(vector<pair<TabletId, string>> rows) {
  auto runnable = std::make_shared<BulkLoadTask>(
      std::move(rows), db_fixture_.get(), table_.get(), partition_generator_.get());

  Status s;
  do {
    s = thread_pool_->Submit(runnable);

    if (!s.IsServiceUnavailable()) {
      return s;
    }

    LOG(ERROR) << "Failed submitting task, sleeping for a while: " << s.ToString();

    // If service is unavailable, the queue might be full. Sleep and try again.
    SleepFor(MonoDelta::FromSeconds(10));
  } while (!s.ok());

  return Status::OK();
}

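// Compacts the current tablet's SST files down to roughly FLAGS_bulk_load_num_files_per_tablet
// files, scheduling one CompactionTask per batch.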
Status BulkLoad::CompactFiles() {
  std::vector<rocksdb::LiveFileMetaData> live_files_metadata;
  db_fixture_->rocksdb()->GetLiveFilesMetaData(&live_files_metadata);
  if (live_files_metadata.empty()) {
    return STATUS(IllegalState, "Need at least one sst file");
  }

  // Extract file names.
  vector<string> sst_files;
  sst_files.reserve(live_files_metadata.size());
  for (const rocksdb::LiveFileMetaData& file : live_files_metadata) {
    sst_files.push_back(file.name);
  }

  // Batch the files for compaction.
  size_t batch_size = sst_files.size() / FLAGS_bulk_load_num_files_per_tablet;
  // We need to perform compactions only if we have more than 'bulk_load_num_files_per_tablet'
  // files.
  if (batch_size != 0) {
    auto start_iter = sst_files.begin();
    for (size_t i = 0; i < FLAGS_bulk_load_num_files_per_tablet; i++) {
      // Sanity check.
      CHECK_GE(std::distance(start_iter, sst_files.end()), batch_size);

      // Include the remaining files for the last batch.
      auto end_iter = (i == FLAGS_bulk_load_num_files_per_tablet - 1) ? sst_files.end()
                                                                      : start_iter + batch_size;
      auto runnable = std::make_shared<CompactionTask>(vector<string>(start_iter, end_iter),
                                                       db_fixture_.get());
      RETURN_NOT_OK(thread_pool_->Submit(runnable));
      start_iter = end_iter;
    }

    // Finally wait for all compactions to finish.
    thread_pool_->Wait();

    // Reopen rocksdb to clean up deleted files.
    return db_fixture_->ReopenRocksDB();
  }
  return Status::OK();
}

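// Finishes a tablet: submits any remaining rows, waits for all outstanding tasks, flushes and
// compacts RocksDB, and, if --export_files is set, ships the resulting SST files to the
// tablet's replicas and imports them there.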
Status BulkLoad::FinishTabletProcessing(const TabletId &tablet_id,
                                        vector<pair<TabletId, string>> rows) {
  if (!db_fixture_) {
    // Skip processing: db_fixture_ was never initialized, which means there was no input.
    return Status::OK();
  }

  // Submit all the work.
  RETURN_NOT_OK(RetryableSubmit(std::move(rows)));

  // Wait for all tasks for the tablet to complete.
  thread_pool_->Wait();

  // Now flush the DB.
  RETURN_NOT_OK(db_fixture_->FlushRocksDbAndWait());

  // Perform the necessary compactions.
  RETURN_NOT_OK(CompactFiles());

  if (!FLAGS_export_files) {
    return Status::OK();
  }

  // Find replicas for the tablet.
  master::TabletLocationsPB tablet_locations;
  RETURN_NOT_OK(client_->GetTabletLocation(tablet_id, &tablet_locations));
  string csv_replicas;
  std::map<string, int32_t> host_to_rpcport;
  for (const master::TabletLocationsPB_ReplicaPB &replica : tablet_locations.replicas()) {
    if (!csv_replicas.empty()) {
      csv_replicas += ",";
    }
    const string &host = replica.ts_info().private_rpc_addresses(0).host();
    csv_replicas += host;
    host_to_rpcport[host] = replica.ts_info().private_rpc_addresses(0).port();
  }

  // Invoke the bulk_load_helper script.
  vector<string> argv = {FLAGS_bulk_load_helper_script, "-t", tablet_id, "-r", csv_replicas, "-i",
      FLAGS_ssh_key_file, "-d", db_fixture_->rocksdb_dir()};
  string bulk_load_helper_stdout;
  RETURN_NOT_OK(Subprocess::Call(argv, &bulk_load_helper_stdout));

  // Trim the output.
  boost::trim(bulk_load_helper_stdout);
  LOG(INFO) << "Helper script stdout: " << bulk_load_helper_stdout;

  // Finalize the import.
  rpc::MessengerBuilder bld("Client");
  std::unique_ptr<rpc::Messenger> client_messenger = VERIFY_RESULT(bld.Build());
  rpc::ProxyCache proxy_cache(client_messenger.get());
  vector<string> lines;
  boost::split(lines, bulk_load_helper_stdout, boost::is_any_of("\n"));
  for (const string &line : lines) {
    vector<string> tokens;
    boost::split(tokens, line, boost::is_any_of(","));
    if (tokens.size() != 2) {
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid line $0", line);
    }
    const string &replica_host = tokens[0];
    const string &directory = tokens[1];
    HostPort hostport(replica_host, host_to_rpcport[replica_host]);

    tserver::TabletServerServiceProxy proxy(&proxy_cache, hostport);
    tserver::ImportDataRequestPB req;
    req.set_tablet_id(tablet_id);
    req.set_source_dir(directory);

    tserver::ImportDataResponsePB resp;
    rpc::RpcController controller;
    LOG(INFO) << "Importing " << directory << " on " << replica_host << " for tablet_id: "
              << tablet_id;
    RETURN_NOT_OK(proxy.ImportData(req, &resp, &controller));
    if (resp.has_error()) {
      RETURN_NOT_OK(StatusFromPB(resp.error().status()));
    }

    // Now clean up the files from the production tserver.
    vector<string> cleanup_script = {FLAGS_bulk_load_cleanup_script, "-d", directory, "-t",
        replica_host, "-i", FLAGS_ssh_key_file};
    RETURN_NOT_OK(Subprocess::Call(cleanup_script));
  }

  // Delete the data once the import is done.
  return yb::Env::Default()->DeleteRecursively(db_fixture_->rocksdb_dir());
}


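// (Re)creates the RocksDB fixture that accumulates the SST files for the given tablet.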
CHECKED_STATUS BulkLoad::InitDBUtil(const TabletId &tablet_id) {
  db_fixture_.reset(new BulkLoadDocDBUtil(tablet_id, FLAGS_base_dir,
                                          FLAGS_memtable_size_bytes,
                                          FLAGS_bulk_load_num_memtables,
                                          FLAGS_bulk_load_max_background_flushes));
  RETURN_NOT_OK(db_fixture_->InitRocksDBOptions());
  RETURN_NOT_OK(db_fixture_->DisableCompactions()); // This opens rocksdb.
  return Status::OK();
}

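// Sets up the YB client, opens the target table, and initializes the partition generator and
// the worker threadpool.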
Status BulkLoad::InitYBBulkLoad() {
  // Convert table_name to lowercase since we store table names in lowercase.
  string table_name_lower = boost::to_lower_copy(FLAGS_table_name);
  YBTableName table_name(
      master::GetDefaultDatabaseType(FLAGS_namespace_name), FLAGS_namespace_name, table_name_lower);

  YBClientBuilder builder;
  builder.add_master_server_addr(FLAGS_master_addresses);

  client_ = VERIFY_RESULT(builder.Build());
  RETURN_NOT_OK(client_->OpenTable(table_name, &table_));
  partition_generator_.reset(new YBPartitionGenerator(table_name, {FLAGS_master_addresses}));
  RETURN_NOT_OK(partition_generator_->Init());

  db_fixture_ = nullptr;
  CHECK_OK(
      ThreadPoolBuilder("bulk_load_tasks")
          .set_min_threads(FLAGS_bulk_load_num_threads)
          .set_max_threads(FLAGS_bulk_load_num_threads)
          .set_max_queue_size(FLAGS_bulk_load_threadpool_queue_size)
          .set_idle_timeout(MonoDelta::FromMilliseconds(5000))
          .Build(&thread_pool_));
  return Status::OK();
}


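// Main driver loop: reads tab-separated (tablet_id, row) pairs from stdin and processes each
// run of consecutive rows for the same tablet as one unit.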
Status BulkLoad::RunBulkLoad() {

  RETURN_NOT_OK(InitYBBulkLoad());

  TabletId current_tablet_id;

  vector<pair<TabletId, string>> rows;
  for (string line; std::getline(std::cin, line);) {
    // Trim the line.
    boost::algorithm::trim(line);

    // Get the key and value.
    std::size_t index = line.find("\t");
    if (index == std::string::npos) {
      return STATUS_SUBSTITUTE(IllegalState, "Invalid line: $0", line);
    }
    const TabletId tablet_id = line.substr(0, index);
    const string row = line.substr(index + 1, line.size() - (index + 1));

    // Reinitialize rocksdb if needed.
    if (current_tablet_id.empty() || current_tablet_id != tablet_id) {
      // Flush all of the data before opening a new rocksdb.
      RETURN_NOT_OK(FinishTabletProcessing(current_tablet_id, std::move(rows)));
      RETURN_NOT_OK(InitDBUtil(tablet_id));
    }
    current_tablet_id = tablet_id;
    rows.emplace_back(std::move(tablet_id), std::move(row));

    // Flush the batch if necessary.
    if (rows.size() >= FLAGS_row_batch_size) {
      RETURN_NOT_OK(RetryableSubmit(std::move(rows)));
    }
  }

  // Process last tablet.
  RETURN_NOT_OK(FinishTabletProcessing(current_tablet_id, std::move(rows)));
  return Status::OK();
}

} // anonymous namespace

} // namespace tools
} // namespace yb

int main(int argc, char** argv) {
  yb::ParseCommandLineFlags(&argc, &argv, true);
  yb::InitGoogleLoggingSafe(argv[0]);
  if (FLAGS_master_addresses.empty() || FLAGS_table_name.empty() || FLAGS_namespace_name.empty()
      || FLAGS_base_dir.empty()) {
    LOG(FATAL) << "Need to specify --master_addresses, --table_name, --namespace_name, "
        "--base_dir";
  }

  if (FLAGS_export_files && FLAGS_ssh_key_file.empty()) {
    LOG(FATAL) << "Need to specify --ssh_key_file with --export_files";
  }

  // Verify the bulk load path exists.
  if (!yb::Env::Default()->FileExists(FLAGS_base_dir)) {
    LOG(FATAL) << "Bulk load directory doesn't exist: " << FLAGS_base_dir;
  }

  if (FLAGS_bulk_load_num_files_per_tablet <= 0) {
    LOG(FATAL) << "--bulk_load_num_files_per_tablet needs to be greater than 0";
  }

  yb::tools::BulkLoad bulk_load;
  yb::Status s = bulk_load.RunBulkLoad();
  if (!s.ok()) {
    LOG(FATAL) << "Error running bulk load: " << s.ToString();
  }
  return 0;
}