/Users/deen/code/yugabyte-db/src/yb/tools/yb-bulk_load.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <thread>

#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "yb/client/client.h"
#include "yb/client/table.h"

#include "yb/common/entity_ids.h"
#include "yb/common/hybrid_time.h"
#include "yb/common/jsonb.h"
#include "yb/common/partition.h"
#include "yb/common/ql_protocol.pb.h"
#include "yb/common/ql_value.h"
#include "yb/common/schema.h"
#include "yb/common/wire_protocol.h"

#include "yb/docdb/cql_operation.h"
#include "yb/docdb/doc_operation.h"

#include "yb/master/master_client.pb.h"
#include "yb/master/master_util.h"

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/options.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/proxy.h"
#include "yb/rpc/rpc_controller.h"

#include "yb/tools/bulk_load_docdb_util.h"
#include "yb/tools/bulk_load_utils.h"
#include "yb/tools/yb-generate_partitions.h"

#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/env.h"
#include "yb/util/flags.h"
#include "yb/util/logging.h"
#include "yb/util/size_literals.h"
#include "yb/util/status.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/stol_utils.h"
#include "yb/util/subprocess.h"
#include "yb/util/threadpool.h"

using std::pair;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
using std::vector;
using yb::client::YBClient;
using yb::client::YBClientBuilder;
using yb::client::YBTable;
using yb::client::YBTableName;
using yb::docdb::DocWriteBatch;
using yb::docdb::InitMarkerBehavior;
using yb::operator"" _GB;

DEFINE_string(master_addresses, "", "Comma-separated list of YB Master server addresses");
DEFINE_string(table_name, "", "Name of the table to generate partitions for");
DEFINE_string(namespace_name, "", "Namespace of the table");
DEFINE_string(base_dir, "", "Base directory where we will store all the SSTable files");
DEFINE_int64(memtable_size_bytes, 1_GB, "Number of bytes to use for the rocksdb memtable");
DEFINE_uint64(row_batch_size, 1000, "The number of rows to batch together in each rocksdb write");
DEFINE_bool(flush_batch_for_tests, false, "Option used only in tests to flush after each batch. "
            "Used to generate multiple SST files in conjunction with a small row_batch_size");
DEFINE_string(bulk_load_helper_script, "./bulk_load_helper.sh", "Relative path for bulk load helper"
              " script");
DEFINE_string(bulk_load_cleanup_script, "./bulk_load_cleanup.sh", "Relative path for bulk load "
              "cleanup script");
DEFINE_string(ssh_key_file, "", "SSH key to push SSTable files to production cluster");
DEFINE_bool(export_files, false, "Whether or not the files should be exported to a production "
            "cluster.");
DEFINE_int32(bulk_load_num_threads, 16, "Number of threads to use for bulk load");
DEFINE_int32(bulk_load_threadpool_queue_size, 10000,
             "Maximum number of entries to queue in the threadpool");
DEFINE_int32(bulk_load_num_memtables, 3, "Number of memtables to use for rocksdb");
DEFINE_int32(bulk_load_max_background_flushes, 2, "Number of flushes to perform in the background");
DEFINE_uint64(bulk_load_num_files_per_tablet, 5,
              "Target number of SST files per tablet; a tablet's data is compacted down to at "
              "most this many files");

DECLARE_string(skipped_cols);

namespace yb {
namespace tools {

namespace {

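// Threadpool task that inserts one batch of (tablet id, CSV row) pairs into the
// tablet-local RocksDB instance through a DocDB write batch.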
class BulkLoadTask : public Runnable {
 public:
  BulkLoadTask(vector<pair<TabletId, string>> rows, BulkLoadDocDBUtil *db_fixture,
               const YBTable *table, YBPartitionGenerator *partition_generator);
  void Run();
 private:
  CHECKED_STATUS PopulateColumnValue(const string &column,
                                     const DataType data_type,
                                     QLExpressionPB *column_value);
  CHECKED_STATUS InsertRow(const string &row,
                           const Schema &schema,
                           const IndexMap& index_map,
                           BulkLoadDocDBUtil *const db_fixture,
                           docdb::DocWriteBatch *const doc_write_batch,
                           YBPartitionGenerator *const partition_generator);
  vector<pair<TabletId, string>> rows_;
  const std::set<int> skipped_cols_;
  BulkLoadDocDBUtil *const db_fixture_;
  const YBTable *const table_;
  YBPartitionGenerator *const partition_generator_;
};

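// Threadpool task that compacts one batch of SST files into a single output file.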
class CompactionTask : public Runnable {
 public:
  CompactionTask(const vector<string>& sst_filenames, BulkLoadDocDBUtil* db_fixture);
  void Run();
 private:
  vector<string> sst_filenames_;
  BulkLoadDocDBUtil *const db_fixture_;
};

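// Driver for the bulk load: reads (tablet id, row) pairs from stdin, batches them per
// tablet, writes out SST files for each tablet, and optionally exports those files to a
// production cluster.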
class BulkLoad {
 public:
  CHECKED_STATUS RunBulkLoad();

 private:
  CHECKED_STATUS InitYBBulkLoad();
  CHECKED_STATUS InitDBUtil(const TabletId &tablet_id);
  CHECKED_STATUS FinishTabletProcessing(const TabletId &tablet_id,
                                        vector<pair<TabletId, string>> rows);
  CHECKED_STATUS RetryableSubmit(vector<pair<TabletId, string>> rows);
  CHECKED_STATUS CompactFiles();

  std::unique_ptr<YBClient> client_;
  shared_ptr<YBTable> table_;
  unique_ptr<YBPartitionGenerator> partition_generator_;
  std::unique_ptr<ThreadPool> thread_pool_;
  unique_ptr<BulkLoadDocDBUtil> db_fixture_;
};

CompactionTask::CompactionTask(const vector<string>& sst_filenames, BulkLoadDocDBUtil* db_fixture)
    : sst_filenames_(sst_filenames),
      db_fixture_(db_fixture) {
}

void CompactionTask::Run() {
  if (sst_filenames_.size() == 1) {
    LOG(INFO) << "Skipping compaction since we have only a single file: " << sst_filenames_[0];
    return;
  }

  LOG(INFO) << "Compacting files: " << ToString(sst_filenames_);
  CHECK_OK(db_fixture_->rocksdb()->CompactFiles(rocksdb::CompactionOptions(),
                                                sst_filenames_,
                                                /* output_level */ 0));
}

BulkLoadTask::BulkLoadTask(vector<pair<TabletId, string>> rows,
                           BulkLoadDocDBUtil *db_fixture, const YBTable *table,
                           YBPartitionGenerator *partition_generator)
    : rows_(std::move(rows)),
      skipped_cols_(tools::SkippedColumns()),
      db_fixture_(db_fixture),
      table_(table),
      partition_generator_(partition_generator) {
}

void BulkLoadTask::Run() {
  DocWriteBatch doc_write_batch(docdb::DocDB::FromRegularUnbounded(db_fixture_->rocksdb()),
                                InitMarkerBehavior::kOptional);

  for (const auto &entry : rows_) {
    const string &row = entry.second;

    // Populate the row.
    CHECK_OK(InsertRow(row, table_->InternalSchema(), table_->index_map(), db_fixture_,
                       &doc_write_batch, partition_generator_));
  }

  // Flush the batch.
  CHECK_OK(db_fixture_->WriteToRocksDB(
      doc_write_batch, HybridTime::FromMicros(kYugaByteMicrosecondEpoch),
      /* decode_dockey */ false, /* increment_write_id */ false));

  if (FLAGS_flush_batch_for_tests) {
    CHECK_OK(db_fixture_->FlushRocksDbAndWait());
  }
}

Status BulkLoadTask::PopulateColumnValue(const string &column,
                                         const DataType data_type,
                                         QLExpressionPB *column_value) {
  auto ql_valuepb = column_value->mutable_value();
  switch (data_type) {
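    // Each YB_SET_INT_VALUE invocation expands to a case label for the matching INT<n>
    // type, parsing 'column' as an integer and setting the int<n> value on ql_valuepb.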
    YB_SET_INT_VALUE(ql_valuepb, column, 8);
    YB_SET_INT_VALUE(ql_valuepb, column, 16);
    YB_SET_INT_VALUE(ql_valuepb, column, 32);
    YB_SET_INT_VALUE(ql_valuepb, column, 64);
    case DataType::FLOAT: {
      auto value = CheckedStold(column);
      RETURN_NOT_OK(value);
      ql_valuepb->set_float_value(*value);
      break;
    }
    case DataType::DOUBLE: {
      auto value = CheckedStold(column);
      RETURN_NOT_OK(value);
      ql_valuepb->set_double_value(*value);
      break;
    }
    case DataType::STRING: {
      ql_valuepb->set_string_value(column);
      break;
    }
    case DataType::JSONB: {
      common::Jsonb jsonb;
      RETURN_NOT_OK(jsonb.FromString(column));
      ql_valuepb->set_jsonb_value(jsonb.MoveSerializedJsonb());
      break;
    }
    case DataType::TIMESTAMP: {
      auto ts = TimestampFromString(column);
      RETURN_NOT_OK(ts);
      ql_valuepb->set_timestamp_value(ts->ToInt64());
      break;
    }
    case DataType::BINARY: {
      ql_valuepb->set_binary_value(column);
      break;
    }
    default:
      FATAL_INVALID_ENUM_VALUE(DataType, data_type);
  }
  return Status::OK();
}

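// Converts one CSV row into a QL INSERT request and applies it to doc_write_batch.
// Key columns may not be null; null regular columns are written as empty values.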
Status BulkLoadTask::InsertRow(const string &row,
                               const Schema &schema,
                               const IndexMap& index_map,
                               BulkLoadDocDBUtil *const db_fixture,
                               docdb::DocWriteBatch *const doc_write_batch,
                               YBPartitionGenerator *const partition_generator) {
  // Get the individual columns.
  CsvTokenizer tokenizer = Tokenize(row);
  size_t ncolumns = std::distance(tokenizer.begin(), tokenizer.end());
  if (ncolumns != schema.num_columns()) {
    return STATUS_SUBSTITUTE(IllegalState, "row '$0' has $1 columns, need exactly $2", row,
                             ncolumns, schema.num_columns());
  }

  QLResponsePB resp;
  QLWriteRequestPB req;
  req.set_type(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT);
  req.set_client(YQL_CLIENT_CQL);

  int col_id = 0;
  auto it = tokenizer.begin();
  // Process the key columns (hash and range) first.
  for (size_t i = 0; i < schema.num_key_columns(); it++, col_id++) {
    if (skipped_cols_.find(col_id) != skipped_cols_.end()) {
      continue;
    }
    if (IsNull(*it)) {
      return STATUS_SUBSTITUTE(IllegalState, "Primary key cannot be null: $0", *it);
    }

    QLExpressionPB *column_value = nullptr;
    if (schema.is_hash_key_column(i)) {
      column_value = req.add_hashed_column_values();
    } else {
      column_value = req.add_range_column_values();
    }

    RETURN_NOT_OK(PopulateColumnValue(*it, schema.column(i).type_info()->type, column_value));
    i++;  // Only advance the schema index for columns that aren't skipped.
  }

  // Now process the regular columns.
  for (auto i = schema.num_key_columns(); i < schema.num_columns(); it++, col_id++) {
    if (skipped_cols_.find(col_id) != skipped_cols_.end()) {
      continue;
    }
    QLColumnValuePB *column_value = req.add_column_values();
    column_value->set_column_id(narrow_cast<int32_t>(kFirstColumnId + i));
    if (IsNull(*it)) {
      // Use an empty value for null.
      column_value->mutable_expr()->mutable_value();
    } else {
      RETURN_NOT_OK(PopulateColumnValue(*it, schema.column(i).type_info()->type,
                                        column_value->mutable_expr()));
    }
    i++;  // Only advance the schema index for columns that aren't skipped.
  }

  // Add the hash code to the operation.
  string tablet_id;
  string partition_key;
  RETURN_NOT_OK(partition_generator->LookupTabletIdWithTokenizer(
      tokenizer, skipped_cols_, &tablet_id, &partition_key));
  req.set_hash_code(PartitionSchema::DecodeMultiColumnHashValue(partition_key));

  // Finally apply the operation to the doc_write_batch.
  // TODO(dtxn) pass correct TransactionContext.
  // Comment from PritamD: We don't need cross-shard transaction support in bulk load, but once
  // we have secondary indexes we will probably need to ensure that bulk load builds the indexes
  // as well.
  docdb::QLWriteOperation op(
      req, std::shared_ptr<const Schema>(&schema, [](const Schema*){}),
      index_map, nullptr /* unique_index_key_schema */, TransactionOperationContext());
  RETURN_NOT_OK(op.Init(&resp));
  RETURN_NOT_OK(op.Apply({
      doc_write_batch,
      CoarseTimePoint::max() /* deadline */,
      ReadHybridTime::SingleTime(HybridTime::FromMicros(kYugaByteMicrosecondEpoch))}));
  return Status::OK();
}

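// Wraps the rows in a BulkLoadTask and submits it to the threadpool, retrying while the
// pool reports ServiceUnavailable (i.e. its queue is full).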
Status BulkLoad::RetryableSubmit(vector<pair<TabletId, string>> rows) {
  auto runnable = std::make_shared<BulkLoadTask>(
      std::move(rows), db_fixture_.get(), table_.get(), partition_generator_.get());

  Status s;
  do {
    s = thread_pool_->Submit(runnable);

    if (!s.IsServiceUnavailable()) {
      return s;
    }

    LOG(ERROR) << "Failed submitting task, sleeping for a while: " << s.ToString();

    // If the service is unavailable, the queue might be full. Sleep and try again.
    SleepFor(MonoDelta::FromSeconds(10));
  } while (!s.ok());

  return Status::OK();
}

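// Compacts the tablet's SST files down to at most FLAGS_bulk_load_num_files_per_tablet
// files by splitting them into roughly equal batches and compacting each batch.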
Status BulkLoad::CompactFiles() {
  std::vector<rocksdb::LiveFileMetaData> live_files_metadata;
  db_fixture_->rocksdb()->GetLiveFilesMetaData(&live_files_metadata);
  if (live_files_metadata.empty()) {
    return STATUS(IllegalState, "Need at least one SST file");
  }

  // Extract the file names.
  vector<string> sst_files;
  sst_files.reserve(live_files_metadata.size());
  for (const rocksdb::LiveFileMetaData& file : live_files_metadata) {
    sst_files.push_back(file.name);
  }

  // Batch the files for compaction.
  size_t batch_size = sst_files.size() / FLAGS_bulk_load_num_files_per_tablet;
  // We need to perform compactions only if we have at least
  // FLAGS_bulk_load_num_files_per_tablet files.
  if (batch_size != 0) {
    auto start_iter = sst_files.begin();
    for (size_t i = 0; i < FLAGS_bulk_load_num_files_per_tablet; i++) {
      // Sanity check.
      CHECK_GE(std::distance(start_iter, sst_files.end()), batch_size);

      // Include the remaining files in the last batch.
      auto end_iter = (i == FLAGS_bulk_load_num_files_per_tablet - 1) ? sst_files.end()
                                                                      : start_iter + batch_size;
      auto runnable = std::make_shared<CompactionTask>(vector<string>(start_iter, end_iter),
                                                       db_fixture_.get());
      RETURN_NOT_OK(thread_pool_->Submit(runnable));
      start_iter = end_iter;
    }

    // Finally, wait for all compactions to finish.
    thread_pool_->Wait();

    // Reopen rocksdb to clean up deleted files.
    return db_fixture_->ReopenRocksDB();
  }
  return Status::OK();
}

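// Flushes and compacts all outstanding work for a tablet and, if --export_files is set,
// ships the resulting SST files to the tablet's replicas and imports them there.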
Status BulkLoad::FinishTabletProcessing(const TabletId &tablet_id,
                                        vector<pair<TabletId, string>> rows) {
  if (!db_fixture_) {
    // Skip processing: db_fixture_ was never initialized, which means there was no input.
    return Status::OK();
  }

  // Submit all the work.
  RETURN_NOT_OK(RetryableSubmit(std::move(rows)));

  // Wait for all tasks for the tablet to complete.
  thread_pool_->Wait();

  // Now flush the DB.
  RETURN_NOT_OK(db_fixture_->FlushRocksDbAndWait());

  // Perform the necessary compactions.
  RETURN_NOT_OK(CompactFiles());

  if (!FLAGS_export_files) {
    return Status::OK();
  }

  // Find the replicas for the tablet.
  master::TabletLocationsPB tablet_locations;
  RETURN_NOT_OK(client_->GetTabletLocation(tablet_id, &tablet_locations));
  string csv_replicas;
  std::map<string, int32_t> host_to_rpcport;
  for (const master::TabletLocationsPB_ReplicaPB &replica : tablet_locations.replicas()) {
    if (!csv_replicas.empty()) {
      csv_replicas += ",";
    }
    const string &host = replica.ts_info().private_rpc_addresses(0).host();
    csv_replicas += host;
    host_to_rpcport[host] = replica.ts_info().private_rpc_addresses(0).port();
  }

  // Invoke the bulk_load_helper script.
  vector<string> argv = {FLAGS_bulk_load_helper_script, "-t", tablet_id, "-r", csv_replicas, "-i",
                         FLAGS_ssh_key_file, "-d", db_fixture_->rocksdb_dir()};
  string bulk_load_helper_stdout;
  RETURN_NOT_OK(Subprocess::Call(argv, &bulk_load_helper_stdout));

  // Trim the output.
  boost::trim(bulk_load_helper_stdout);
  LOG(INFO) << "Helper script stdout: " << bulk_load_helper_stdout;

  // Finalize the import.
  rpc::MessengerBuilder bld("Client");
  std::unique_ptr<rpc::Messenger> client_messenger = VERIFY_RESULT(bld.Build());
  rpc::ProxyCache proxy_cache(client_messenger.get());
  vector<string> lines;
  boost::split(lines, bulk_load_helper_stdout, boost::is_any_of("\n"));
  for (const string &line : lines) {
    vector<string> tokens;
    boost::split(tokens, line, boost::is_any_of(","));
    if (tokens.size() != 2) {
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid line $0", line);
    }
    const string &replica_host = tokens[0];
    const string &directory = tokens[1];
    HostPort hostport(replica_host, host_to_rpcport[replica_host]);

    tserver::TabletServerServiceProxy proxy(&proxy_cache, hostport);
    tserver::ImportDataRequestPB req;
    req.set_tablet_id(tablet_id);
    req.set_source_dir(directory);

    tserver::ImportDataResponsePB resp;
    rpc::RpcController controller;
    LOG(INFO) << "Importing " << directory << " on " << replica_host << " for tablet_id: "
              << tablet_id;
    RETURN_NOT_OK(proxy.ImportData(req, &resp, &controller));
    if (resp.has_error()) {
      RETURN_NOT_OK(StatusFromPB(resp.error().status()));
    }

    // Now clean up the files from the production tserver.
    vector<string> cleanup_script = {FLAGS_bulk_load_cleanup_script, "-d", directory, "-t",
                                     replica_host, "-i", FLAGS_ssh_key_file};
    RETURN_NOT_OK(Subprocess::Call(cleanup_script));
  }

  // Delete the data once the import is done.
  return yb::Env::Default()->DeleteRecursively(db_fixture_->rocksdb_dir());
}

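// Creates a fresh RocksDB instance for the given tablet under --base_dir, with
// compactions disabled so that flushes simply accumulate SST files.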
CHECKED_STATUS BulkLoad::InitDBUtil(const TabletId &tablet_id) {
  db_fixture_.reset(new BulkLoadDocDBUtil(tablet_id, FLAGS_base_dir,
                                          FLAGS_memtable_size_bytes,
                                          FLAGS_bulk_load_num_memtables,
                                          FLAGS_bulk_load_max_background_flushes));
  RETURN_NOT_OK(db_fixture_->InitRocksDBOptions());
  RETURN_NOT_OK(db_fixture_->DisableCompactions());  // This opens rocksdb.
  return Status::OK();
}

Status BulkLoad::InitYBBulkLoad() {
  // Convert table_name to lowercase since we store table names in lowercase.
  string table_name_lower = boost::to_lower_copy(FLAGS_table_name);
  YBTableName table_name(
      master::GetDefaultDatabaseType(FLAGS_namespace_name), FLAGS_namespace_name, table_name_lower);

  YBClientBuilder builder;
  builder.add_master_server_addr(FLAGS_master_addresses);

  client_ = VERIFY_RESULT(builder.Build());
  RETURN_NOT_OK(client_->OpenTable(table_name, &table_));
  partition_generator_.reset(new YBPartitionGenerator(table_name, {FLAGS_master_addresses}));
  RETURN_NOT_OK(partition_generator_->Init());

  db_fixture_ = nullptr;
  CHECK_OK(
      ThreadPoolBuilder("bulk_load_tasks")
          .set_min_threads(FLAGS_bulk_load_num_threads)
          .set_max_threads(FLAGS_bulk_load_num_threads)
          .set_max_queue_size(FLAGS_bulk_load_threadpool_queue_size)
          .set_idle_timeout(MonoDelta::FromMilliseconds(5000))
          .Build(&thread_pool_));
  return Status::OK();
}

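// Reads tab-separated "<tablet_id>\t<csv_row>" lines from stdin. Input must arrive
// grouped by tablet id: whenever the tablet id changes, the previous tablet is flushed
// and finalized and a new RocksDB instance is opened for the next one.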
Status BulkLoad::RunBulkLoad() {
  RETURN_NOT_OK(InitYBBulkLoad());

  TabletId current_tablet_id;

  vector<pair<TabletId, string>> rows;
  for (string line; std::getline(std::cin, line);) {
    // Trim the line.
    boost::algorithm::trim(line);

    // Get the key and value.
    std::size_t index = line.find('\t');
    if (index == std::string::npos) {
      return STATUS_SUBSTITUTE(IllegalState, "Invalid line: $0", line);
    }
    // Non-const so that the std::move below actually moves instead of copying.
    TabletId tablet_id = line.substr(0, index);
    string row = line.substr(index + 1, line.size() - (index + 1));

    // Reinitialize rocksdb if needed.
    if (current_tablet_id.empty() || current_tablet_id != tablet_id) {
      // Flush all of the data before opening a new rocksdb.
      RETURN_NOT_OK(FinishTabletProcessing(current_tablet_id, std::move(rows)));
      RETURN_NOT_OK(InitDBUtil(tablet_id));
    }
    current_tablet_id = tablet_id;
    rows.emplace_back(std::move(tablet_id), std::move(row));

    // Flush the batch if necessary.
    if (rows.size() >= FLAGS_row_batch_size) {
      RETURN_NOT_OK(RetryableSubmit(std::move(rows)));
    }
  }

  // Process the last tablet.
  RETURN_NOT_OK(FinishTabletProcessing(current_tablet_id, std::move(rows)));
  return Status::OK();
}

}  // anonymous namespace

}  // namespace tools
}  // namespace yb

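// Example invocation (illustrative; flag values are placeholders):
//   yb-bulk_load --master_addresses=127.0.0.1:7100 --namespace_name=test_ns \
//       --table_name=test_table --base_dir=/tmp/bulk_load < input.tsv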
int main(int argc, char** argv) {
  yb::ParseCommandLineFlags(&argc, &argv, true);
  yb::InitGoogleLoggingSafe(argv[0]);
  if (FLAGS_master_addresses.empty() || FLAGS_table_name.empty() || FLAGS_namespace_name.empty()
      || FLAGS_base_dir.empty()) {
    LOG(FATAL) << "Need to specify --master_addresses, --table_name, --namespace_name, "
                  "and --base_dir";
  }

  if (FLAGS_export_files && FLAGS_ssh_key_file.empty()) {
    LOG(FATAL) << "Need to specify --ssh_key_file with --export_files";
  }

  // Verify that the bulk load directory exists.
  if (!yb::Env::Default()->FileExists(FLAGS_base_dir)) {
    LOG(FATAL) << "Bulk load directory doesn't exist: " << FLAGS_base_dir;
  }

  if (FLAGS_bulk_load_num_files_per_tablet <= 0) {
    LOG(FATAL) << "--bulk_load_num_files_per_tablet needs to be greater than 0";
  }

  yb::tools::BulkLoad bulk_load;
  yb::Status s = bulk_load.RunBulkLoad();
  if (!s.ok()) {
    LOG(FATAL) << "Error running bulk load: " << s.ToString();
  }
  return 0;
}