/Users/deen/code/yugabyte-db/src/yb/tablet/tablet.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/tablet.h" |
34 | | |
35 | | #include <boost/container/static_vector.hpp> |
36 | | |
37 | | #include "yb/client/client.h" |
38 | | #include "yb/client/error.h" |
39 | | #include "yb/client/meta_data_cache.h" |
40 | | #include "yb/client/session.h" |
41 | | #include "yb/client/table.h" |
42 | | #include "yb/client/transaction.h" |
43 | | #include "yb/client/transaction_manager.h" |
44 | | #include "yb/client/yb_op.h" |
45 | | |
46 | | #include "yb/common/index_column.h" |
47 | | #include "yb/common/pgsql_error.h" |
48 | | #include "yb/common/ql_rowblock.h" |
49 | | #include "yb/common/row_mark.h" |
50 | | #include "yb/common/schema.h" |
51 | | #include "yb/common/transaction_error.h" |
52 | | #include "yb/common/wire_protocol.h" |
53 | | |
54 | | #include "yb/consensus/consensus.pb.h" |
55 | | #include "yb/consensus/log_anchor_registry.h" |
56 | | #include "yb/consensus/opid_util.h" |
57 | | |
58 | | #include "yb/docdb/compaction_file_filter.h" |
59 | | #include "yb/docdb/conflict_resolution.h" |
60 | | #include "yb/docdb/consensus_frontier.h" |
61 | | #include "yb/docdb/cql_operation.h" |
62 | | #include "yb/docdb/doc_rowwise_iterator.h" |
63 | | #include "yb/docdb/doc_write_batch.h" |
64 | | #include "yb/docdb/docdb.h" |
65 | | #include "yb/docdb/docdb_compaction_filter.h" |
66 | | #include "yb/docdb/docdb_compaction_filter_intents.h" |
67 | | #include "yb/docdb/docdb_debug.h" |
68 | | #include "yb/docdb/docdb_rocksdb_util.h" |
69 | | #include "yb/docdb/pgsql_operation.h" |
70 | | #include "yb/docdb/ql_rocksdb_storage.h" |
71 | | #include "yb/docdb/redis_operation.h" |
72 | | #include "yb/docdb/rocksdb_writer.h" |
73 | | |
74 | | #include "yb/gutil/casts.h" |
75 | | |
76 | | #include "yb/rocksdb/db/memtable.h" |
77 | | #include "yb/rocksdb/utilities/checkpoint.h" |
78 | | |
79 | | #include "yb/rocksutil/yb_rocksdb.h" |
80 | | |
81 | | #include "yb/server/hybrid_clock.h" |
82 | | |
83 | | #include "yb/tablet/operations/change_metadata_operation.h" |
84 | | #include "yb/tablet/operations/operation.h" |
85 | | #include "yb/tablet/operations/snapshot_operation.h" |
86 | | #include "yb/tablet/operations/split_operation.h" |
87 | | #include "yb/tablet/operations/truncate_operation.h" |
88 | | #include "yb/tablet/operations/write_operation.h" |
89 | | #include "yb/tablet/read_result.h" |
90 | | #include "yb/tablet/snapshot_coordinator.h" |
91 | | #include "yb/tablet/tablet_bootstrap_if.h" |
92 | | #include "yb/tablet/tablet_metadata.h" |
93 | | #include "yb/tablet/tablet_metrics.h" |
94 | | #include "yb/tablet/tablet_retention_policy.h" |
95 | | #include "yb/tablet/tablet_snapshots.h" |
96 | | #include "yb/tablet/transaction_coordinator.h" |
97 | | #include "yb/tablet/transaction_participant.h" |
98 | | #include "yb/tablet/write_query.h" |
99 | | |
100 | | #include "yb/tserver/tserver.pb.h" |
101 | | |
102 | | #include "yb/util/debug-util.h" |
103 | | #include "yb/util/debug/trace_event.h" |
104 | | #include "yb/util/flag_tags.h" |
105 | | #include "yb/util/format.h" |
106 | | #include "yb/util/logging.h" |
107 | | #include "yb/util/mem_tracker.h" |
108 | | #include "yb/util/metrics.h" |
109 | | #include "yb/util/net/net_util.h" |
110 | | #include "yb/util/pg_util.h" |
111 | | #include "yb/util/scope_exit.h" |
112 | | #include "yb/util/status_format.h" |
113 | | #include "yb/util/status_log.h" |
114 | | #include "yb/util/stopwatch.h" |
115 | | #include "yb/util/trace.h" |
116 | | #include "yb/util/yb_pg_errcodes.h" |
117 | | |
118 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
119 | | |
// Flag definitions (DEFINE_* / TAG_FLAG macros) for duplicate-key checks, intent cleanup during
// compaction and bloom filter sizing, followed by the "table" and "tablet" metric entities.
120 | | DEFINE_bool(tablet_do_dup_key_checks, true, |
121 | | "Whether to check primary keys for duplicate on insertion. " |
122 | | "Use at your own risk!"); |
123 | | TAG_FLAG(tablet_do_dup_key_checks, unsafe); |
124 | | |
125 | | DEFINE_bool(tablet_do_compaction_cleanup_for_intents, true, |
126 | | "Whether to clean up intents for aborted transactions in compaction."); |
127 | | |
128 | | DEFINE_int32(tablet_bloom_block_size, 4096, |
129 | | "Block size of the bloom filters used for tablet keys."); |
130 | | TAG_FLAG(tablet_bloom_block_size, advanced); |
131 | | |
// NOTE(review): the default is written as the float literal 0.01f although the flag is declared
// with DEFINE_double; the stored default is therefore the float value widened to double.
132 | | DEFINE_double(tablet_bloom_target_fp_rate, 0.01f, |
133 | | "Target false-positive rate (between 0 and 1) to size tablet key bloom filters. " |
134 | | "A lower false positive rate may reduce the number of disk seeks required " |
135 | | "in heavy insert workloads, at the expense of more space and RAM " |
136 | | "required for bloom filters."); |
137 | | TAG_FLAG(tablet_bloom_target_fp_rate, advanced); |
138 | | |
// Metric entities instantiated per table and per tablet in the Tablet constructor
// (METRIC_ENTITY_table / METRIC_ENTITY_tablet).
139 | | METRIC_DEFINE_entity(table); |
140 | | METRIC_DEFINE_entity(tablet); |
141 | | |
// Flags governing RocksDB quiescing, intents DB flushing and index backfill batch size.
// All time-valued flags in this group are expressed in milliseconds (see the _ms suffix and
// the defaults 60000 and 2000).
142 | | // TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout |
143 | | // is probably OK for shutdown. |
144 | | DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000, |
145 | | "Max amount of time we can wait for read/write operations on RocksDB to finish " |
146 | | "so that we can perform exclusive-ownership operations on RocksDB, such as removing " |
147 | | "all data in the tablet by replacing the RocksDB instance with an empty one."); |
148 | | |
// Read by Tablet::IntentsDbFlushFilter to decide when to force a flush of the regular DB.
149 | | DEFINE_int32(intents_flush_max_delay_ms, 2000, |
150 | | "Max time to wait for regular db to flush during flush of intents. " |
151 | | "After this time flush of regular db will be forced."); |
152 | | |
153 | | DEFINE_int32(num_raft_ops_to_force_idle_intents_db_to_flush, 1000, |
154 | | "When writes to intents RocksDB are stopped and the number of Raft operations after " |
155 | | "the last write to the intents RocksDB " |
156 | | "is greater than this value, the intents RocksDB would be requested to flush."); |
157 | | |
158 | | DEFINE_bool(delete_intents_sst_files, true, |
159 | | "Delete whole intents .SST files when possible."); |
160 | | |
// Tagged both advanced and runtime.
161 | | DEFINE_uint64(backfill_index_write_batch_size, 128, "The batch size for backfilling the index."); |
162 | | TAG_FLAG(backfill_index_write_batch_size, advanced); |
163 | | TAG_FLAG(backfill_index_write_batch_size, runtime); |
164 | | |
// Flags for index backfill / index verification throttling, schema-version compatibility and
// the alter-vs-write exclusion switch. Every flag in this group is tagged advanced and runtime.
// NOTE(review): the help text reads "Rate of at which"; this is a runtime string, so it is left
// untouched here — consider rewording in the real source.
165 | | DEFINE_int32(backfill_index_rate_rows_per_sec, 0, "Rate of at which the " |
166 | | "indexed table's entries are populated into the index table during index " |
167 | | "backfill. This is a per-tablet flag, i.e. a tserver responsible for " |
168 | | "multiple tablets could be processing more than this."); |
169 | | TAG_FLAG(backfill_index_rate_rows_per_sec, advanced); |
170 | | TAG_FLAG(backfill_index_rate_rows_per_sec, runtime); |
171 | | |
172 | | DEFINE_uint64(verify_index_read_batch_size, 128, "The batch size for reading the index."); |
173 | | TAG_FLAG(verify_index_read_batch_size, advanced); |
174 | | TAG_FLAG(verify_index_read_batch_size, runtime); |
175 | | |
// NOTE(review): the first two string literals are concatenated without a separating space, so
// the help text renders as "...consistency checks.This is a per-tablet flag..." — confirm and
// add the missing space in the real source.
176 | | DEFINE_int32(verify_index_rate_rows_per_sec, 0, |
177 | | "Rate of at which the indexed table's entries are read during index consistency checks." |
178 | | "This is a per-tablet flag, i.e. a tserver responsible for multiple tablets could be " |
179 | | "processing more than this."); |
180 | | TAG_FLAG(verify_index_rate_rows_per_sec, advanced); |
181 | | TAG_FLAG(verify_index_rate_rows_per_sec, runtime); |
182 | | |
// NOTE(review): the default is -1; how a negative value is interpreted is not visible in this
// part of the file — verify at the point of use.
183 | | DEFINE_int32(backfill_index_timeout_grace_margin_ms, -1, |
184 | | "The time we give the backfill process to wrap up the current set " |
185 | | "of writes and return successfully the RPC with the information about " |
186 | | "how far we have processed the rows."); |
187 | | TAG_FLAG(backfill_index_timeout_grace_margin_ms, advanced); |
188 | | TAG_FLAG(backfill_index_timeout_grace_margin_ms, runtime); |
189 | | |
190 | | DEFINE_bool(yql_allow_compatible_schema_versions, true, |
191 | | "Allow YCQL requests to be accepted even if they originate from a client who is ahead " |
192 | | "of the server's schema, but is determined to be compatible with the current version."); |
193 | | TAG_FLAG(yql_allow_compatible_schema_versions, advanced); |
194 | | TAG_FLAG(yql_allow_compatible_schema_versions, runtime); |
195 | | |
196 | | DEFINE_bool(disable_alter_vs_write_mutual_exclusion, false, |
197 | | "A safety switch to disable the changes from D8710 which makes a schema " |
198 | | "operation take an exclusive lock making all write operations wait for it."); |
199 | | TAG_FLAG(disable_alter_vs_write_mutual_exclusion, advanced); |
200 | | TAG_FLAG(disable_alter_vs_write_mutual_exclusion, runtime); |
201 | | |
// Intents cleanup / DDL abort timeout flags, followed by test-only flags. Flags declared with
// DEFINE_test_flag are referenced in this file with a TEST_ prefix (for example
// FLAGS_TEST_disable_adding_user_frontier_to_sst and FLAGS_TEST_export_intentdb_metrics).
202 | | DEFINE_bool(cleanup_intents_sst_files, true, |
203 | | "Cleanup intents files that are no more relevant to any running transaction."); |
204 | | |
205 | | DEFINE_int32(ysql_transaction_abort_timeout_ms, 15 * 60 * 1000, // 15 minutes |
206 | | "Max amount of time we can wait for active transactions to abort on a tablet " |
207 | | "after DDL (eg. DROP TABLE) is executed. This deadline is same as " |
208 | | "unresponsive_ts_rpc_timeout_ms"); |
209 | | |
210 | | DEFINE_test_flag(int32, backfill_sabotage_frequency, 0, |
211 | | "If set to value greater than 0, every nth row will be corrupted in the backfill process " |
212 | | "to create an inconsistency between the index and the indexed tables where n is the " |
213 | | "input parameter given."); |
214 | | |
215 | | DEFINE_test_flag(int32, backfill_drop_frequency, 0, |
216 | | "If set to value greater than 0, every nth row will be dropped in the backfill process " |
217 | | "to create an inconsistency between the index and the indexed tables where n is the " |
218 | | "input parameter given."); |
219 | | |
// Not a test flag, although it sits between test flags.
220 | | DEFINE_bool(tablet_enable_ttl_file_filter, false, |
221 | | "Enables compaction to directly delete files that have expired based on TTL, " |
222 | | "rather than removing them via the normal compaction process."); |
223 | | |
224 | | DEFINE_test_flag(int32, slowdown_backfill_by_ms, 0, |
225 | | "If set > 0, slows down the backfill process by this amount."); |
226 | | |
227 | | DEFINE_test_flag(uint64, backfill_paging_size, 0, |
228 | | "If set > 0, returns early after processing this number of rows."); |
229 | | |
230 | | DEFINE_test_flag(bool, tablet_verify_flushed_frontier_after_modifying, false, |
231 | | "After modifying the flushed frontier in RocksDB, verify that the restored value " |
232 | | "of it is as expected. Used for testing."); |
233 | | |
234 | | DEFINE_test_flag(bool, docdb_log_write_batches, false, |
235 | | "Dump write batches being written to RocksDB"); |
236 | | |
// Read in the Tablet constructor to choose whether the tablet metric entity is passed when
// creating the intents DB statistics object.
237 | | DEFINE_test_flag(bool, export_intentdb_metrics, false, |
238 | | "Dump intentsdb statistics to prometheus metrics"); |
239 | | |
240 | | DEFINE_test_flag(bool, pause_before_post_split_compaction, false, |
241 | | "Pause before triggering post split compaction."); |
242 | | |
// Makes InitFrontiers() return nullptr instead of filling in the frontiers.
243 | | DEFINE_test_flag(bool, disable_adding_user_frontier_to_sst, false, |
244 | | "Prevents adding the UserFrontier to SST file in order to mimic older files."); |
245 | | |
246 | | // FLAGS_TEST_disable_getting_user_frontier_from_mem_table is used in conjunction with |
247 | | // FLAGS_TEST_disable_adding_user_frontier_to_sst. Two flags are needed for the case in which |
248 | | // we're writing a mixture of SST files with and without UserFrontiers, to ensure that we're |
249 | | // not attempting to read the UserFrontier from the MemTable in either case. |
250 | | DEFINE_test_flag(bool, disable_getting_user_frontier_from_mem_table, false, |
251 | | "Prevents checking the MemTable for a UserFrontier for test cases where we are " |
252 | | "generating SST files without UserFrontiers."); |
253 | | |
// Flags declared here with DECLARE_* are defined outside the visible part of this file; of
// these, FLAGS_consistent_restore and FLAGS_regular_tablets_data_block_key_value_encoding are
// read in the visible code.
254 | | DECLARE_int32(client_read_write_timeout_ms); |
255 | | DECLARE_bool(consistent_restore); |
256 | | DECLARE_int32(rocksdb_level0_slowdown_writes_trigger); |
257 | | DECLARE_int32(rocksdb_level0_stop_writes_trigger); |
258 | | DECLARE_uint64(rocksdb_max_file_size_for_compaction); |
259 | | DECLARE_int64(apply_intents_task_injected_delay_ms); |
260 | | DECLARE_string(regular_tablets_data_block_key_value_encoding); |
261 | | |
// Test-only flag; the sleep duration is in milliseconds and defaults to 0.
262 | | DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0, |
263 | | "Sleep before applying intents to docdb after transaction commit"); |
264 | | |
265 | | using namespace std::placeholders; |
266 | | |
267 | | using std::shared_ptr; |
268 | | using std::make_shared; |
269 | | using std::string; |
270 | | using std::unordered_set; |
271 | | using std::vector; |
272 | | using std::unique_ptr; |
273 | | using namespace std::literals; // NOLINT |
274 | | |
275 | | using rocksdb::WriteBatch; |
276 | | using rocksdb::SequenceNumber; |
277 | | using yb::tserver::WriteRequestPB; |
278 | | using yb::tserver::WriteResponsePB; |
279 | | using yb::docdb::KeyValueWriteBatchPB; |
280 | | using yb::tserver::ReadRequestPB; |
281 | | using yb::docdb::DocOperation; |
282 | | using yb::docdb::RedisWriteOperation; |
283 | | using yb::docdb::QLWriteOperation; |
284 | | using yb::docdb::PgsqlWriteOperation; |
285 | | using yb::docdb::DocDBCompactionFilterFactory; |
286 | | using yb::docdb::InitMarkerBehavior; |
287 | | |
288 | | namespace yb { |
289 | | namespace tablet { |
290 | | |
291 | | using consensus::MaximumOpId; |
292 | | using log::LogAnchorRegistry; |
293 | | using strings::Substitute; |
294 | | using base::subtle::Barrier_AtomicIncrement; |
295 | | |
296 | | using client::ChildTransactionData; |
297 | | using client::TransactionManager; |
298 | | using client::YBSession; |
299 | | using client::YBTransaction; |
300 | | using client::YBTablePtr; |
301 | | |
302 | | using docdb::DocKey; |
303 | | using docdb::DocPath; |
304 | | using docdb::DocRowwiseIterator; |
305 | | using docdb::DocWriteBatch; |
306 | | using docdb::SubDocKey; |
307 | | using docdb::PrimitiveValue; |
308 | | using docdb::StorageDbType; |
309 | | |
310 | | //////////////////////////////////////////////////////////// |
311 | | // Tablet |
312 | | //////////////////////////////////////////////////////////// |
313 | | |
314 | | namespace { |
315 | | |
// Builds the log-line prefix for a tablet in the form "T <tablet_id><log_prefix_suffix>: ".
// The suffix is appended directly after the tablet id with no separator, so any spacing must
// already be part of log_prefix_suffix.
316 | | std::string MakeTabletLogPrefix( |
317 | 2.25M | const TabletId& tablet_id, const std::string& log_prefix_suffix) { |
318 | 2.25M | return Format("T $0$1: ", tablet_id, log_prefix_suffix); |
319 | 2.25M | } |
320 | | |
// Records op_id and log_ht in *frontiers (via set_op_id / set_hybrid_time) and returns the same
// pointer so the call can be used inline as an argument.
// Returns nullptr, leaving *frontiers untouched, when
// FLAGS_TEST_disable_adding_user_frontier_to_sst is set.
321 | | docdb::ConsensusFrontiers* InitFrontiers( |
322 | | const OpId op_id, |
323 | | const HybridTime log_ht, |
324 | 6.46M | docdb::ConsensusFrontiers* frontiers) { |
325 | 6.46M | if (FLAGS_TEST_disable_adding_user_frontier_to_sst) { |
326 | 0 | return nullptr; |
327 | 0 | } |
328 | 6.46M | set_op_id(op_id, frontiers); |
329 | 6.46M | set_hybrid_time(log_ht, frontiers); |
330 | 6.46M | return frontiers; |
331 | 6.46M | } |
332 | | |
// Convenience overload: forwards data.op_id and data.log_ht to the three-argument InitFrontiers.
// Data only needs to expose op_id and log_ht members. The instantiation names recorded in this
// listing show it being used with TransactionApplyData and RemoveIntentsData.
333 | | template <class Data> |
334 | 1.70M | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { |
335 | 1.70M | return InitFrontiers(data.op_id, data.log_ht, frontiers); |
336 | 1.70M | } tablet.cc:_ZN2yb6tablet12_GLOBAL__N_113InitFrontiersINS0_20TransactionApplyDataEEEPN7rocksdb17UserFrontiersBaseINS_5docdb17ConsensusFrontierEEERKT_S9_ Line | Count | Source | 334 | 737k | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { | 335 | 737k | return InitFrontiers(data.op_id, data.log_ht, frontiers); | 336 | 737k | } |
tablet.cc:_ZN2yb6tablet12_GLOBAL__N_113InitFrontiersINS0_17RemoveIntentsDataEEEPN7rocksdb17UserFrontiersBaseINS_5docdb17ConsensusFrontierEEERKT_S9_ Line | Count | Source | 334 | 968k | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { | 335 | 968k | return InitFrontiers(data.op_id, data.log_ht, frontiers); | 336 | 968k | } |
|
337 | | |
// Returns db->GetMutableMemTableFrontier(type), or nullptr when
// FLAGS_TEST_disable_getting_user_frontier_from_mem_table is set.
// db is dereferenced without a null check, so callers must pass a valid DB.
338 | | rocksdb::UserFrontierPtr MemTableFrontierFromDb( |
339 | | rocksdb::DB* db, |
340 | 1.72M | rocksdb::UpdateUserValueType type) { |
341 | 1.72M | if (FLAGS_TEST_disable_getting_user_frontier_from_mem_table) { |
342 | 0 | return nullptr; |
343 | 0 | } |
344 | 1.72M | return db->GetMutableMemTableFrontier(type); |
345 | 1.72M | } |
346 | | |
347 | | } // namespace |
348 | | |
// RocksDB event listener that records in the tablet metadata when a full compaction has
// completed.
349 | | class Tablet::RegularRocksDbListener : public rocksdb::EventListener { |
350 | | public: |
// tablet must be non-null (enforced with CHECK_NOTNULL). It is stored as a reference, so the
// tablet presumably has to outlive this listener — confirm against the owner.
351 | | RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix) |
352 | | : tablet_(*CHECK_NOTNULL(tablet)), |
353 | 220k | log_prefix_(log_prefix) {} |
354 | | |
// Only full compactions are of interest. The first time one completes, the metadata flag is set
// and the metadata is flushed; later full compactions are no-ops here. The db argument is unused.
// A failed flush is handed to ERROR_NOT_OK together with log_prefix_ (presumably logged rather
// than propagated, since this callback returns void).
355 | 433 | void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override { |
356 | 433 | if (ci.is_full_compaction) { |
357 | 230 | auto& metadata = *CHECK_NOTNULL(tablet_.metadata()); |
358 | 230 | if (!metadata.has_been_fully_compacted()) { |
359 | 104 | metadata.set_has_been_fully_compacted(true); |
360 | 104 | ERROR_NOT_OK(metadata.Flush(), log_prefix_); |
361 | 104 | } |
362 | 230 | } |
363 | 433 | } |
364 | | |
365 | | private: |
366 | | Tablet& tablet_; |
367 | | const std::string log_prefix_; |
368 | | }; |
369 | | |
// Constructs a Tablet from TabletInitData. The member-initializer list copies collaborators out
// of data (metadata, clock, trackers, options, client future, ...). The body then sets up, in
// order: metric entities and RocksDB statistics, the optional transaction participant and
// transaction manager, the metadata cache for indexed tables, the unique-index key schema, the
// optional transaction coordinator, and the snapshot helpers.
370 | | Tablet::Tablet(const TabletInitData& data) |
371 | | : key_schema_(std::make_unique<Schema>(data.metadata->schema()->CreateKeyProjection())), |
372 | | metadata_(data.metadata), |
373 | | table_type_(data.metadata->table_type()), |
374 | | log_anchor_registry_(data.log_anchor_registry), |
375 | | mem_tracker_(MemTracker::CreateTracker( |
376 | | Format("tablet-$0", tablet_id()), data.parent_mem_tracker, AddToParent::kTrue, |
377 | | CreateMetrics::kFalse)), |
378 | | block_based_table_mem_tracker_(data.block_based_table_mem_tracker), |
379 | | clock_(data.clock), |
380 | | mvcc_( |
381 | | MakeTabletLogPrefix(data.metadata->raft_group_id(), data.log_prefix_suffix), data.clock), |
382 | | tablet_options_(data.tablet_options), |
383 | | pending_non_abortable_op_counter_("RocksDB non-abortable read/write operations"), |
384 | | pending_abortable_op_counter_("RocksDB abortable read/write operations"), |
385 | | write_ops_being_submitted_counter_("Tablet schema"), |
386 | | client_future_(data.client_future), |
387 | | local_tablet_filter_(data.local_tablet_filter), |
388 | | log_prefix_suffix_(data.log_prefix_suffix), |
389 | | is_sys_catalog_(data.is_sys_catalog), |
390 | | txns_enabled_(data.txns_enabled), |
391 | | retention_policy_(std::make_shared<TabletRetentionPolicy>( |
392 | 89.2k | clock_, data.allowed_history_cutoff_provider, metadata_.get())) { |
393 | 89.2k | CHECK(schema()->has_column_ids()); |
394 | 89.2k | LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is " |
395 | 89.2k | << metadata_->schema_version(); |
396 | | |
// Metric entities, RocksDB statistics and metrics_ are only created when a metric registry was
// supplied; otherwise they keep their default-constructed values.
397 | 89.2k | if (data.metric_registry) { |
398 | 88.8k | MetricEntity::AttributeMap attrs; |
399 | | // TODO(KUDU-745): table_id is apparently not set in the metadata. |
400 | 88.8k | attrs["table_id"] = metadata_->table_id(); |
401 | 88.8k | attrs["table_name"] = metadata_->table_name(); |
402 | 88.8k | attrs["namespace_name"] = metadata_->namespace_name(); |
403 | 88.8k | table_metrics_entity_ = |
404 | 88.8k | METRIC_ENTITY_table.Instantiate(data.metric_registry, metadata_->table_id(), attrs); |
405 | 88.8k | tablet_metrics_entity_ = |
406 | 88.8k | METRIC_ENTITY_tablet.Instantiate(data.metric_registry, tablet_id(), attrs); |
407 | | // If we are creating a KV table create the metrics callback. |
408 | 88.8k | regulardb_statistics_ = |
409 | 88.8k | rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_); |
// For the intents DB the tablet-level entity is passed only when
// FLAGS_TEST_export_intentdb_metrics is set; otherwise nullptr is passed in its place.
410 | 88.8k | intentsdb_statistics_ = |
411 | 88.8k | (GetAtomicFlag(&FLAGS_TEST_export_intentdb_metrics) |
412 | 199 | ? rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_, true) |
413 | 88.6k | : rocksdb::CreateDBStatistics(table_metrics_entity_, nullptr, true)); |
414 | | |
415 | 88.8k | metrics_.reset(new TabletMetrics(table_metrics_entity_, tablet_metrics_entity_)); |
416 | | |
417 | 88.8k | mem_tracker_->SetMetricEntity(tablet_metrics_entity_); |
418 | 88.8k | } |
419 | | |
420 | 89.2k | auto table_info = metadata_->primary_table_info(); |
421 | 89.2k | bool has_index = !table_info->index_map->empty(); |
422 | 89.2k | bool transactional = data.metadata->schema()->table_properties().is_transactional(); |
// Transactional tables turn on clock skew control on the hybrid clock.
423 | 89.2k | if (transactional) { |
424 | 20.4k | server::HybridClock::EnableClockSkewControl(); |
425 | 20.4k | } |
// A transaction participant is created only when transactions are enabled, a participant
// context was supplied, and the tablet is either the sys catalog or transactional.
426 | 89.2k | if (txns_enabled_ && |
427 | 89.0k | data.transaction_participant_context && |
428 | 88.6k | (is_sys_catalog_ || transactional)) { |
429 | 25.7k | transaction_participant_ = std::make_unique<TransactionParticipant>( |
430 | 25.7k | data.transaction_participant_context, this, tablet_metrics_entity_); |
431 | | // Create transaction manager for secondary index update. |
432 | 25.7k | if (has_index) { |
433 | 0 | transaction_manager_ = std::make_unique<client::TransactionManager>( |
434 | 0 | client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_); |
435 | 0 | } |
436 | 25.7k | } |
437 | | |
438 | | // Create index table metadata cache for secondary index update. |
439 | 89.2k | if (has_index) { |
440 | 0 | CreateNewYBMetaDataCache(); |
441 | 0 | } |
442 | | |
443 | | // If this is a unique index tablet, set up the index primary key schema. |
444 | 89.2k | if (table_info->index_info && table_info->index_info->is_unique()) { |
445 | 1.92k | unique_index_key_schema_ = std::make_unique<Schema>(); |
446 | 1.92k | const auto ids = table_info->index_info->index_key_column_ids(); |
447 | 1.92k | CHECK_OK(table_info->schema->CreateProjectionByIdsIgnoreMissing( |
448 | 1.92k | ids, unique_index_key_schema_.get())); |
449 | 1.92k | } |
450 | | |
// NOTE(review): metrics_ is only assigned inside the data.metric_registry branch above, yet it
// is dereferenced here unconditionally. This presumably relies on a metric registry always
// being supplied for transaction status tablets — confirm against callers.
451 | 89.2k | if (data.transaction_coordinator_context && |
452 | 88.7k | table_info->table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
453 | 29.1k | transaction_coordinator_ = std::make_unique<TransactionCoordinator>( |
454 | 29.1k | metadata_->fs_manager()->uuid(), |
455 | 29.1k | data.transaction_coordinator_context, |
456 | 29.1k | metrics_->expired_transactions.get()); |
457 | 29.1k | } |
458 | | |
459 | 89.2k | snapshots_ = std::make_unique<TabletSnapshots>(this); |
460 | | |
461 | 89.2k | snapshot_coordinator_ = data.snapshot_coordinator; |
462 | | |
// A tablet whose metadata already records a completed split calls SplitDone() on construction.
463 | 89.2k | if (metadata_->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
464 | 1 | SplitDone(); |
465 | 1 | } |
// With FLAGS_consistent_restore set and a restoration time recorded in the metadata, the
// participant is told to ignore transactions started before that time.
466 | 89.2k | auto restoration_hybrid_time = metadata_->restoration_hybrid_time(); |
467 | 89.2k | if (restoration_hybrid_time && transaction_participant_ && FLAGS_consistent_restore) { |
468 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
469 | 0 | } |
470 | 89.2k | SyncRestoringOperationFilter(ResetSplit::kFalse); |
471 | 89.2k | } |
472 | | |
// Destructor. If StartShutdown() returns true, the shutdown is completed here. Otherwise the
// tablet is expected to already be in kShutdown and a DFATAL is logged if it is not.
// Afterwards the memory trackers are detached from their parents.
473 | 48.1k | Tablet::~Tablet() { |
474 | 48.1k | if (StartShutdown()) { |
475 | 481 | CompleteShutdown(); |
476 | 47.6k | } else { |
// state_ is copied into a local so the same value is both tested and printed.
477 | 47.6k | auto state = state_; |
478 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, state != kShutdown) |
479 | 18.4E | << "Destroying Tablet that did not complete shutdown: " << state; |
480 | 47.6k | } |
// block_based_table_mem_tracker_ comes from TabletInitData and may be null; mem_tracker_ is
// always created in the constructor, so it is used without a check.
481 | 48.1k | if (block_based_table_mem_tracker_) { |
482 | 47.6k | block_based_table_mem_tracker_->UnregisterFromParent(); |
483 | 47.6k | } |
484 | 48.1k | mem_tracker_->UnregisterFromParent(); |
485 | 48.1k | } |
486 | | |
// Opens the tablet and moves state_ from kInitialized to kBootstrapping.
// Holds component_lock_ for the whole call. CHECK-fails if the tablet is not in kInitialized or
// the schema has no column ids.
// Returns the error from OpenKeyValueTablet() if that fails (state_ is then left unchanged),
// otherwise Status::OK().
487 | 89.2k | Status Tablet::Open() { |
488 | 89.2k | TRACE_EVENT0("tablet", "Tablet::Open"); |
489 | 89.2k | std::lock_guard<rw_spinlock> lock(component_lock_); |
490 | 0 | CHECK_EQ(state_, kInitialized) << "already open"; |
491 | 89.2k | CHECK(schema()->has_column_ids()); |
492 | | |
// PGSQL, YQL and Redis tables share the key-value open path; transaction status tables skip
// OpenKeyValueTablet() and only change state.
493 | 89.2k | switch (table_type_) { |
494 | 10.5k | case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
495 | 56.7k | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
496 | 60.0k | case TableType::REDIS_TABLE_TYPE: |
497 | 60.0k | RETURN_NOT_OK(OpenKeyValueTablet()); |
498 | 60.0k | state_ = kBootstrapping; |
499 | 60.0k | return Status::OK(); |
500 | 29.1k | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
501 | 29.1k | state_ = kBootstrapping; |
502 | 29.1k | return Status::OK(); |
503 | 0 | } |
// Reached only for a table_type_ value not handled by the switch above.
504 | 0 | FATAL_INVALID_ENUM_VALUE(TableType, table_type_); |
505 |
|
506 | 0 | return Status::OK(); |
507 | 0 | } |
508 | | |
// Creates (and syncs) the on-disk directories for this tablet's RocksDB instances:
// the parent of db_dir, db_dir itself, db_dir + kIntentsDBSuffix, and finally whatever
// snapshots_->CreateDirectories() sets up. Returns the first failure, prefixed with a message
// naming the step that failed.
509 | 220k | Status Tablet::CreateTabletDirectories(const string& db_dir, FsManager* fs) { |
510 | 220k | LOG_WITH_PREFIX(INFO) << "Creating RocksDB database in dir " << db_dir; |
511 | | |
512 | | // Create the directory table-uuid first. |
513 | 220k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(DirName(db_dir)), |
514 | 220k | Format("Failed to create RocksDB table directory $0", DirName(db_dir))); |
515 | | |
516 | 220k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir), |
517 | 220k | Format("Failed to create RocksDB tablet directory $0", db_dir)); |
518 | | |
// NOTE(review): the directory created here is db_dir + kIntentsDBSuffix, but the error message
// formats plain db_dir, so a failure would report the regular directory's path.
519 | 220k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir + kIntentsDBSuffix), |
520 | 220k | Format("Failed to create RocksDB tablet intents directory $0", db_dir)); |
521 | | |
522 | 220k | RETURN_NOT_OK(snapshots_->CreateDirectories(db_dir, fs)); |
523 | | |
524 | 220k | return Status::OK(); |
525 | 220k | } |
526 | | |
// Clears metadata_cache_ by atomically storing an empty value with release ordering.
527 | 46.7k | void Tablet::ResetYBMetaDataCache() { |
528 | 46.7k | std::atomic_store_explicit(&metadata_cache_, {}, std::memory_order_release); |
529 | 46.7k | } |
530 | | |
// Replaces metadata_cache_ with a freshly constructed client::YBMetaDataCache, published with an
// atomic release store. client_future_.get() is evaluated to obtain the client, which presumably
// blocks until the client is available — confirm. Permissions-cache updates are disabled (false).
531 | 39.7k | void Tablet::CreateNewYBMetaDataCache() { |
532 | 39.7k | std::atomic_store_explicit(&metadata_cache_, |
533 | 39.7k | std::make_shared<client::YBMetaDataCache>(client_future_.get(), |
534 | 39.7k | false /* Update permissions cache */), |
535 | 39.7k | std::memory_order_release); |
536 | 39.7k | } |
537 | | |
// Returns the current metadata cache via an atomic acquire load, pairing with the release stores
// in ResetYBMetaDataCache / CreateNewYBMetaDataCache. The result is empty if the cache has been
// reset or was never created.
538 | 43.4k | std::shared_ptr<client::YBMetaDataCache> Tablet::YBMetaDataCache() { |
539 | 43.4k | return std::atomic_load_explicit(&metadata_cache_, std::memory_order_acquire); |
540 | 43.4k | } |
541 | | |
// Wraps the callable f in a std::shared_ptr whose pointee type is the element type of
// rocksdb::Options::mem_table_flush_filter_factory. The type is deduced with decltype on a null
// Options pointer (unevaluated), so it does not have to be spelled out here.
542 | | template <class F> |
543 | 328k | auto MakeMemTableFlushFilterFactory(const F& f) { |
544 | | // Trick to get type of mem_table_flush_filter_factory field. |
545 | 328k | typedef typename decltype( |
546 | 328k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type |
547 | 328k | MemTableFlushFilterFactoryType; |
548 | 328k | return std::make_shared<MemTableFlushFilterFactoryType>(f); |
549 | 328k | } tablet.cc:_ZN2yb6tablet30MakeMemTableFlushFilterFactoryIZNS0_6Tablet18OpenKeyValueTabletEvE3$_0EEDaRKT_ Line | Count | Source | 543 | 220k | auto MakeMemTableFlushFilterFactory(const F& f) { | 544 | | // Trick to get type of mem_table_flush_filter_factory field. | 545 | 220k | typedef typename decltype( | 546 | 220k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type | 547 | 220k | MemTableFlushFilterFactoryType; | 548 | 220k | return std::make_shared<MemTableFlushFilterFactoryType>(f); | 549 | 220k | } |
tablet.cc:_ZN2yb6tablet30MakeMemTableFlushFilterFactoryIZNS0_6Tablet18OpenKeyValueTabletEvE3$_2EEDaRKT_ Line | Count | Source | 543 | 107k | auto MakeMemTableFlushFilterFactory(const F& f) { | 544 | | // Trick to get type of mem_table_flush_filter_factory field. | 545 | 107k | typedef typename decltype( | 546 | 107k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type | 547 | 107k | MemTableFlushFilterFactoryType; | 548 | 107k | return std::make_shared<MemTableFlushFilterFactoryType>(f); | 549 | 107k | } |
|
550 | | |
// Wraps the callable f in a std::shared_ptr whose pointee type is the element type of
// rocksdb::Options::max_file_size_for_compaction, using the same decltype deduction as
// MakeMemTableFlushFilterFactory.
551 | | template <class F> |
552 | 220k | auto MakeMaxFileSizeWithTableTTLFunction(const F& f) { |
553 | | // Trick to get type of max_file_size_for_compaction field. |
554 | 220k | typedef typename decltype( |
555 | 220k | static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type |
556 | 220k | MaxFileSizeWithTableTTLFunction; |
557 | 220k | return std::make_shared<MaxFileSizeWithTableTTLFunction>(f); |
558 | 220k | } |
559 | | |
// Decides whether the given intents DB memtable may be flushed now.
// Returns true when
//   (a) the regular DB's flushed frontier has an op id index >= the largest op id index in this
//       memtable's frontiers, or
//   (b) the regular DB reports FlushAbility::kNoNewData.
// Otherwise returns false. Before returning false, a non-waiting flush of the regular DB is
// started if it is not already flushing and either shutdown was requested or the memtable's
// flush start time is more than FLAGS_intents_flush_max_delay_ms in the past.
// Returns an error status only if starting that regular DB flush fails.
560 | 6.93k | Result<bool> Tablet::IntentsDbFlushFilter(const rocksdb::MemTable& memtable) { |
561 | 0 | VLOG_WITH_PREFIX(4) << __func__; |
562 | | |
563 | 6.93k | auto frontiers = memtable.Frontiers(); |
564 | 6.93k | if (frontiers) { |
565 | 6.93k | const auto& intents_largest = |
566 | 6.93k | down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()); |
567 | | |
568 | | // We allow to flush intents DB only after regular DB. |
569 | | // Otherwise we could lose applied intents when corresponding regular records were not |
570 | | // flushed. |
571 | 6.93k | auto regular_flushed_frontier = regular_db_->GetFlushedFrontier(); |
572 | 6.93k | if (regular_flushed_frontier) { |
// NOTE(review): this cast uses static_cast while the memtable frontier above uses down_cast.
573 | 2.00k | const auto& regular_flushed_largest = |
574 | 2.00k | static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier); |
575 | 2.00k | if (regular_flushed_largest.op_id().index >= intents_largest.op_id().index) { |
576 | 0 | VLOG_WITH_PREFIX(4) << __func__ << ", regular already flushed"; |
577 | 82 | return true; |
578 | 82 | } |
579 | 0 | } |
580 | 0 | } else { |
581 | 0 | VLOG_WITH_PREFIX(4) << __func__ << ", no frontiers"; |
582 | 0 | } |
583 | | |
584 | | // If regular db does not have anything to flush, it means that we have just added intents, |
585 | | // without apply, so it is OK to flush the intents RocksDB. |
586 | 6.85k | auto flush_intention = regular_db_->GetFlushAbility(); |
587 | 6.85k | if (flush_intention == rocksdb::FlushAbility::kNoNewData) { |
588 | 0 | VLOG_WITH_PREFIX(4) << __func__ << ", no new data"; |
589 | 34 | return true; |
590 | 34 | } |
591 | | |
592 | | // Force flush of regular DB if we were not able to flush for too long. |
593 | 6.81k | auto timeout = std::chrono::milliseconds(FLAGS_intents_flush_max_delay_ms); |
594 | 6.81k | if (flush_intention != rocksdb::FlushAbility::kAlreadyFlushing && |
595 | 1.61k | (shutdown_requested_.load(std::memory_order_acquire) || |
596 | 1.61k | std::chrono::steady_clock::now() > memtable.FlushStartTime() + timeout)) { |
597 | 0 | VLOG_WITH_PREFIX(2) << __func__ << ", force flush"; |
598 | | |
// wait = false: the flush is only requested here; this call does not wait for it to finish.
599 | 20 | rocksdb::FlushOptions options; |
600 | 20 | options.wait = false; |
601 | 20 | RETURN_NOT_OK(regular_db_->Flush(options)); |
602 | 20 | } |
603 | | |
// The intents memtable is not flushed on this attempt, even when a regular DB flush was just
// requested above.
604 | 6.81k | return false; |
605 | 6.81k | } |
606 | | |
// Returns this tablet's log prefix, "T <tablet_id><log_prefix_suffix_>: ". The string is built
// afresh on every call.
607 | 1.84M | std::string Tablet::LogPrefix() const { |
608 | 1.84M | return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_); |
609 | 1.84M | } |
610 | | |
611 | | namespace { |
612 | | |
613 | 328k | std::string LogDbTypePrefix(docdb::StorageDbType db_type) { |
614 | 328k | switch (db_type) { |
615 | 220k | case docdb::StorageDbType::kRegular: |
616 | 220k | return "R"; |
617 | 108k | case docdb::StorageDbType::kIntents: |
618 | 108k | return "I"; |
619 | 0 | } |
620 | 0 | FATAL_INVALID_ENUM_VALUE(docdb::StorageDbType, db_type); |
621 | 0 | } |
622 | | |
623 | | std::string MakeTabletLogPrefix( |
624 | 328k | const TabletId& tablet_id, const std::string& log_prefix_suffix, docdb::StorageDbType db_type) { |
625 | 328k | return MakeTabletLogPrefix( |
626 | 328k | tablet_id, Format("$0 [$1]", log_prefix_suffix, LogDbTypePrefix(db_type))); |
627 | 328k | } |
628 | | |
629 | | } // namespace |
630 | | |
631 | 328k | std::string Tablet::LogPrefix(docdb::StorageDbType db_type) const { |
632 | 328k | return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_, db_type); |
633 | 328k | } |
634 | | |
635 | 220k | Status Tablet::OpenKeyValueTablet() { |
636 | 220k | static const std::string kRegularDB = "RegularDB"s; |
637 | 220k | static const std::string kIntentsDB = "IntentsDB"s; |
638 | | |
639 | 220k | rocksdb::BlockBasedTableOptions table_options; |
640 | 220k | if (!metadata()->primary_table_info()->index_info || metadata()->colocated()) { |
641 | | // This tablet is not dedicated to the index table, so it should be effective to use |
642 | | // advanced key-value encoding algorithm optimized for docdb keys structure. |
643 | 130k | table_options.use_delta_encoding = true; |
644 | 130k | table_options.data_block_key_value_encoding_format = |
645 | 130k | VERIFY_RESULT(docdb::GetConfiguredKeyValueEncodingFormat( |
646 | 130k | FLAGS_regular_tablets_data_block_key_value_encoding)); |
647 | 130k | } |
648 | 220k | rocksdb::Options rocksdb_options; |
649 | 220k | InitRocksDBOptions( |
650 | 220k | &rocksdb_options, LogPrefix(docdb::StorageDbType::kRegular), std::move(table_options)); |
651 | 220k | rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kRegularDB, mem_tracker_); |
652 | 220k | rocksdb_options.block_based_table_mem_tracker = |
653 | 220k | MemTracker::FindOrCreateTracker( |
654 | 220k | Format("$0-$1", kRegularDB, tablet_id()), block_based_table_mem_tracker_, |
655 | 220k | AddToParent::kTrue, CreateMetrics::kFalse); |
656 | | // We may not have a metrics_entity_ instantiated in tests. |
657 | 220k | if (tablet_metrics_entity_) { |
658 | 220k | rocksdb_options.block_based_table_mem_tracker->SetMetricEntity( |
659 | 220k | tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kRegularDB)); |
660 | 220k | } |
661 | | |
662 | 220k | key_bounds_ = docdb::KeyBounds(metadata()->lower_bound_key(), metadata()->upper_bound_key()); |
663 | | |
664 | | // Install the history cleanup handler. Note that TabletRetentionPolicy is going to hold a raw ptr |
665 | | // to this tablet. So, we ensure that rocksdb_ is reset before this tablet gets destroyed. |
666 | 220k | rocksdb_options.compaction_filter_factory = make_shared<DocDBCompactionFilterFactory>( |
667 | 220k | retention_policy_, &key_bounds_); |
668 | | |
669 | 2.65k | rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] { |
670 | 2.65k | if (mem_table_flush_filter_factory_) { |
671 | 2.09k | return mem_table_flush_filter_factory_(); |
672 | 2.09k | } |
673 | 563 | return rocksdb::MemTableFilter(); |
674 | 563 | }); |
675 | 220k | if (FLAGS_tablet_enable_ttl_file_filter) { |
676 | 0 | rocksdb_options.compaction_file_filter_factory = |
677 | 0 | std::make_shared<docdb::DocDBCompactionFileFilterFactory>(retention_policy_, clock()); |
678 | 0 | } |
679 | | |
680 | | // Use a function that checks the table TTL before returning a value for max file size |
681 | | // for compactions. |
682 | 904k | rocksdb_options.max_file_size_for_compaction = MakeMaxFileSizeWithTableTTLFunction([this] { |
683 | 904k | if (FLAGS_rocksdb_max_file_size_for_compaction > 0 && |
684 | 0 | retention_policy_->GetRetentionDirective().table_ttl != docdb::Value::kMaxTtl) { |
685 | 0 | return FLAGS_rocksdb_max_file_size_for_compaction; |
686 | 0 | } |
687 | 904k | return std::numeric_limits<uint64_t>::max(); |
688 | 904k | }); |
689 | | |
690 | 220k | rocksdb_options.disable_auto_compactions = true; |
691 | 220k | rocksdb_options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); |
692 | 220k | rocksdb_options.level0_stop_writes_trigger = std::numeric_limits<int>::max(); |
693 | | |
694 | 220k | rocksdb::Options regular_rocksdb_options(rocksdb_options); |
695 | 220k | regular_rocksdb_options.listeners.push_back( |
696 | 220k | std::make_shared<RegularRocksDbListener>(this, regular_rocksdb_options.log_prefix)); |
697 | | |
698 | 220k | const string db_dir = metadata()->rocksdb_dir(); |
699 | 220k | RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager())); |
700 | | |
701 | 220k | LOG(INFO) << "Opening RocksDB at: " << db_dir; |
702 | 220k | rocksdb::DB* db = nullptr; |
703 | 220k | rocksdb::Status rocksdb_open_status = rocksdb::DB::Open(regular_rocksdb_options, db_dir, &db); |
704 | 220k | if (!rocksdb_open_status.ok()) { |
705 | 0 | LOG_WITH_PREFIX(ERROR) << "Failed to open a RocksDB database in directory " << db_dir << ": " |
706 | 0 | << rocksdb_open_status; |
707 | 0 | if (db != nullptr) { |
708 | 0 | delete db; |
709 | 0 | } |
710 | 0 | return STATUS(IllegalState, rocksdb_open_status.ToString()); |
711 | 0 | } |
712 | 220k | regular_db_.reset(db); |
713 | 220k | regular_db_->ListenFilesChanged(std::bind(&Tablet::RegularDbFilesChanged, this)); |
714 | | |
715 | 220k | if (transaction_participant_) { |
716 | 107k | LOG_WITH_PREFIX(INFO) << "Opening intents DB at: " << db_dir + kIntentsDBSuffix; |
717 | 107k | rocksdb::Options intents_rocksdb_options(rocksdb_options); |
718 | 107k | docdb::SetLogPrefix(&intents_rocksdb_options, LogPrefix(docdb::StorageDbType::kIntents)); |
719 | | |
720 | 6.93k | intents_rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] { |
721 | 6.93k | return std::bind(&Tablet::IntentsDbFlushFilter, this, _1); |
722 | 6.93k | }); |
723 | | |
724 | 107k | intents_rocksdb_options.compaction_filter_factory = |
725 | 107k | FLAGS_tablet_do_compaction_cleanup_for_intents ? |
726 | 18.4E | std::make_shared<docdb::DocDBIntentsCompactionFilterFactory>(this, &key_bounds_) : nullptr; |
727 | | |
728 | 107k | intents_rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kIntentsDB, mem_tracker_); |
729 | 107k | intents_rocksdb_options.block_based_table_mem_tracker = |
730 | 107k | MemTracker::FindOrCreateTracker( |
731 | 107k | Format("$0-$1", kIntentsDB, tablet_id()), block_based_table_mem_tracker_, |
732 | 107k | AddToParent::kTrue, CreateMetrics::kFalse); |
733 | | // We may not have a metrics_entity_ instantiated in tests. |
734 | 107k | if (tablet_metrics_entity_) { |
735 | 107k | intents_rocksdb_options.block_based_table_mem_tracker->SetMetricEntity( |
736 | 107k | tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kIntentsDB)); |
737 | 107k | } |
738 | 107k | intents_rocksdb_options.statistics = intentsdb_statistics_; |
739 | | |
740 | 107k | rocksdb::DB* intents_db = nullptr; |
741 | 107k | RETURN_NOT_OK( |
742 | 107k | rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db)); |
743 | 107k | intents_db_.reset(intents_db); |
744 | 107k | intents_db_->ListenFilesChanged(std::bind(&Tablet::CleanupIntentFiles, this)); |
745 | 107k | } |
746 | | |
747 | 220k | ql_storage_.reset(new docdb::QLRocksDBStorage(doc_db())); |
748 | 220k | if (transaction_participant_) { |
749 | 107k | transaction_participant_->SetDB(doc_db(), &key_bounds_, &pending_non_abortable_op_counter_); |
750 | 107k | } |
751 | | |
752 | | // Don't allow reads at timestamps lower than the highest history cutoff of a past compaction. |
753 | 220k | auto regular_flushed_frontier = regular_db_->GetFlushedFrontier(); |
754 | 220k | if (regular_flushed_frontier) { |
755 | 1.55k | retention_policy_->UpdateCommittedHistoryCutoff( |
756 | 1.55k | static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier).history_cutoff()); |
757 | 1.55k | } |
758 | | |
759 | 220k | LOG_WITH_PREFIX(INFO) << "Successfully opened a RocksDB database at " << db_dir |
760 | 220k | << ", obj: " << db; |
761 | | |
762 | 220k | return Status::OK(); |
763 | 220k | } |
764 | | |
765 | 3.08k | void Tablet::RegularDbFilesChanged() { |
766 | 3.08k | std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_); |
767 | 3.08k | if (num_sst_files_changed_listener_) { |
768 | 2.40k | num_sst_files_changed_listener_(); |
769 | 2.40k | } |
770 | 3.08k | } |
771 | | |
772 | 88.7k | void Tablet::SetCleanupPool(ThreadPool* thread_pool) { |
773 | 88.7k | if (!transaction_participant_) { |
774 | 62.8k | return; |
775 | 62.8k | } |
776 | | |
777 | 25.8k | cleanup_intent_files_token_ = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL); |
778 | | |
779 | 25.8k | CleanupIntentFiles(); |
780 | 25.8k | } |
781 | | |
782 | 32.8k | void Tablet::CleanupIntentFiles() { |
783 | 32.8k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
784 | 32.8k | if (!scoped_read_operation.ok() || state_ != State::kOpen || !FLAGS_delete_intents_sst_files || |
785 | 32.8k | !cleanup_intent_files_token_) { |
786 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip"; |
787 | 0 | return; |
788 | 0 | } |
789 | | |
790 | 32.8k | WARN_NOT_OK( |
791 | 32.8k | cleanup_intent_files_token_->SubmitFunc(std::bind(&Tablet::DoCleanupIntentFiles, this)), |
792 | 32.8k | "Submit cleanup intent files failed"); |
793 | 32.8k | } |
794 | | |
795 | 32.8k | void Tablet::DoCleanupIntentFiles() { |
796 | 32.8k | if (metadata_->is_under_twodc_replication()) { |
797 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Exit because of TwoDC replication"; |
798 | 0 | return; |
799 | 0 | } |
800 | 32.8k | HybridTime best_file_max_ht = HybridTime::kMax; |
801 | 32.8k | std::vector<rocksdb::LiveFileMetaData> files; |
802 | | // Stops when there are no more files to delete. |
803 | 32.8k | std::string previous_name; |
804 | 32.9k | while (GetAtomicFlag(&FLAGS_cleanup_intents_sst_files)) { |
805 | 32.9k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
806 | 32.9k | if (!scoped_read_operation.ok()) { |
807 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Failed to acquire scoped read operation"; |
808 | 0 | break; |
809 | 0 | } |
810 | | |
811 | 32.9k | best_file_max_ht = HybridTime::kMax; |
812 | 32.9k | const rocksdb::LiveFileMetaData* best_file = nullptr; |
813 | 32.9k | files.clear(); |
814 | 32.9k | intents_db_->GetLiveFilesMetaData(&files); |
815 | 32.9k | auto min_largest_seq_no = std::numeric_limits<rocksdb::SequenceNumber>::max(); |
816 | | |
817 | 5 | VLOG_WITH_PREFIX_AND_FUNC(5) << "Files: " << AsString(files); |
818 | | |
819 | 124 | for (const auto& file : files) { |
820 | 124 | if (file.largest.seqno < min_largest_seq_no) { |
821 | 124 | min_largest_seq_no = file.largest.seqno; |
822 | 124 | if (file.largest.user_frontier) { |
823 | 124 | auto& frontier = down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier); |
824 | 124 | best_file_max_ht = frontier.hybrid_time(); |
825 | 0 | } else { |
826 | 0 | best_file_max_ht = HybridTime::kMax; |
827 | 0 | } |
828 | 124 | best_file = &file; |
829 | 124 | } |
830 | 124 | } |
831 | | |
832 | 32.9k | auto min_running_start_ht = transaction_participant_->MinRunningHybridTime(); |
833 | 32.9k | if (!min_running_start_ht.is_valid() || min_running_start_ht <= best_file_max_ht) { |
834 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) |
835 | 18.4E | << "Cannot delete because of running transactions: " << min_running_start_ht |
836 | 18.4E | << ", best file max ht: " << best_file_max_ht; |
837 | 32.8k | break; |
838 | 32.8k | } |
839 | 103 | if (best_file->name == previous_name) { |
840 | 0 | LOG_WITH_PREFIX_AND_FUNC(INFO) |
841 | 0 | << "Attempt to delete same file: " << previous_name << ", stopping cleanup"; |
842 | 0 | break; |
843 | 0 | } |
844 | 103 | previous_name = best_file->name; |
845 | | |
846 | 103 | LOG_WITH_PREFIX_AND_FUNC(INFO) |
847 | 103 | << "Intents SST file will be deleted: " << best_file->ToString() |
848 | 103 | << ", max ht: " << best_file_max_ht << ", min running transaction start ht: " |
849 | 103 | << min_running_start_ht; |
850 | 103 | auto flush_status = regular_db_->Flush(rocksdb::FlushOptions()); |
851 | 103 | if (!flush_status.ok()) { |
852 | 0 | LOG_WITH_PREFIX_AND_FUNC(WARNING) << "Failed to flush regular db: " << flush_status; |
853 | 0 | break; |
854 | 0 | } |
855 | 103 | auto delete_status = intents_db_->DeleteFile(best_file->name); |
856 | 103 | if (!delete_status.ok()) { |
857 | 0 | LOG_WITH_PREFIX_AND_FUNC(WARNING) |
858 | 0 | << "Failed to delete " << best_file->ToString() << ", all files " << AsString(files) |
859 | 0 | << ": " << delete_status; |
860 | 0 | break; |
861 | 0 | } |
862 | 103 | } |
863 | | |
864 | 32.8k | if (best_file_max_ht != HybridTime::kMax) { |
865 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Wait min running hybrid time: " << best_file_max_ht; |
866 | 31 | transaction_participant_->WaitMinRunningHybridTime(best_file_max_ht); |
867 | 31 | } |
868 | 32.8k | } |
869 | | |
870 | 89.6k | Status Tablet::EnableCompactions(ScopedRWOperationPause* non_abortable_ops_pause) { |
871 | 89.6k | if (!non_abortable_ops_pause) { |
872 | 88.8k | auto operation = CreateNonAbortableScopedRWOperation(); |
873 | 88.8k | RETURN_NOT_OK(operation); |
874 | 88.8k | return DoEnableCompactions(); |
875 | 862 | } |
876 | | |
877 | 862 | return DoEnableCompactions(); |
878 | 862 | } |
879 | | |
880 | 249k | Status Tablet::DoEnableCompactions() { |
881 | 249k | Status regular_db_status; |
882 | 249k | std::unordered_map<std::string, std::string> new_options = { |
883 | 249k | { "level0_slowdown_writes_trigger"s, |
884 | 249k | std::to_string(max_if_negative(FLAGS_rocksdb_level0_slowdown_writes_trigger))}, |
885 | 249k | { "level0_stop_writes_trigger"s, |
886 | 249k | std::to_string(max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger))}, |
887 | 249k | }; |
888 | 249k | if (regular_db_) { |
889 | 220k | WARN_WITH_PREFIX_NOT_OK( |
890 | 220k | regular_db_->SetOptions(new_options, /* dump_options= */ false), |
891 | 220k | "Failed to set options on regular DB"); |
892 | 220k | regular_db_status = |
893 | 220k | regular_db_->EnableAutoCompaction({regular_db_->DefaultColumnFamily()}); |
894 | 220k | if (!regular_db_status.ok()) { |
895 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to enable compactions on regular DB: " |
896 | 0 | << regular_db_status; |
897 | 0 | } |
898 | 220k | } |
899 | 249k | if (intents_db_) { |
900 | 107k | WARN_WITH_PREFIX_NOT_OK( |
901 | 107k | intents_db_->SetOptions(new_options, /* dump_options= */ false), |
902 | 107k | "Failed to set options on provisional records DB"); |
903 | 107k | Status intents_db_status = |
904 | 107k | intents_db_->EnableAutoCompaction({intents_db_->DefaultColumnFamily()}); |
905 | 107k | if (!intents_db_status.ok()) { |
906 | 0 | LOG_WITH_PREFIX(WARNING) |
907 | 0 | << "Failed to enable compactions on provisional records DB: " << intents_db_status; |
908 | 0 | return intents_db_status; |
909 | 0 | } |
910 | 249k | } |
911 | 249k | return regular_db_status; |
912 | 249k | } |
913 | | |
914 | 89.0k | void Tablet::MarkFinishedBootstrapping() { |
915 | 89.0k | CHECK_EQ(state_, kBootstrapping); |
916 | 89.0k | state_ = kOpen; |
917 | 89.0k | } |
918 | | |
919 | 144k | bool Tablet::StartShutdown(const IsDropTable is_drop_table) { |
920 | 144k | LOG_WITH_PREFIX(INFO) << __func__; |
921 | | |
922 | 144k | bool expected = false; |
923 | 144k | if (!shutdown_requested_.compare_exchange_strong(expected, true)) { |
924 | 95.9k | return false; |
925 | 95.9k | } |
926 | | |
927 | 48.2k | if (transaction_participant_) { |
928 | 19.3k | transaction_participant_->StartShutdown(); |
929 | 19.3k | } |
930 | | |
931 | 48.2k | return true; |
932 | 48.2k | } |
933 | | |
934 | 48.2k | void Tablet::CompleteShutdown(IsDropTable is_drop_table) { |
935 | 48.2k | LOG_WITH_PREFIX(INFO) << __func__ << "(" << is_drop_table << ")"; |
936 | | |
937 | 48.2k | StartShutdown(); |
938 | | |
939 | 48.2k | auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(is_drop_table), Stop::kTrue); |
940 | 48.2k | if (!op_pauses.ok()) { |
941 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to shut down: " << op_pauses.status(); |
942 | 0 | return; |
943 | 0 | } |
944 | | |
945 | 48.2k | cleanup_intent_files_token_.reset(); |
946 | | |
947 | 48.2k | if (transaction_coordinator_) { |
948 | 128 | transaction_coordinator_->Shutdown(); |
949 | 128 | } |
950 | | |
951 | 48.2k | if (transaction_participant_) { |
952 | 19.3k | transaction_participant_->CompleteShutdown(); |
953 | 19.3k | } |
954 | | |
955 | 48.2k | { |
956 | 48.2k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
957 | | |
958 | 48.2k | if (completed_split_log_anchor_) { |
959 | 21 | WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()), |
960 | 21 | "Unregister split anchor"); |
961 | 21 | } |
962 | | |
963 | 48.2k | if (completed_split_operation_filter_) { |
964 | 21 | UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get()); |
965 | 21 | } |
966 | | |
967 | 48.2k | if (restoring_operation_filter_) { |
968 | 0 | UnregisterOperationFilterUnlocked(restoring_operation_filter_.get()); |
969 | 0 | } |
970 | 48.2k | } |
971 | | |
972 | 48.2k | std::lock_guard<rw_spinlock> lock(component_lock_); |
973 | | |
974 | | // Shutdown the RocksDB instance for this tablet, if present. |
975 | | // Destroy intents and regular DBs in the reverse order of their creation. |
976 | | // This also makes sure that the regular DB is alive during the flush filter of the intents DB. |
977 | 48.2k | WARN_NOT_OK(CompleteShutdownRocksDBs(Destroy::kFalse, &(*op_pauses)), |
978 | 48.2k | "Failed to reset rocksdb during shutdown"); |
979 | | |
980 | 48.2k | if (post_split_compaction_task_pool_token_) { |
981 | 0 | post_split_compaction_task_pool_token_->Shutdown(); |
982 | 0 | } |
983 | | |
984 | 48.2k | state_ = kShutdown; |
985 | | |
986 | 96.3k | for (auto* op_pause : op_pauses->AsArray()) { |
987 | | // Release the mutex that prevents snapshot restore / truncate operations from running. Such |
988 | | // operations are no longer possible because the tablet has shut down. When we started the |
989 | | // "read/write operation pause", we incremented the "exclusive operation" counter. This will |
990 | | // prevent us from decrementing that counter back, disabling read/write operations permanently. |
991 | 96.3k | op_pause->ReleaseMutexButKeepDisabled(); |
992 | | // Ensure that op_pause stays in scope throughout this function. |
993 | 18.4E | LOG_IF(DFATAL, !op_pause->status().ok()) << op_pause->status(); |
994 | 96.3k | } |
995 | 48.2k | } |
996 | | |
997 | | CHECKED_STATUS ResetRocksDB( |
998 | 417k | bool destroy, const rocksdb::Options& options, std::unique_ptr<rocksdb::DB>* db) { |
999 | 417k | if (!*db) { |
1000 | 107k | return Status::OK(); |
1001 | 107k | } |
1002 | | |
1003 | 310k | auto dir = (**db).GetName(); |
1004 | 310k | db->reset(); |
1005 | 310k | if (!destroy) { |
1006 | 67.4k | return Status::OK(); |
1007 | 67.4k | } |
1008 | | |
1009 | 242k | return rocksdb::DestroyDB(dir, options); |
1010 | 242k | } |
1011 | | |
1012 | | Result<TabletScopedRWOperationPauses> Tablet::StartShutdownRocksDBs( |
1013 | 208k | DisableFlushOnShutdown disable_flush_on_shutdown, Stop stop) { |
1014 | 208k | TabletScopedRWOperationPauses op_pauses; |
1015 | | |
1016 | 417k | auto pause = [this, stop](const Abortable abortable) -> Result<ScopedRWOperationPause> { |
1017 | 417k | auto op_pause = PauseReadWriteOperations(abortable, stop); |
1018 | 417k | if (!op_pause.ok()) { |
1019 | 0 | return op_pause.status().CloneAndPrepend("Failed to stop read/write operations: "); |
1020 | 0 | } |
1021 | 417k | return std::move(op_pause); |
1022 | 417k | }; |
1023 | | |
1024 | 208k | op_pauses.non_abortable = VERIFY_RESULT(pause(Abortable::kFalse)); |
1025 | | |
1026 | 208k | bool expected = false; |
1027 | | // If shutdown has already been requested, we still might need to wait for all pending read/write |
1028 | | // operations to complete here, because the caller is not holding ScopedRWOperationPause. |
1029 | 208k | if (rocksdb_shutdown_requested_.compare_exchange_strong(expected, true)) { |
1030 | 417k | for (auto* db : {regular_db_.get(), intents_db_.get()}) { |
1031 | 417k | if (db) { |
1032 | 310k | db->SetDisableFlushOnShutdown(disable_flush_on_shutdown); |
1033 | 310k | db->StartShutdown(); |
1034 | 310k | } |
1035 | 417k | } |
1036 | 208k | } |
1037 | | |
1038 | 208k | op_pauses.abortable = VERIFY_RESULT(pause(Abortable::kTrue)); |
1039 | | |
1040 | 208k | return op_pauses; |
1041 | 208k | } |
1042 | | |
1043 | | Status Tablet::CompleteShutdownRocksDBs( |
1044 | 208k | Destroy destroy, TabletScopedRWOperationPauses* ops_pauses) { |
1045 | | // We need non-null ops_pauses just to guarantee that PauseReadWriteOperations has been called. |
1046 | 208k | RSTATUS_DCHECK( |
1047 | 208k | ops_pauses != nullptr, InvalidArgument, |
1048 | 208k | "ops_pauses could not be null, StartRocksDbShutdown should be called before " |
1049 | 208k | "ShutdownRocksDBs."); |
1050 | | |
1051 | 208k | if (intents_db_) { |
1052 | 101k | intents_db_->ListenFilesChanged(nullptr); |
1053 | 101k | } |
1054 | | |
1055 | 208k | rocksdb::Options rocksdb_options; |
1056 | 208k | if (destroy) { |
1057 | 160k | InitRocksDBOptions(&rocksdb_options, LogPrefix()); |
1058 | 160k | } |
1059 | | |
1060 | 208k | Status intents_status = ResetRocksDB(destroy, rocksdb_options, &intents_db_); |
1061 | 208k | Status regular_status = ResetRocksDB(destroy, rocksdb_options, ®ular_db_); |
1062 | 208k | key_bounds_ = docdb::KeyBounds(); |
1063 | | // Reset rocksdb_shutdown_requested_ to its initial state, as if the RocksDBs were never opened, |
1064 | | // so we don't have to reset it on RocksDB open (there can potentially be several places in the |
1065 | | // code that open RocksDB, while RocksDB shutdown always goes through |
1066 | | // Tablet::ShutdownRocksDBs). |
1067 | 208k | rocksdb_shutdown_requested_ = false; |
1068 | | |
1069 | 18.4E | return regular_status.ok() ? intents_status : regular_status; |
1070 | 208k | } |
1071 | | |
1072 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator( |
1073 | | const Schema &projection, |
1074 | | const ReadHybridTime read_hybrid_time, |
1075 | | const TableId& table_id, |
1076 | | CoarseTimePoint deadline, |
1077 | | AllowBootstrappingState allow_bootstrapping_state, |
1078 | 123k | const Slice& sub_doc_key) const { |
1079 | 123k | if (state_ != kOpen && (!allow_bootstrapping_state || state_ != kBootstrapping)) { |
1080 | 0 | return STATUS_FORMAT(IllegalState, "Tablet in wrong state: $0", state_); |
1081 | 0 | } |
1082 | | |
1083 | 123k | if (table_type_ != TableType::YQL_TABLE_TYPE && table_type_ != TableType::PGSQL_TABLE_TYPE) { |
1084 | 0 | return STATUS_FORMAT(NotSupported, "Invalid table type: $0", table_type_); |
1085 | 0 | } |
1086 | | |
1087 | 123k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1088 | 123k | RETURN_NOT_OK(scoped_read_operation); |
1089 | | |
1090 | 18.4E | VLOG_WITH_PREFIX(2) << "Created new Iterator reading at " << read_hybrid_time.ToString(); |
1091 | | |
1092 | 123k | const std::shared_ptr<tablet::TableInfo> table_info = |
1093 | 123k | VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1094 | 123k | const Schema& schema = *table_info->schema; |
1095 | 123k | auto mapped_projection = std::make_unique<Schema>(); |
1096 | 123k | RETURN_NOT_OK(schema.GetMappedReadProjection(projection, mapped_projection.get())); |
1097 | | |
1098 | 123k | auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext( |
1099 | 123k | /* transaction_id */ boost::none, |
1100 | 123k | schema.table_properties().is_ysql_catalog_table())); |
1101 | 123k | const auto read_time = read_hybrid_time |
1102 | 24.0k | ? read_hybrid_time |
1103 | 99.6k | : ReadHybridTime::SingleTime(VERIFY_RESULT(SafeTime(RequireLease::kFalse))); |
1104 | 123k | auto result = std::make_unique<DocRowwiseIterator>( |
1105 | 123k | std::move(mapped_projection), schema, txn_op_ctx, doc_db(), |
1106 | 123k | deadline, read_time, &pending_non_abortable_op_counter_); |
1107 | 123k | RETURN_NOT_OK(result->Init(table_type_, sub_doc_key)); |
1108 | 123k | return std::move(result); |
1109 | 123k | } |
1110 | | |
1111 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator( |
1112 | 0 | const TableId& table_id) const { |
1113 | 0 | const std::shared_ptr<tablet::TableInfo> table_info = |
1114 | 0 | VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1115 | 0 | return NewRowIterator(*table_info->schema, {}, table_id); |
1116 | 0 | } |
1117 | | |
1118 | | Status Tablet::ApplyRowOperations( |
1119 | 4.76M | WriteOperation* operation, AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1120 | 4.76M | const auto& write_request = |
1121 | 4.76M | operation->consensus_round() && operation->consensus_round()->replicate_msg() |
1122 | | // Online case. |
1123 | 4.47M | ? operation->consensus_round()->replicate_msg()->write() |
1124 | | // Bootstrap case. |
1125 | 284k | : *operation->request(); |
1126 | 4.76M | const KeyValueWriteBatchPB& put_batch = write_request.write_batch(); |
1127 | 4.76M | if (metrics_) { |
1128 | 53 | VLOG(3) << "Applying write batch (write_pairs=" << put_batch.write_pairs().size() << "): " |
1129 | 53 | << put_batch.ShortDebugString(); |
1130 | 4.60M | metrics_->rows_inserted->IncrementBy(put_batch.write_pairs().size()); |
1131 | 4.60M | } |
1132 | | |
1133 | 4.76M | return ApplyOperation( |
1134 | 4.76M | *operation, write_request.batch_idx(), put_batch, already_applied_to_regular_db); |
1135 | 4.76M | } |
1136 | | |
1137 | | Status Tablet::ApplyOperation( |
1138 | | const Operation& operation, int64_t batch_idx, |
1139 | | const docdb::KeyValueWriteBatchPB& write_batch, |
1140 | 4.76M | AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1141 | 4.76M | auto hybrid_time = operation.WriteHybridTime(); |
1142 | | |
1143 | 4.76M | docdb::ConsensusFrontiers frontiers; |
1144 | | // Even if we have an external hybrid time, use the local commit hybrid time in the consensus |
1145 | | // frontier. |
1146 | 4.76M | auto frontiers_ptr = |
1147 | 4.76M | InitFrontiers(operation.op_id(), operation.hybrid_time(), &frontiers); |
1148 | 4.76M | if (frontiers_ptr) { |
1149 | 4.76M | auto ttl = write_batch.has_ttl() |
1150 | 276 | ? MonoDelta::FromNanoseconds(write_batch.ttl()) |
1151 | 4.76M | : docdb::Value::kMaxTtl; |
1152 | 4.76M | frontiers_ptr->Largest().set_max_value_level_ttl_expiration_time( |
1153 | 4.76M | docdb::FileExpirationFromValueTTL(operation.hybrid_time(), ttl)); |
1154 | 4.76M | } |
1155 | 4.76M | return ApplyKeyValueRowOperations( |
1156 | 4.76M | batch_idx, write_batch, frontiers_ptr, hybrid_time, already_applied_to_regular_db); |
1157 | 4.76M | } |
1158 | | |
1159 | | Status Tablet::WriteTransactionalBatch( |
1160 | | int64_t batch_idx, |
1161 | | const KeyValueWriteBatchPB& put_batch, |
1162 | | HybridTime hybrid_time, |
1163 | 1.32M | const rocksdb::UserFrontiers* frontiers) { |
1164 | 1.32M | auto transaction_id = CHECK_RESULT( |
1165 | 1.32M | FullyDecodeTransactionId(put_batch.transaction().transaction_id())); |
1166 | | |
1167 | 1.32M | bool store_metadata = false; |
1168 | 1.32M | if (put_batch.transaction().has_isolation()) { |
1169 | | // Store transaction metadata (status tablet, isolation level etc.) |
1170 | 1.02M | auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(put_batch.transaction())); |
1171 | 1.02M | auto add_result = transaction_participant()->Add(metadata); |
1172 | 1.02M | if (!add_result.ok()) { |
1173 | 60.3k | return add_result.status(); |
1174 | 60.3k | } |
1175 | 968k | store_metadata = add_result.get(); |
1176 | 968k | } |
1177 | 1.26M | boost::container::small_vector<uint8_t, 16> encoded_replicated_batch_idx_set; |
1178 | 1.26M | auto prepare_batch_data = transaction_participant()->PrepareBatchData( |
1179 | 1.26M | transaction_id, batch_idx, &encoded_replicated_batch_idx_set); |
1180 | 1.26M | if (!prepare_batch_data) { |
1181 | | // If metadata is missing, it could be caused by the transaction being aborted and removed. |
1182 | | // In this case we should not add new intents for it. |
1183 | 119 | return STATUS(TryAgain, |
1184 | 119 | Format("Transaction metadata missing: $0, looks like it was just aborted", |
1185 | 119 | transaction_id), Slice(), |
1186 | 119 | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); |
1187 | 119 | } |
1188 | | |
1189 | 1.26M | auto isolation_level = prepare_batch_data->first; |
1190 | 1.26M | auto& last_batch_data = prepare_batch_data->second; |
1191 | | |
1192 | 1.26M | docdb::TransactionalWriter writer( |
1193 | 1.26M | put_batch, hybrid_time, transaction_id, isolation_level, |
1194 | 1.26M | docdb::PartialRangeKeyIntents(metadata_->UsePartialRangeKeyIntents()), |
1195 | 1.26M | Slice(encoded_replicated_batch_idx_set.data(), encoded_replicated_batch_idx_set.size()), |
1196 | 1.26M | last_batch_data.next_write_id); |
1197 | 1.26M | if (store_metadata) { |
1198 | 967k | writer.SetMetadataToStore(&put_batch.transaction()); |
1199 | 967k | } |
1200 | 1.26M | rocksdb::WriteBatch write_batch; |
1201 | 1.26M | write_batch.SetDirectWriter(&writer); |
1202 | 1.26M | RequestScope request_scope(transaction_participant_.get()); |
1203 | | |
1204 | 1.26M | WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents); |
1205 | | |
1206 | 1.26M | last_batch_data.hybrid_time = hybrid_time; |
1207 | 1.26M | last_batch_data.next_write_id = writer.intra_txn_write_id(); |
1208 | 1.26M | transaction_participant()->BatchReplicated(transaction_id, last_batch_data); |
1209 | | |
1210 | 1.26M | return Status::OK(); |
1211 | 1.26M | } |
1212 | | |
1213 | | Status Tablet::ApplyKeyValueRowOperations( |
1214 | | int64_t batch_idx, |
1215 | | const KeyValueWriteBatchPB& put_batch, |
1216 | | const rocksdb::UserFrontiers* frontiers, |
1217 | | const HybridTime hybrid_time, |
1218 | 4.76M | AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1219 | 4.76M | if (put_batch.write_pairs().empty() && put_batch.read_pairs().empty() && |
1220 | 8.72k | put_batch.apply_external_transactions().empty()) { |
1221 | 8.72k | return Status::OK(); |
1222 | 8.72k | } |
1223 | | |
1224 | | // A failure may be returned only in cases where it is safe to skip applying operations to DB, |
1225 | | // for instance when intents of an aborted transaction are written. |
1226 | | // In all other cases we should crash instead of skipping the apply. |
1227 | | |
1228 | 4.75M | if (put_batch.has_transaction()) { |
1229 | 1.32M | RETURN_NOT_OK(WriteTransactionalBatch(batch_idx, put_batch, hybrid_time, frontiers)); |
1230 | 3.43M | } else { |
1231 | 3.43M | rocksdb::WriteBatch regular_write_batch; |
1232 | 3.42M | auto* regular_write_batch_ptr = !already_applied_to_regular_db ? ®ular_write_batch : nullptr; |
1233 | | |
1234 | | // See comments for PrepareExternalWriteBatch. |
1235 | 3.43M | rocksdb::WriteBatch intents_write_batch; |
1236 | 3.43M | bool has_non_exteranl_records = PrepareExternalWriteBatch( |
1237 | 3.43M | put_batch, hybrid_time, intents_db_.get(), regular_write_batch_ptr, &intents_write_batch); |
1238 | | |
1239 | 3.43M | if (intents_write_batch.Count() != 0) { |
1240 | 0 | if (!metadata_->is_under_twodc_replication()) { |
1241 | 0 | RETURN_NOT_OK(metadata_->SetIsUnderTwodcReplicationAndFlush(true)); |
1242 | 0 | } |
1243 | 0 | WriteToRocksDB(frontiers, &intents_write_batch, StorageDbType::kIntents); |
1244 | 0 | } |
1245 | | |
1246 | 3.43M | docdb::NonTransactionalWriter writer(put_batch, hybrid_time); |
1247 | 3.43M | if (!already_applied_to_regular_db && has_non_exteranl_records) { |
1248 | 3.42M | regular_write_batch.SetDirectWriter(&writer); |
1249 | 3.42M | } |
1250 | 3.43M | if (regular_write_batch.Count() != 0 || regular_write_batch.HasDirectWriter()) { |
1251 | 3.42M | WriteToRocksDB(frontiers, &regular_write_batch, StorageDbType::kRegular); |
1252 | 3.42M | } |
1253 | | |
1254 | 3.43M | if (snapshot_coordinator_) { |
1255 | 20.3M | for (const auto& pair : put_batch.write_pairs()) { |
1256 | 20.3M | WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()), |
1257 | 20.3M | "ApplyWritePair failed"); |
1258 | 20.3M | } |
1259 | 435k | } |
1260 | 3.43M | } |
1261 | | |
1262 | 4.69M | return Status::OK(); |
1263 | 4.75M | } |
1264 | | |
1265 | | void Tablet::WriteToRocksDB( |
1266 | | const rocksdb::UserFrontiers* frontiers, |
1267 | | rocksdb::WriteBatch* write_batch, |
1268 | 6.39M | docdb::StorageDbType storage_db_type) { |
1269 | 6.39M | rocksdb::DB* dest_db = nullptr; |
1270 | 6.39M | switch (storage_db_type) { |
1271 | 4.16M | case StorageDbType::kRegular: dest_db = regular_db_.get(); break; |
1272 | 2.22M | case StorageDbType::kIntents: dest_db = intents_db_.get(); break; |
1273 | 6.39M | } |
1274 | | |
1275 | | // Frontiers can be null for deferred apply operations. |
1276 | 6.39M | if (frontiers) { |
1277 | 6.39M | write_batch->SetFrontiers(frontiers); |
1278 | 6.39M | } |
1279 | | |
1280 | | // We are using Raft replication index for the RocksDB sequence number for |
1281 | | // all members of this write batch. |
1282 | 6.39M | rocksdb::WriteOptions write_options; |
1283 | 6.39M | InitRocksDBWriteOptions(&write_options); |
1284 | | |
1285 | 6.39M | auto rocksdb_write_status = dest_db->Write(write_options, write_batch); |
1286 | 6.39M | if (!rocksdb_write_status.ok()) { |
1287 | 0 | LOG_WITH_PREFIX(FATAL) << "Failed to write a batch with " << write_batch->Count() |
1288 | 0 | << " operations into RocksDB: " << rocksdb_write_status; |
1289 | 0 | } |
1290 | | |
1291 | 6.39M | if (FLAGS_TEST_docdb_log_write_batches) { |
1292 | 0 | LOG_WITH_PREFIX(INFO) |
1293 | 0 | << "Wrote " << write_batch->Count() << " key/value pairs to " << storage_db_type |
1294 | 0 | << " RocksDB:\n" << docdb::WriteBatchToString( |
1295 | 0 | *write_batch, |
1296 | 0 | storage_db_type, |
1297 | 0 | BinaryOutputFormat::kEscapedAndHex, |
1298 | 0 | WriteBatchOutputFormat::kArrow, |
1299 | 0 | " " + LogPrefix(storage_db_type)); |
1300 | 0 | } |
1301 | 6.39M | } |
1302 | | |
1303 | | //-------------------------------------------------------------------------------------------------- |
1304 | | // Redis Request Processing. |
1305 | | Status Tablet::HandleRedisReadRequest(CoarseTimePoint deadline, |
1306 | | const ReadHybridTime& read_time, |
1307 | | const RedisReadRequestPB& redis_read_request, |
1308 | 42.9k | RedisResponsePB* response) { |
1309 | | // TODO: move this locking to the top-level read request handler in TabletService. |
1310 | 42.9k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1311 | 42.9k | RETURN_NOT_OK(scoped_read_operation); |
1312 | | |
1313 | 42.9k | ScopedTabletMetricsTracker metrics_tracker(metrics_->redis_read_latency); |
1314 | | |
1315 | 42.9k | docdb::RedisReadOperation doc_op(redis_read_request, doc_db(), deadline, read_time); |
1316 | 42.9k | RETURN_NOT_OK(doc_op.Execute()); |
1317 | 42.9k | *response = std::move(doc_op.response()); |
1318 | 42.9k | return Status::OK(); |
1319 | 42.9k | } |
1320 | | |
1321 | | bool IsSchemaVersionCompatible( |
1322 | 7.06M | uint32_t current_version, uint32_t request_version, bool compatible_with_previous_version) { |
1323 | 7.06M | if (request_version == current_version) { |
1324 | 7.06M | return true; |
1325 | 7.06M | } |
1326 | | |
1327 | 1.12k | if (compatible_with_previous_version && request_version == current_version + 1) { |
1328 | 0 | DVLOG(1) << (FLAGS_yql_allow_compatible_schema_versions ? "A" : "Not a") |
1329 | 0 | << "ccepting request that is ahead of us by 1 version"; |
1330 | 77 | return FLAGS_yql_allow_compatible_schema_versions; |
1331 | 77 | } |
1332 | | |
1333 | 872 | return false; |
1334 | 872 | } |
1335 | | |
1336 | | //-------------------------------------------------------------------------------------------------- |
1337 | | // CQL Request Processing. |
1338 | | Status Tablet::HandleQLReadRequest( |
1339 | | CoarseTimePoint deadline, |
1340 | | const ReadHybridTime& read_time, |
1341 | | const QLReadRequestPB& ql_read_request, |
1342 | | const TransactionMetadataPB& transaction_metadata, |
1343 | 3.68M | QLReadRequestResult* result) { |
1344 | 3.68M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1345 | 3.68M | RETURN_NOT_OK(scoped_read_operation); |
1346 | 3.68M | ScopedTabletMetricsTracker metrics_tracker(metrics_->ql_read_latency); |
1347 | | |
1348 | 3.68M | if (!IsSchemaVersionCompatible( |
1349 | 3.68M | metadata()->schema_version(), ql_read_request.schema_version(), |
1350 | 132 | ql_read_request.is_compatible_with_previous_version())) { |
1351 | 0 | DVLOG(1) << "Setting status for read as YQL_STATUS_SCHEMA_VERSION_MISMATCH"; |
1352 | 132 | result->response.set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH); |
1353 | 132 | result->response.set_error_message(Format( |
1354 | 132 | "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)", |
1355 | 132 | metadata()->table_id(), |
1356 | 132 | metadata()->schema_version(), |
1357 | 132 | ql_read_request.schema_version(), |
1358 | 132 | ql_read_request.is_compatible_with_previous_version())); |
1359 | 132 | return Status::OK(); |
1360 | 132 | } |
1361 | | |
1362 | 3.68M | Result<TransactionOperationContext> txn_op_ctx = |
1363 | 3.68M | CreateTransactionOperationContext(transaction_metadata, /* is_ysql_catalog_table */ false); |
1364 | 3.68M | RETURN_NOT_OK(txn_op_ctx); |
1365 | 3.68M | return AbstractTablet::HandleQLReadRequest( |
1366 | 3.68M | deadline, read_time, ql_read_request, *txn_op_ctx, result); |
1367 | 3.68M | } |
1368 | | |
1369 | | CHECKED_STATUS Tablet::CreatePagingStateForRead(const QLReadRequestPB& ql_read_request, |
1370 | | const size_t row_count, |
1371 | 3.66M | QLResponsePB* response) const { |
1372 | | |
1373 | | // If the response does not have a next partition key, it means we are done reading the current |
1374 | | // tablet. But, if the request does not have the hash columns set, this must be a table-scan, |
1375 | | // so we need to decide if we are done or if we need to move to the next tablet. |
1376 | | // If we did not reach the: |
1377 | | // 1. max number of results (LIMIT clause -- if set) |
1378 | | // 2. end of the table (this was the last tablet) |
1379 | | // 3. max partition key (upper bound condition using 'token' -- if set) |
1380 | | // we set the paging state to point to the exclusive end partition key of this tablet, which is |
1381 | | // the start key of the next tablet). |
1382 | 3.66M | if (ql_read_request.hashed_column_values().empty() && |
1383 | 43.1k | !response->paging_state().has_next_partition_key()) { |
1384 | | // Check we did not reach the results limit. |
1385 | | // If return_paging_state is set, it means the request limit is actually just the page size. |
1386 | 42.1k | if (!ql_read_request.has_limit() || |
1387 | 41.0k | row_count < ql_read_request.limit() || |
1388 | 42.1k | ql_read_request.return_paging_state()) { |
1389 | | |
1390 | | // Check we did not reach the last tablet. |
1391 | 42.1k | const string& next_partition_key = metadata_->partition()->partition_key_end(); |
1392 | 42.1k | if (!next_partition_key.empty()) { |
1393 | 33.1k | uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(next_partition_key); |
1394 | | |
1395 | | // Check we did not reach the max partition key. |
1396 | 33.1k | if (!ql_read_request.has_max_hash_code() || |
1397 | 33.1k | next_hash_code <= ql_read_request.max_hash_code()) { |
1398 | 33.1k | response->mutable_paging_state()->set_next_partition_key(next_partition_key); |
1399 | 33.1k | } |
1400 | 33.1k | } |
1401 | 42.1k | } |
1402 | 42.1k | } |
1403 | | |
1404 | | // If there is a paging state, update the total number of rows read so far. |
1405 | 3.66M | if (response->has_paging_state()) { |
1406 | 34.6k | response->mutable_paging_state()->set_total_num_rows_read( |
1407 | 34.6k | ql_read_request.paging_state().total_num_rows_read() + row_count); |
1408 | 34.6k | } |
1409 | 3.66M | return Status::OK(); |
1410 | 3.66M | } |
1411 | | |
1412 | | //-------------------------------------------------------------------------------------------------- |
1413 | | // PGSQL Request Processing. |
1414 | | //-------------------------------------------------------------------------------------------------- |
1415 | | Status Tablet::HandlePgsqlReadRequest( |
1416 | | CoarseTimePoint deadline, |
1417 | | const ReadHybridTime& read_time, |
1418 | | bool is_explicit_request_read_time, |
1419 | | const PgsqlReadRequestPB& pgsql_read_request, |
1420 | | const TransactionMetadataPB& transaction_metadata, |
1421 | | const SubTransactionMetadataPB& subtransaction_metadata, |
1422 | | PgsqlReadRequestResult* result, |
1423 | 1.49M | size_t* num_rows_read) { |
1424 | 1.49M | TRACE(LogPrefix()); |
1425 | 1.49M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1426 | 1.49M | RETURN_NOT_OK(scoped_read_operation); |
1427 | | // TODO(neil) Work on metrics for PGSQL. |
1428 | | // ScopedTabletMetricsTracker metrics_tracker(metrics_->pgsql_read_latency); |
1429 | | |
1430 | 1.49M | const shared_ptr<tablet::TableInfo> table_info = |
1431 | 1.49M | VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read_request.table_id())); |
1432 | | // Assert the table is a Postgres table. |
1433 | 1.49M | DCHECK_EQ(table_info->table_type, TableType::PGSQL_TABLE_TYPE); |
1434 | 1.49M | if (table_info->schema_version != pgsql_read_request.schema_version()) { |
1435 | 6 | result->response.set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH); |
1436 | 6 | result->response.set_error_message( |
1437 | 6 | Format("schema version mismatch for table $0: expected $1, got $2", |
1438 | 6 | table_info->table_id, |
1439 | 6 | table_info->schema_version, |
1440 | 6 | pgsql_read_request.schema_version())); |
1441 | 6 | return Status::OK(); |
1442 | 6 | } |
1443 | | |
1444 | 1.49M | Result<TransactionOperationContext> txn_op_ctx = |
1445 | 1.49M | CreateTransactionOperationContext( |
1446 | 1.49M | transaction_metadata, |
1447 | 1.49M | table_info->schema->table_properties().is_ysql_catalog_table(), |
1448 | 1.49M | &subtransaction_metadata); |
1449 | 1.49M | RETURN_NOT_OK(txn_op_ctx); |
1450 | 1.49M | return AbstractTablet::HandlePgsqlReadRequest( |
1451 | 1.49M | deadline, read_time, is_explicit_request_read_time, |
1452 | 1.49M | pgsql_read_request, *txn_op_ctx, result, num_rows_read); |
1453 | 1.49M | } |
1454 | | |
1455 | | // Returns true if the query can be satisfied by rows present in current tablet. |
1456 | | // Returns false if query requires other tablets to also be scanned. Examples of this include: |
1457 | | // (1) full table scan queries |
1458 | | // (2) queries that whose key conditions are such that the query will require a multi tablet |
1459 | | // scan. |
1460 | | // |
1461 | | // Requests that are of the form batched index lookups of ybctids are sent only to a single tablet. |
1462 | | // However there can arise situations where tablets splitting occurs after such requests are being |
1463 | | // prepared by the pggate layer (specifically pg_doc_op.cc). Under such circumstances, if tablets |
1464 | | // are split into two sub-tablets, then such batched index lookups of ybctid requests should be sent |
1465 | | // to multiple tablets (the two sub-tablets). Hence, the request ends up not being a single tablet |
1466 | | // request. |
1467 | | Result<bool> Tablet::IsQueryOnlyForTablet( |
1468 | 1.49M | const PgsqlReadRequestPB& pgsql_read_request, size_t row_count) const { |
1469 | 1.49M | if ((!pgsql_read_request.ybctid_column_value().value().binary_value().empty() && |
1470 | 6.57k | (implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) == row_count || |
1471 | 4.80k | pgsql_read_request.batch_arguments_size() == 0)) || |
1472 | 1.49M | !pgsql_read_request.partition_column_values().empty() ) { |
1473 | 1.06M | return true; |
1474 | 1.06M | } |
1475 | | |
1476 | 435k | std::shared_ptr<const Schema> schema = metadata_->schema(); |
1477 | 435k | if (schema->has_pgtable_id() || schema->has_cotable_id()) { |
1478 | | // This is a colocated table. |
1479 | 0 | return true; |
1480 | 0 | } |
1481 | | |
1482 | 435k | if (schema->num_hash_key_columns() == 0 && |
1483 | 379k | schema->num_range_key_columns() == |
1484 | 25.6k | implicit_cast<size_t>(pgsql_read_request.range_column_values_size())) { |
1485 | | // PK is contained within this tablet. |
1486 | 25.6k | return true; |
1487 | 25.6k | } |
1488 | 410k | return false; |
1489 | 410k | } |
1490 | | |
1491 | | Result<bool> Tablet::HasScanReachedMaxPartitionKey( |
1492 | | const PgsqlReadRequestPB& pgsql_read_request, |
1493 | | const string& partition_key, |
1494 | 35.3k | size_t row_count) const { |
1495 | 35.3k | auto schema = metadata_->schema(); |
1496 | | |
1497 | 35.3k | if (schema->num_hash_key_columns() > 0) { |
1498 | 35.2k | uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1499 | | // For batched index lookup of ybctids, check if the current partition hash is lesser than |
1500 | | // upper bound. If it is, we can then avoid paging. Paging of batched index lookup of ybctids |
1501 | | // occur when tablets split after request is prepared. |
1502 | 35.2k | if (pgsql_read_request.has_ybctid_column_value() && |
1503 | 35 | implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) > row_count) { |
1504 | 35 | if (!pgsql_read_request.upper_bound().has_key()) { |
1505 | 0 | return false; |
1506 | 0 | } |
1507 | 35 | uint16_t upper_bound_hash = |
1508 | 35 | PartitionSchema::DecodeMultiColumnHashValue(pgsql_read_request.upper_bound().key()); |
1509 | 35 | uint16_t partition_hash = |
1510 | 35 | PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1511 | 35 | return pgsql_read_request.upper_bound().is_inclusive() ? |
1512 | 0 | partition_hash > upper_bound_hash : |
1513 | 35 | partition_hash >= upper_bound_hash; |
1514 | 35 | } |
1515 | 35.2k | if (pgsql_read_request.has_max_hash_code() && |
1516 | 1.15k | next_hash_code > pgsql_read_request.max_hash_code()) { |
1517 | 1.15k | return true; |
1518 | 1.15k | } |
1519 | 41 | } else if (pgsql_read_request.has_upper_bound()) { |
1520 | 30 | docdb::DocKey partition_doc_key(*schema); |
1521 | 30 | VERIFY_RESULT(partition_doc_key.DecodeFrom( |
1522 | 30 | partition_key, docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue)); |
1523 | 30 | docdb::DocKey max_partition_doc_key(*schema); |
1524 | 30 | VERIFY_RESULT(max_partition_doc_key.DecodeFrom( |
1525 | 30 | pgsql_read_request.upper_bound().key(), docdb::DocKeyPart::kWholeDocKey, |
1526 | 30 | docdb::AllowSpecial::kTrue)); |
1527 | | |
1528 | 30 | auto cmp = partition_doc_key.CompareTo(max_partition_doc_key); |
1529 | 30 | return pgsql_read_request.upper_bound().is_inclusive() ? cmp > 0 : cmp >= 0; |
1530 | 34.0k | } |
1531 | | |
1532 | 34.0k | return false; |
1533 | 34.0k | } |
1534 | | |
1535 | | namespace { |
1536 | | |
1537 | | void SetBackfillSpecForYsqlBackfill( |
1538 | | const PgsqlReadRequestPB& pgsql_read_request, |
1539 | | const size_t& row_count, |
1540 | 289 | PgsqlResponsePB* response) { |
1541 | 289 | PgsqlBackfillSpecPB in_spec; |
1542 | 289 | in_spec.ParseFromString(a2b_hex(pgsql_read_request.backfill_spec())); |
1543 | | |
1544 | 289 | auto limit = in_spec.limit(); |
1545 | 289 | PgsqlBackfillSpecPB out_spec; |
1546 | 289 | out_spec.set_limit(limit); |
1547 | 289 | out_spec.set_count(in_spec.count() + row_count); |
1548 | 289 | response->set_is_backfill_batch_done(!response->has_paging_state()); |
1549 | 0 | VLOG(2) << " limit is " << limit << " set_count to " << out_spec.count(); |
1550 | 289 | if (limit >= 0 && out_spec.count() >= limit) { |
1551 | | // Hint postgres to stop scanning now. And set up the |
1552 | | // next_row_key based on the paging state. |
1553 | 42 | if (response->has_paging_state()) { |
1554 | 22 | out_spec.set_next_row_key(response->paging_state().next_row_key()); |
1555 | 22 | } |
1556 | 42 | response->set_is_backfill_batch_done(true); |
1557 | 42 | } |
1558 | | |
1559 | 0 | VLOG(2) << "Got input spec " << yb::ToString(in_spec) |
1560 | 0 | << " set output spec " << yb::ToString(out_spec) |
1561 | 0 | << " batch_done=" << response->is_backfill_batch_done(); |
1562 | 289 | string serialized_pb; |
1563 | 289 | out_spec.SerializeToString(&serialized_pb); |
1564 | 289 | response->set_backfill_spec(b2a_hex(serialized_pb)); |
1565 | 289 | } |
1566 | | |
1567 | | } // namespace |
1568 | | |
1569 | | CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_read_request, |
1570 | | const size_t row_count, |
1571 | 1.49M | PgsqlResponsePB* response) const { |
1572 | | // If there is no hash column in the read request, this is a full-table query. And if there is no |
1573 | | // paging state in the response, we are done reading from the current tablet. In this case, we |
1574 | | // should return the exclusive end partition key of this tablet if not empty which is the start |
1575 | | // key of the next tablet. Do so only if the request has no row count limit, or there is and we |
1576 | | // haven't hit it, or we are asked to return paging state even when we have hit the limit. |
1577 | | // Otherwise, leave the paging state empty which means we are completely done reading for the |
1578 | | // whole SELECT statement. |
1579 | 1.49M | const bool single_tablet_query = |
1580 | 1.49M | VERIFY_RESULT(IsQueryOnlyForTablet(pgsql_read_request, row_count)); |
1581 | 1.49M | if (!single_tablet_query && |
1582 | 409k | !response->has_paging_state() && |
1583 | 386k | (!pgsql_read_request.has_limit() || row_count < pgsql_read_request.limit() || |
1584 | 386k | pgsql_read_request.return_paging_state())) { |
1585 | | // For backward scans partition_key_start must be used as next_partition_key. |
1586 | | // Client level logic will check it and route next request to the preceding tablet. |
1587 | 386k | const auto& next_partition_key = |
1588 | 386k | pgsql_read_request.has_hash_code() || |
1589 | 353k | pgsql_read_request.is_forward_scan() |
1590 | 386k | ? metadata_->partition()->partition_key_end() |
1591 | 18.4E | : metadata_->partition()->partition_key_start(); |
1592 | | // Check we did not reach the last tablet. |
1593 | 386k | const bool end_scan = next_partition_key.empty() || |
1594 | 386k | VERIFY_RESULT(HasScanReachedMaxPartitionKey( |
1595 | 386k | pgsql_read_request, next_partition_key, row_count)); |
1596 | 386k | if (!end_scan) { |
1597 | 34.0k | response->mutable_paging_state()->set_next_partition_key(next_partition_key); |
1598 | 34.0k | } |
1599 | 386k | } |
1600 | | |
1601 | | // If there is a paging state, update the total number of rows read so far. |
1602 | 1.49M | if (response->has_paging_state()) { |
1603 | 57.8k | response->mutable_paging_state()->set_total_num_rows_read( |
1604 | 57.8k | pgsql_read_request.paging_state().total_num_rows_read() + row_count); |
1605 | 57.8k | } |
1606 | | |
1607 | 1.49M | if (pgsql_read_request.is_for_backfill()) { |
1608 | | // BackfillSpec is used to implement "paging" across multiple BackfillIndex |
1609 | | // rpcs from the master. |
1610 | 289 | SetBackfillSpecForYsqlBackfill(pgsql_read_request, row_count, response); |
1611 | 289 | } |
1612 | 1.49M | return Status::OK(); |
1613 | 1.49M | } |
1614 | | |
1615 | | //-------------------------------------------------------------------------------------------------- |
1616 | | |
1617 | 1.74M | void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteQuery> query) { |
1618 | 1.74M | TRACE(__func__); |
1619 | 1.74M | if (table_type_ == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
1620 | 0 | query->Cancel( |
1621 | 0 | STATUS(NotSupported, "Transaction status table does not support write")); |
1622 | 0 | return; |
1623 | 0 | } |
1624 | | |
1625 | 1.74M | if (!GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) { |
1626 | 1.74M | auto write_permit = GetPermitToWrite(query->deadline()); |
1627 | 1.74M | if (!write_permit.ok()) { |
1628 | 7.17k | TRACE("Could not get the write permit."); |
1629 | 7.17k | WriteQuery::StartSynchronization(std::move(query), MoveStatus(write_permit)); |
1630 | 7.17k | return; |
1631 | 7.17k | } |
1632 | | // Save the write permit to be released after the operation is submitted |
1633 | | // to Raft queue. |
1634 | 1.73M | query->UseSubmitToken(std::move(write_permit)); |
1635 | 1.73M | } |
1636 | | |
1637 | 1.74M | WriteQuery::Execute(std::move(query)); |
1638 | 1.74M | } |
1639 | | |
1640 | 161k | Status Tablet::Flush(FlushMode mode, FlushFlags flags, int64_t ignore_if_flushed_after_tick) { |
1641 | 161k | TRACE_EVENT0("tablet", "Tablet::Flush"); |
1642 | | |
1643 | 161k | auto pending_op = CreateNonAbortableScopedRWOperation(); |
1644 | | |
1645 | 161k | rocksdb::FlushOptions options; |
1646 | 161k | options.ignore_if_flushed_after_tick = ignore_if_flushed_after_tick; |
1647 | 161k | bool flush_intents = intents_db_ && HasFlags(flags, FlushFlags::kIntents); |
1648 | 161k | if (flush_intents) { |
1649 | 82.1k | options.wait = false; |
1650 | 82.1k | WARN_NOT_OK(intents_db_->Flush(options), "Flush intents DB"); |
1651 | 82.1k | } |
1652 | | |
1653 | 161k | if (HasFlags(flags, FlushFlags::kRegular) && regular_db_) { |
1654 | 160k | options.wait = mode == FlushMode::kSync; |
1655 | 160k | WARN_NOT_OK(regular_db_->Flush(options), "Flush regular DB"); |
1656 | 160k | } |
1657 | | |
1658 | 161k | if (flush_intents && mode == FlushMode::kSync) { |
1659 | 47 | RETURN_NOT_OK(intents_db_->WaitForFlush()); |
1660 | 47 | } |
1661 | | |
1662 | 161k | return Status::OK(); |
1663 | 161k | } |
1664 | | |
1665 | 26 | Status Tablet::WaitForFlush() { |
1666 | 26 | TRACE_EVENT0("tablet", "Tablet::WaitForFlush"); |
1667 | | |
1668 | 26 | if (regular_db_) { |
1669 | 26 | RETURN_NOT_OK(regular_db_->WaitForFlush()); |
1670 | 26 | } |
1671 | 26 | if (intents_db_) { |
1672 | 26 | RETURN_NOT_OK(intents_db_->WaitForFlush()); |
1673 | 26 | } |
1674 | | |
1675 | 26 | return Status::OK(); |
1676 | 26 | } |
1677 | | |
1678 | 0 | Status Tablet::ImportData(const std::string& source_dir) { |
1679 | | // We import only regular records, so don't have to deal with intents here. |
1680 | 0 | return regular_db_->Import(source_dir); |
1681 | 0 | } |
1682 | | |
1683 | | // We apply intents by iterating over whole transaction reverse index. |
1684 | | // Using value of reverse index record we find original intent record and apply it. |
1685 | | // After that we delete both intent record and reverse index record. |
1686 | 737k | Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) { |
1687 | 701 | VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id; |
1688 | | |
1689 | | // This flag enables tests to induce a situation where a transaction has committed but its intents |
1690 | | // haven't yet moved to regular db for a sufficiently long period. For example, it can help a test |
1691 | | // to reliably assert that conflict resolution/ concurrency control with a conflicting committed |
1692 | | // transaction is done properly in the rare situation where the committed transaction's intents |
1693 | | // are still in intents db and not yet in regular db. |
1694 | 737k | AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms); |
1695 | 737k | docdb::ApplyIntentsContext context( |
1696 | 737k | data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht, |
1697 | 737k | &key_bounds_, intents_db_.get()); |
1698 | 737k | docdb::IntentsWriter intents_writer( |
1699 | 737k | data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context); |
1700 | 737k | rocksdb::WriteBatch regular_write_batch; |
1701 | 737k | regular_write_batch.SetDirectWriter(&intents_writer); |
1702 | | // data.hybrid_time contains transaction commit time. |
1703 | | // We don't set transaction field of put_batch, otherwise we would write another bunch of intents. |
1704 | 737k | docdb::ConsensusFrontiers frontiers; |
1705 | 737k | auto frontiers_ptr = data.op_id.empty() ? nullptr : InitFrontiers(data, &frontiers); |
1706 | 737k | WriteToRocksDB(frontiers_ptr, &regular_write_batch, StorageDbType::kRegular); |
1707 | 737k | return context.apply_state(); |
1708 | 737k | } |
1709 | | |
1710 | | template <class Ids> |
1711 | 968k | CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) { |
1712 | 968k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1713 | 968k | RETURN_NOT_OK(scoped_read_operation); |
1714 | | |
1715 | 968k | rocksdb::WriteBatch intents_write_batch; |
1716 | 968k | for (const auto& id : ids) { |
1717 | 968k | boost::optional<docdb::ApplyTransactionState> apply_state; |
1718 | 968k | for (;;) { |
1719 | 968k | docdb::RemoveIntentsContext context(id); |
1720 | 968k | docdb::IntentsWriter writer( |
1721 | 968k | apply_state ? apply_state->key : Slice(), intents_db_.get(), &context); |
1722 | 968k | intents_write_batch.SetDirectWriter(&writer); |
1723 | 968k | docdb::ConsensusFrontiers frontiers; |
1724 | 968k | auto frontiers_ptr = InitFrontiers(data, &frontiers); |
1725 | 968k | WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents); |
1726 | | |
1727 | 968k | if (!context.apply_state().active()) { |
1728 | 968k | break; |
1729 | 968k | } |
1730 | | |
1731 | 241 | apply_state = std::move(context.apply_state()); |
1732 | 241 | intents_write_batch.Clear(); |
1733 | | |
1734 | 241 | AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms); |
1735 | 241 | } |
1736 | 968k | } |
1737 | | |
1738 | 968k | return Status::OK(); |
1739 | 968k | } |
_ZN2yb6tablet6Tablet17RemoveIntentsImplISt16initializer_listINS_17StronglyTypedUuidINS_17TransactionId_TagEEEEEENS_6StatusERKNS0_17RemoveIntentsDataERKT_ |
Line | Count | Source |
1711 | 968k | CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) { |
1712 | 968k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1713 | 968k | RETURN_NOT_OK(scoped_read_operation); |
1714 | | |
1715 | 968k | rocksdb::WriteBatch intents_write_batch; |
1716 | 968k | for (const auto& id : ids) { |
1717 | 968k | boost::optional<docdb::ApplyTransactionState> apply_state; |
1718 | 968k | for (;;) { |
1719 | 968k | docdb::RemoveIntentsContext context(id); |
1720 | 968k | docdb::IntentsWriter writer( |
1721 | 968k | apply_state ? apply_state->key : Slice(), intents_db_.get(), &context); |
1722 | 968k | intents_write_batch.SetDirectWriter(&writer); |
1723 | 968k | docdb::ConsensusFrontiers frontiers; |
1724 | 968k | auto frontiers_ptr = InitFrontiers(data, &frontiers); |
1725 | 968k | WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents); |
1726 | | |
1727 | 968k | if (!context.apply_state().active()) { |
1728 | 968k | break; |
1729 | 968k | } |
1730 | | |
1731 | 241 | apply_state = std::move(context.apply_state()); |
1732 | 241 | intents_write_batch.Clear(); |
1733 | | |
1734 | 241 | AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms); |
1735 | 241 | } |
1736 | 968k | } |
1737 | | |
1738 | 968k | return Status::OK(); |
1739 | 968k | } |
Unexecuted instantiation: _ZN2yb6tablet6Tablet17RemoveIntentsImplINSt3__113unordered_setINS_17StronglyTypedUuidINS_17TransactionId_TagEEEN5boost4hashIS7_EENS3_8equal_toIS7_EENS3_9allocatorIS7_EEEEEENS_6StatusERKNS0_17RemoveIntentsDataERKT_ |
1740 | | |
1741 | | |
1742 | 968k | Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionId& id) { |
1743 | 968k | return RemoveIntentsImpl(data, std::initializer_list<TransactionId>{id}); |
1744 | 968k | } |
1745 | | |
1746 | 0 | Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionIdSet& transactions) { |
1747 | 0 | return RemoveIntentsImpl(data, transactions); |
1748 | 0 | } |
1749 | | |
1750 | | // We batch this as some tx could be very large and may not fit in one batch |
1751 | | CHECKED_STATUS Tablet::GetIntents( |
1752 | | const TransactionId& id, |
1753 | | std::vector<docdb::IntentKeyValueForCDC>* key_value_intents, |
1754 | 74 | docdb::ApplyTransactionState* stream_state) { |
1755 | 74 | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1756 | 74 | RETURN_NOT_OK(scoped_read_operation); |
1757 | | |
1758 | 74 | docdb::ApplyTransactionState new_stream_state; |
1759 | | |
1760 | 74 | new_stream_state = VERIFY_RESULT( |
1761 | 74 | docdb::GetIntentsBatch(id, &key_bounds_, stream_state, intents_db_.get(), key_value_intents)); |
1762 | 74 | stream_state->key = new_stream_state.key; |
1763 | 74 | stream_state->write_id = new_stream_state.write_id; |
1764 | | |
1765 | 74 | return Status::OK(); |
1766 | 74 | } |
1767 | | |
1768 | 0 | Result<HybridTime> Tablet::ApplierSafeTime(HybridTime min_allowed, CoarseTimePoint deadline) { |
1769 | | // We could not use mvcc_ directly, because correct lease should be passed to it. |
1770 | 0 | return SafeTime(RequireLease::kFalse, min_allowed, deadline); |
1771 | 0 | } |
1772 | | |
1773 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::CreateCDCSnapshotIterator( |
1774 | 6 | const Schema& projection, const ReadHybridTime& time, const string& next_key) { |
1775 | 0 | VLOG_WITH_PREFIX(2) << "The nextKey is " << next_key; |
1776 | | |
1777 | 6 | Slice next_slice; |
1778 | 6 | if (!next_key.empty()) { |
1779 | 0 | SubDocKey start_sub_doc_key; |
1780 | 0 | docdb::KeyBytes start_key_bytes(next_key); |
1781 | 0 | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
1782 | 0 | next_slice = start_sub_doc_key.doc_key().Encode().AsSlice(); |
1783 | 0 | VLOG_WITH_PREFIX(2) << "The nextKey doc is " << next_key; |
1784 | 0 | } |
1785 | 6 | return NewRowIterator( |
1786 | 6 | projection, time, "", CoarseTimePoint::max(), AllowBootstrappingState::kFalse, next_slice); |
1787 | 6 | } |
1788 | | |
1789 | | Status Tablet::CreatePreparedChangeMetadata( |
1790 | 547k | ChangeMetadataOperation *operation, const Schema* schema) { |
1791 | 547k | if (schema) { |
1792 | 50.4k | auto key_schema = GetKeySchema(operation->has_table_id() ? operation->table_id() : ""); |
1793 | 50.7k | if (!key_schema.KeyEquals(*schema)) { |
1794 | 0 | return STATUS_FORMAT( |
1795 | 0 | InvalidArgument, |
1796 | 0 | "Schema keys cannot be altered. New schema key: $0. Existing schema key: $1", |
1797 | 0 | schema->CreateKeyProjection(), |
1798 | 0 | key_schema); |
1799 | 0 | } |
1800 | | |
1801 | 50.7k | if (!schema->has_column_ids()) { |
1802 | | // this probably means that the request is not from the Master |
1803 | 0 | return STATUS(InvalidArgument, "Missing Column IDs"); |
1804 | 0 | } |
1805 | 547k | } |
1806 | | |
1807 | 547k | operation->set_schema(schema); |
1808 | 547k | return Status::OK(); |
1809 | 547k | } |
1810 | | |
1811 | 489k | Status Tablet::AddTable(const TableInfoPB& table_info) { |
1812 | 489k | Schema schema; |
1813 | 489k | RETURN_NOT_OK(SchemaFromPB(table_info.schema(), &schema)); |
1814 | | |
1815 | 489k | PartitionSchema partition_schema; |
1816 | 489k | RETURN_NOT_OK(PartitionSchema::FromPB(table_info.partition_schema(), schema, &partition_schema)); |
1817 | | |
1818 | 489k | metadata_->AddTable( |
1819 | 489k | table_info.table_id(), table_info.namespace_name(), table_info.table_name(), |
1820 | 489k | table_info.table_type(), schema, IndexMap(), partition_schema, boost::none, |
1821 | 489k | table_info.schema_version()); |
1822 | | |
1823 | 489k | RETURN_NOT_OK(metadata_->Flush()); |
1824 | | |
1825 | 489k | return Status::OK(); |
1826 | 489k | } |
1827 | | |
1828 | 45 | Status Tablet::RemoveTable(const std::string& table_id) { |
1829 | 45 | metadata_->RemoveTable(table_id); |
1830 | 45 | RETURN_NOT_OK(metadata_->Flush()); |
1831 | 45 | return Status::OK(); |
1832 | 45 | } |
1833 | | |
1834 | 7.30k | Status Tablet::MarkBackfillDone(const TableId& table_id) { |
1835 | 7.30k | auto table_info = table_id.empty() ? |
1836 | 7.30k | metadata_->primary_table_info() : VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1837 | 7.30k | LOG_WITH_PREFIX(INFO) << "Setting backfill as done. Current schema " |
1838 | 7.30k | << table_info->schema->ToString(); |
1839 | 7.30k | const vector<DeletedColumn> empty_deleted_cols; |
1840 | 7.30k | Schema new_schema = *table_info->schema; |
1841 | 7.30k | new_schema.SetRetainDeleteMarkers(false); |
1842 | 7.30k | metadata_->SetSchema( |
1843 | 7.30k | new_schema, *table_info->index_map, empty_deleted_cols, table_info->schema_version, table_id); |
1844 | 7.30k | return metadata_->Flush(); |
1845 | 7.30k | } |
1846 | | |
1847 | 48.2k | Status Tablet::AlterSchema(ChangeMetadataOperation *operation) { |
1848 | 48.2k | auto current_table_info = VERIFY_RESULT(metadata_->GetTableInfo( |
1849 | 48.2k | operation->request()->has_alter_table_id() ? |
1850 | 48.2k | operation->request()->alter_table_id() : "")); |
1851 | 48.2k | auto key_schema = current_table_info->schema->CreateKeyProjection(); |
1852 | | |
1853 | 48.2k | RSTATUS_DCHECK_NE(operation->schema(), static_cast<void*>(nullptr), InvalidArgument, |
1854 | 48.2k | "Schema could not be null"); |
1855 | 48.2k | RSTATUS_DCHECK(key_schema.KeyEquals(*DCHECK_NOTNULL(operation->schema())), InvalidArgument, |
1856 | 48.2k | "Schema keys cannot be altered"); |
1857 | | |
1858 | | // Abortable read/write operations could be long and they shouldn't access metadata_ without |
1859 | | // locks, so no need to wait for them here. |
1860 | 48.2k | auto op_pause = PauseReadWriteOperations(Abortable::kFalse); |
1861 | 48.2k | RETURN_NOT_OK(op_pause); |
1862 | | |
1863 | | // If the current version >= new version, there is nothing to do. |
1864 | 48.2k | if (current_table_info->schema_version >= operation->schema_version()) { |
1865 | 1.64k | LOG_WITH_PREFIX(INFO) |
1866 | 1.64k | << "Already running schema version " << current_table_info->schema_version |
1867 | 1.64k | << " got alter request for version " << operation->schema_version(); |
1868 | 1.64k | return Status::OK(); |
1869 | 1.64k | } |
1870 | | |
1871 | 46.6k | LOG_WITH_PREFIX(INFO) << "Alter schema from " << current_table_info->schema->ToString() |
1872 | 46.6k | << " version " << current_table_info->schema_version |
1873 | 46.6k | << " to " << operation->schema()->ToString() |
1874 | 46.6k | << " version " << operation->schema_version(); |
1875 | | |
1876 | | // Find out which columns have been deleted in this schema change, and add them to metadata. |
1877 | 46.6k | vector<DeletedColumn> deleted_cols; |
1878 | 248k | for (const auto& col : current_table_info->schema->column_ids()) { |
1879 | 248k | if (operation->schema()->find_column_by_id(col) == Schema::kColumnNotFound) { |
1880 | 963 | deleted_cols.emplace_back(col, clock_->Now()); |
1881 | 963 | LOG_WITH_PREFIX(INFO) << "Column " << col << " recorded as deleted."; |
1882 | 963 | } |
1883 | 248k | } |
1884 | | |
1885 | 46.6k | metadata_->SetSchema(*operation->schema(), operation->index_map(), deleted_cols, |
1886 | 46.6k | operation->schema_version(), current_table_info->table_id); |
1887 | 46.6k | if (operation->has_new_table_name()) { |
1888 | 46.6k | metadata_->SetTableName(current_table_info->namespace_name, operation->new_table_name()); |
1889 | 46.6k | if (table_metrics_entity_) { |
1890 | 46.6k | table_metrics_entity_->SetAttribute("table_name", operation->new_table_name()); |
1891 | 46.6k | table_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name); |
1892 | 46.6k | } |
1893 | 46.7k | if (tablet_metrics_entity_) { |
1894 | 46.7k | tablet_metrics_entity_->SetAttribute("table_name", operation->new_table_name()); |
1895 | 46.7k | tablet_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name); |
1896 | 46.7k | } |
1897 | 46.6k | } |
1898 | | |
1899 | | // Clear old index table metadata cache. |
1900 | 46.6k | ResetYBMetaDataCache(); |
1901 | | |
1902 | | // Create transaction manager and index table metadata cache for secondary index update. |
1903 | 46.6k | if (!operation->index_map().empty()) { |
1904 | 39.7k | if (current_table_info->schema->table_properties().is_transactional() && |
1905 | 30.0k | !transaction_manager_) { |
1906 | 4.05k | transaction_manager_ = std::make_unique<client::TransactionManager>( |
1907 | 4.05k | client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_); |
1908 | 4.05k | } |
1909 | 39.7k | CreateNewYBMetaDataCache(); |
1910 | 39.7k | } |
1911 | | |
1912 | | // Flush the updated schema metadata to disk. |
1913 | 46.6k | return metadata_->Flush(); |
1914 | 46.6k | } |
1915 | | |
1916 | 2.55k | Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperation* operation) { |
1917 | 2.55k | if (operation->has_wal_retention_secs()) { |
1918 | 2.55k | LOG_WITH_PREFIX(INFO) << "Altering metadata wal_retention_secs from " |
1919 | 2.55k | << metadata_->wal_retention_secs() |
1920 | 2.55k | << " to " << operation->wal_retention_secs(); |
1921 | 2.55k | metadata_->set_wal_retention_secs(operation->wal_retention_secs()); |
1922 | | // Flush the updated schema metadata to disk. |
1923 | 2.55k | return metadata_->Flush(); |
1924 | 2.55k | } |
1925 | 1 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid ChangeMetadataOperation: $0", |
1926 | 1 | operation->ToString()); |
1927 | 1 | } |
1928 | | |
1929 | | namespace { |
1930 | | |
1931 | | Result<pgwrapper::PGConnPtr> ConnectToPostgres( |
1932 | | const HostPort& pgsql_proxy_bind_address, |
1933 | | const std::string& database_name, |
1934 | 235 | const uint64_t postgres_auth_key) { |
1935 | | // Construct connection string. Note that the plain password in the connection string will be |
1936 | | // sent over the wire, but since it only goes over a unix-domain socket, there should be no |
1937 | | // eavesdropping/tampering issues. |
1938 | 235 | std::string conn_str = Format( |
1939 | 235 | "user=$0 password=$1 host=$2 port=$3 dbname=$4", |
1940 | 235 | "postgres", |
1941 | 235 | postgres_auth_key, |
1942 | 235 | PgDeriveSocketDir(pgsql_proxy_bind_address.host()), |
1943 | 235 | pgsql_proxy_bind_address.port(), |
1944 | 235 | pgwrapper::PqEscapeLiteral(database_name)); |
1945 | 235 | std::string conn_str_for_log = Format( |
1946 | 235 | "user=$0 password=$1 host=$2 port=$3 dbname=$4", |
1947 | 235 | "postgres", |
1948 | 235 | "<REDACTED>", |
1949 | 235 | PgDeriveSocketDir(pgsql_proxy_bind_address.host()), |
1950 | 235 | pgsql_proxy_bind_address.port(), |
1951 | 235 | pgwrapper::PqEscapeLiteral(database_name)); |
1952 | 0 | VLOG(1) << __func__ << ": libpq connection string: " << conn_str_for_log; |
1953 | | |
1954 | | // Connect. |
1955 | 235 | pgwrapper::PGConnPtr conn(PQconnectdb(conn_str.c_str())); |
1956 | 235 | if (!conn) { |
1957 | 0 | return STATUS(IllegalState, "backfill failed to connect to DB"); |
1958 | 0 | } |
1959 | 235 | if (PQstatus(conn.get()) == CONNECTION_BAD) { |
1960 | 0 | std::string msg(PQerrorMessage(conn.get())); |
1961 | | |
1962 | | // Avoid double newline (postgres adds a newline after the error message). |
1963 | 0 | if (msg.back() == '\n') { |
1964 | 0 | msg.resize(msg.size() - 1); |
1965 | 0 | } |
1966 | 0 | LOG(WARNING) << "libpq connection \"" << conn_str_for_log << "\" failed: " << msg; |
1967 | 0 | return STATUS_FORMAT(IllegalState, "backfill connection to DB failed: $0", msg); |
1968 | 0 | } |
1969 | 235 | return conn; |
1970 | 235 | } |
1971 | | |
1972 | 257 | string GenerateSerializedBackfillSpec(size_t batch_size, const string& next_row_to_backfill) { |
1973 | 257 | PgsqlBackfillSpecPB backfill_spec; |
1974 | 257 | std::string serialized_backfill_spec; |
1975 | | // Note that although we set the desired batch_size as the limit, postgres |
1976 | | // has its own internal paging size of 1024 (controlled by --ysql_prefetch_limit). So the actual |
1977 | | // rows processed could be larger than the limit set here; unless it happens |
1978 | | // to be a multiple of FLAGS_ysql_prefetch_limit |
1979 | 257 | backfill_spec.set_limit(batch_size); |
1980 | 257 | backfill_spec.set_next_row_key(next_row_to_backfill); |
1981 | 257 | backfill_spec.SerializeToString(&serialized_backfill_spec); |
1982 | 0 | VLOG(2) << "Generating backfill_spec " << yb::ToString(backfill_spec) << " encoded as " |
1983 | 0 | << b2a_hex(serialized_backfill_spec) << " a string of length " |
1984 | 0 | << serialized_backfill_spec.length(); |
1985 | 257 | return serialized_backfill_spec; |
1986 | 257 | } |
1987 | | |
1988 | | Result<PgsqlBackfillSpecPB> QueryPostgresToDoBackfill( |
1989 | 257 | const pgwrapper::PGConnPtr& conn, const string& query_str) { |
1990 | | // Execute. |
1991 | 257 | pgwrapper::PGResultPtr res(PQexec(conn.get(), query_str.c_str())); |
1992 | 257 | if (!res) { |
1993 | 0 | std::string msg(PQerrorMessage(conn.get())); |
1994 | | |
1995 | | // Avoid double newline (postgres adds a newline after the error message). |
1996 | 0 | if (msg.back() == '\n') { |
1997 | 0 | msg.resize(msg.size() - 1); |
1998 | 0 | } |
1999 | 0 | LOG(WARNING) << "libpq query \"" << query_str << "\" was not sent: " << msg; |
2000 | 0 | return STATUS_FORMAT(IllegalState, "backfill query couldn't be sent: $0", msg); |
2001 | 0 | } |
2002 | | |
2003 | 257 | ExecStatusType status = PQresultStatus(res.get()); |
2004 | | // TODO(jason): more properly handle bad statuses |
2005 | 257 | if (status != PGRES_TUPLES_OK) { |
2006 | 1 | std::string msg(PQresultErrorMessage(res.get())); |
2007 | | |
2008 | | // Avoid double newline (postgres adds a newline after the error message). |
2009 | 1 | if (msg.back() == '\n') { |
2010 | 1 | msg.resize(msg.size() - 1); |
2011 | 1 | } |
2012 | 1 | LOG(WARNING) << "libpq query \"" << query_str << "\" returned " << PQresStatus(status) << ": " |
2013 | 1 | << msg; |
2014 | 1 | return STATUS(IllegalState, msg); |
2015 | 1 | } |
2016 | | |
2017 | 256 | CHECK_EQ(PQntuples(res.get()), 1); |
2018 | 256 | CHECK_EQ(PQnfields(res.get()), 1); |
2019 | 256 | const std::string returned_spec = CHECK_RESULT(pgwrapper::GetString(res.get(), 0, 0)); |
2020 | 0 | VLOG(3) << "Got back " << returned_spec << " of length " << returned_spec.length(); |
2021 | | |
2022 | 256 | PgsqlBackfillSpecPB spec; |
2023 | 256 | spec.ParseFromString(a2b_hex(returned_spec)); |
2024 | 256 | return spec; |
2025 | 256 | } |
2026 | | |
2027 | | struct BackfillParams { |
2028 | | explicit BackfillParams(const CoarseTimePoint deadline) |
2029 | | : start_time(CoarseMonoClock::Now()), |
2030 | | deadline(deadline), |
2031 | | rate_per_sec(GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec)), |
2032 | 2.61k | batch_size(GetAtomicFlag(&FLAGS_backfill_index_write_batch_size)) { |
2033 | 2.61k | auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms); |
2034 | 2.61k | if (grace_margin_ms < 0) { |
2035 | | // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec; |
2036 | | // By default, we will set it to twice the minimum value + 1s. |
2037 | 2.61k | grace_margin_ms = (rate_per_sec > 0 ? 1000 * (1 + 2.0 * batch_size / rate_per_sec) : 1000); |
2038 | 2.61k | YB_LOG_EVERY_N_SECS(INFO, 10) |
2039 | 713 | << "Using grace margin of " << grace_margin_ms << "ms, original deadline: " |
2040 | 713 | << MonoDelta(deadline - start_time); |
2041 | 2.61k | } |
2042 | 2.61k | modified_deadline = deadline - grace_margin_ms * 1ms; |
2043 | 2.61k | } |
2044 | | |
2045 | | CoarseTimePoint start_time; |
2046 | | CoarseTimePoint deadline; |
2047 | | size_t rate_per_sec; |
2048 | | size_t batch_size; |
2049 | | CoarseTimePoint modified_deadline; |
2050 | | }; |
2051 | | |
2052 | | // Slow down before the next batch to throttle the rate of processing. |
2053 | | void MaybeSleepToThrottleBackfill( |
2054 | | const CoarseTimePoint& start_time, |
2055 | 3.06k | size_t number_of_rows_processed) { |
2056 | 3.06k | if (FLAGS_backfill_index_rate_rows_per_sec <= 0) { |
2057 | 3.00k | return; |
2058 | 3.00k | } |
2059 | | |
2060 | 61 | auto now = CoarseMonoClock::Now(); |
2061 | 61 | auto duration_for_rows_processed = MonoDelta(now - start_time); |
2062 | 61 | auto expected_time_for_processing_rows = MonoDelta::FromMilliseconds( |
2063 | 61 | number_of_rows_processed * 1000 / FLAGS_backfill_index_rate_rows_per_sec); |
2064 | 7 | DVLOG(3) << "Duration since last batch " << duration_for_rows_processed << " expected duration " |
2065 | 7 | << expected_time_for_processing_rows << " extra time to sleep: " |
2066 | 7 | << expected_time_for_processing_rows - duration_for_rows_processed; |
2067 | 61 | if (duration_for_rows_processed < expected_time_for_processing_rows) { |
2068 | 51 | SleepFor(expected_time_for_processing_rows - duration_for_rows_processed); |
2069 | 51 | } |
2070 | 61 | } |
2071 | | |
2072 | | bool CanProceedToBackfillMoreRows( |
2073 | | const BackfillParams& backfill_params, |
2074 | 2.89k | size_t number_of_rows_processed) { |
2075 | 2.89k | auto now = CoarseMonoClock::Now(); |
2076 | 2.89k | if (now > backfill_params.modified_deadline || |
2077 | 2.89k | (FLAGS_TEST_backfill_paging_size > 0 && |
2078 | 296 | number_of_rows_processed >= FLAGS_TEST_backfill_paging_size)) { |
2079 | | // We are done if we are out of time. |
2080 | | // Or, if for testing purposes we have a bound on the size of batches to process. |
2081 | 96 | return false; |
2082 | 96 | } |
2083 | 2.79k | return true; |
2084 | 2.79k | } |
2085 | | |
2086 | | bool CanProceedToBackfillMoreRows( |
2087 | | const BackfillParams& backfill_params, |
2088 | | const string& backfilled_until, |
2089 | 256 | size_t number_of_rows_processed) { |
2090 | 256 | if (backfilled_until.empty()) { |
2091 | | // The backfill is done for this tablet. No need to do another batch. |
2092 | 234 | return false; |
2093 | 234 | } |
2094 | | |
2095 | 22 | return CanProceedToBackfillMoreRows(backfill_params, number_of_rows_processed); |
2096 | 22 | } |
2097 | | |
2098 | | } // namespace |
2099 | | |
2100 | | // Assume that we are already in the Backfilling mode. |
2101 | | Status Tablet::BackfillIndexesForYsql( |
2102 | | const std::vector<IndexInfo>& indexes, |
2103 | | const std::string& backfill_from, |
2104 | | const CoarseTimePoint deadline, |
2105 | | const HybridTime read_time, |
2106 | | const HostPort& pgsql_proxy_bind_address, |
2107 | | const std::string& database_name, |
2108 | | const uint64_t postgres_auth_key, |
2109 | | size_t* number_of_rows_processed, |
2110 | 235 | std::string* backfilled_until) { |
2111 | 235 | if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { |
2112 | 0 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); |
2113 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); |
2114 | 0 | } |
2115 | 235 | LOG(INFO) << "Begin " << __func__ << " at " << read_time << " from " |
2116 | 235 | << (backfill_from.empty() ? "<start-of-the-tablet>" : strings::b2a_hex(backfill_from)) |
2117 | 235 | << " for " << AsString(indexes); |
2118 | 235 | *backfilled_until = backfill_from; |
2119 | 235 | pgwrapper::PGConnPtr conn = |
2120 | 235 | VERIFY_RESULT(ConnectToPostgres(pgsql_proxy_bind_address, database_name, postgres_auth_key)); |
2121 | | |
2122 | | // Construct query string. |
2123 | 235 | std::string index_oids; |
2124 | 235 | { |
2125 | 235 | std::stringstream ss; |
2126 | 235 | for (auto& index : indexes) { |
2127 | | // Cannot use Oid type because for large OID such as 2147500041, it overflows Postgres |
2128 | | // lexer <ival> type. Use int to output as -2147467255 that is accepted by <ival>. |
2129 | 235 | int index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id())); |
2130 | 235 | ss << index_oid << ","; |
2131 | 235 | } |
2132 | 235 | index_oids = ss.str(); |
2133 | 235 | index_oids.pop_back(); |
2134 | 235 | } |
2135 | 235 | std::string partition_key = metadata_->partition()->partition_key_start(); |
2136 | | |
2137 | 235 | BackfillParams backfill_params(deadline); |
2138 | 235 | *number_of_rows_processed = 0; |
2139 | 257 | do { |
2140 | 257 | std::string serialized_backfill_spec = |
2141 | 257 | GenerateSerializedBackfillSpec(backfill_params.batch_size, *backfilled_until); |
2142 | | |
2143 | | // This should be safe from injection attacks because the parameters only consist of characters |
2144 | | // [-,0-9a-f]. |
2145 | 257 | std::string query_str = Format( |
2146 | 257 | "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';", |
2147 | 257 | index_oids, |
2148 | 257 | b2a_hex(serialized_backfill_spec), |
2149 | 257 | read_time.ToUint64(), |
2150 | 257 | b2a_hex(partition_key)); |
2151 | 0 | VLOG(1) << __func__ << ": libpq query string: " << query_str; |
2152 | | |
2153 | 256 | PgsqlBackfillSpecPB spec = VERIFY_RESULT(QueryPostgresToDoBackfill(conn, query_str)); |
2154 | 256 | *number_of_rows_processed += spec.count(); |
2155 | 256 | *backfilled_until = spec.next_row_key(); |
2156 | | |
2157 | 0 | VLOG(2) << "Backfilled " << *number_of_rows_processed << " rows. " |
2158 | 0 | << "Setting backfilled_until to " << b2a_hex(*backfilled_until) << " of length " |
2159 | 0 | << backfilled_until->length(); |
2160 | | |
2161 | 256 | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2162 | 256 | } while (CanProceedToBackfillMoreRows( |
2163 | 256 | backfill_params, *backfilled_until, *number_of_rows_processed)); |
2164 | | |
2165 | 0 | VLOG(1) << "Backfilled " << *number_of_rows_processed << " rows. " |
2166 | 0 | << "Set backfilled_until to " |
2167 | 0 | << (backfilled_until->empty() ? "(empty)" : b2a_hex(*backfilled_until)); |
2168 | 234 | return Status::OK(); |
2169 | 235 | } |
2170 | | |
2171 | | std::vector<yb::ColumnSchema> Tablet::GetColumnSchemasForIndex( |
2172 | 2.38k | const std::vector<IndexInfo>& indexes) { |
2173 | 2.38k | std::unordered_set<yb::ColumnId> col_ids_set; |
2174 | 2.38k | std::vector<yb::ColumnSchema> columns; |
2175 | | |
2176 | 12.2k | for (auto idx : schema()->column_ids()) { |
2177 | 12.2k | if (schema()->is_key_column(idx)) { |
2178 | 6.54k | col_ids_set.insert(idx); |
2179 | 6.54k | auto res = schema()->column_by_id(idx); |
2180 | 6.54k | if (res) { |
2181 | 6.53k | columns.push_back(*res); |
2182 | 10 | } else { |
2183 | 10 | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " |
2184 | 10 | << idx; |
2185 | 10 | } |
2186 | 6.54k | } |
2187 | 12.2k | } |
2188 | 2.42k | for (const IndexInfo& idx : indexes) { |
2189 | 9.73k | for (const auto& idx_col : idx.columns()) { |
2190 | 9.73k | if (col_ids_set.find(idx_col.indexed_column_id) == col_ids_set.end()) { |
2191 | 3.15k | col_ids_set.insert(idx_col.indexed_column_id); |
2192 | 3.15k | auto res = schema()->column_by_id(idx_col.indexed_column_id); |
2193 | 3.16k | if (res) { |
2194 | 3.16k | columns.push_back(*res); |
2195 | 18.4E | } else { |
2196 | 18.4E | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " |
2197 | 18.4E | << idx_col.indexed_column_id; |
2198 | 18.4E | } |
2199 | 3.15k | } |
2200 | 9.73k | } |
2201 | 2.42k | if (idx.where_predicate_spec()) { |
2202 | 1.32k | for (const auto col_in_pred : idx.where_predicate_spec()->column_ids()) { |
2203 | 1.32k | ColumnId col_id_in_pred(col_in_pred); |
2204 | 1.32k | if (col_ids_set.find(col_id_in_pred) == col_ids_set.end()) { |
2205 | 152 | col_ids_set.insert(col_id_in_pred); |
2206 | 152 | auto res = schema()->column_by_id(col_id_in_pred); |
2207 | 153 | if (res) { |
2208 | 153 | columns.push_back(*res); |
2209 | 18.4E | } else { |
2210 | 18.4E | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " << |
2211 | 18.4E | col_id_in_pred; |
2212 | 18.4E | } |
2213 | 152 | } |
2214 | 1.32k | } |
2215 | 1.09k | } |
2216 | 2.42k | } |
2217 | 2.38k | return columns; |
2218 | 2.38k | } |
2219 | | |
2220 | | namespace { |
2221 | | |
2222 | 2.38k | std::vector<TableId> GetIndexIds(const std::vector<IndexInfo>& indexes) { |
2223 | 2.38k | std::vector<TableId> index_ids; |
2224 | 2.40k | for (const IndexInfo& idx : indexes) { |
2225 | 2.40k | index_ids.push_back(idx.table_id()); |
2226 | 2.40k | } |
2227 | 2.38k | return index_ids; |
2228 | 2.38k | } |
2229 | | |
2230 | | template <typename SomeVector> |
2231 | | void SleepToThrottleRate( |
2232 | 0 | SomeVector* index_requests, int32 row_access_rate_per_sec, CoarseTimePoint* last_flushed_at) { |
2233 | 0 | auto now = CoarseMonoClock::Now(); |
2234 | 0 | if (row_access_rate_per_sec > 0) { |
2235 | 0 | auto duration_since_last_batch = MonoDelta(now - *last_flushed_at); |
2236 | 0 | auto expected_duration_ms = |
2237 | 0 | MonoDelta::FromMilliseconds(index_requests->size() * 1000 / row_access_rate_per_sec); |
2238 | 0 | DVLOG(3) << "Duration since last batch " << duration_since_last_batch << " expected duration " |
2239 | 0 | << expected_duration_ms |
2240 | 0 | << " extra time so sleep: " << expected_duration_ms - duration_since_last_batch; |
2241 | 0 | if (duration_since_last_batch < expected_duration_ms) { |
2242 | 0 | SleepFor(expected_duration_ms - duration_since_last_batch); |
2243 | 0 | } |
2244 | 0 | } |
2245 | 0 | } |
2246 | | |
2247 | | Result<client::YBTablePtr> GetTable( |
2248 | 9.54k | const TableId& table_id, const std::shared_ptr<client::YBMetaDataCache>& metadata_cache) { |
2249 | | // TODO create async version of GetTable. |
2250 | | // It is ok to have sync call here, because we use cache and it should not take too long. |
2251 | 9.54k | client::YBTablePtr index_table; |
2252 | 9.54k | bool cache_used_ignored = false; |
2253 | 9.54k | RETURN_NOT_OK(metadata_cache->GetTable(table_id, &index_table, &cache_used_ignored)); |
2254 | 9.54k | return index_table; |
2255 | 9.54k | } |
2256 | | |
2257 | | } // namespace |
2258 | | |
2259 | | // Should backfill the index with the information contained in this tablet. |
2260 | | // Assume that we are already in the Backfilling mode. |
2261 | | Status Tablet::BackfillIndexes( |
2262 | | const std::vector<IndexInfo>& indexes, |
2263 | | const std::string& backfill_from, |
2264 | | const CoarseTimePoint deadline, |
2265 | | const HybridTime read_time, |
2266 | | size_t* number_of_rows_processed, |
2267 | | std::string* backfilled_until, |
2268 | 2.38k | std::unordered_set<TableId>* failed_indexes) { |
2269 | 2.38k | TRACE(__func__); |
2270 | 2.38k | if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { |
2271 | 155 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); |
2272 | 155 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); |
2273 | 155 | } |
2274 | 4 | VLOG(2) << "Begin BackfillIndexes at " << read_time << " for " << AsString(indexes); |
2275 | | |
2276 | 2.38k | std::vector<TableId> index_ids = GetIndexIds(indexes); |
2277 | 2.38k | std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes); |
2278 | | |
2279 | 2.38k | Schema projection(columns, {}, schema()->num_key_columns()); |
2280 | 2.38k | auto iter = VERIFY_RESULT(NewRowIterator( |
2281 | 2.38k | projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline)); |
2282 | 2.38k | QLTableRow row; |
2283 | 2.38k | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>> index_requests; |
2284 | | |
2285 | 2.38k | BackfillParams backfill_params{deadline}; |
2286 | 2.38k | constexpr auto kProgressInterval = 1000; |
2287 | | |
2288 | 2.38k | if (!backfill_from.empty()) { |
2289 | 0 | VLOG(1) << "Resuming backfill from " << b2a_hex(backfill_from); |
2290 | 97 | *backfilled_until = backfill_from; |
2291 | 97 | RETURN_NOT_OK(iter->SeekTuple(Slice(backfill_from))); |
2292 | 97 | } |
2293 | | |
2294 | 2.38k | string resume_backfill_from; |
2295 | 2.38k | *number_of_rows_processed = 0; |
2296 | 2.38k | int TEST_number_rows_corrupted = 0; |
2297 | 2.38k | int TEST_number_rows_dropped = 0; |
2298 | | |
2299 | 2.86k | while (VERIFY_RESULT(iter->HasNext())) { |
2300 | 2.86k | if (index_requests.empty()) { |
2301 | 427 | *backfilled_until = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2302 | 427 | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2303 | 427 | } |
2304 | | |
2305 | 2.86k | if (!CanProceedToBackfillMoreRows(backfill_params, *number_of_rows_processed)) { |
2306 | 96 | resume_backfill_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2307 | 96 | break; |
2308 | 2.77k | } |
2309 | | |
2310 | 2.77k | RETURN_NOT_OK(iter->NextRow(&row)); |
2311 | 2.77k | if (FLAGS_TEST_backfill_sabotage_frequency > 0 && |
2312 | 0 | *number_of_rows_processed % FLAGS_TEST_backfill_sabotage_frequency == 0) { |
2313 | 0 | VLOG(1) << "Corrupting fetched row: " << row.ToString(); |
2314 | | // Corrupt first key column, since index should not be built on primary key |
2315 | 0 | row.MarkTombstoned(schema()->column_id(0)); |
2316 | 0 | TEST_number_rows_corrupted++; |
2317 | 0 | } |
2318 | | |
2319 | 2.77k | if (FLAGS_TEST_backfill_drop_frequency > 0 && |
2320 | 0 | *number_of_rows_processed % FLAGS_TEST_backfill_drop_frequency == 0) { |
2321 | 0 | (*number_of_rows_processed)++; |
2322 | 0 | VLOG(1) << "Dropping fetched row: " << row.ToString(); |
2323 | 0 | TEST_number_rows_dropped++; |
2324 | 0 | continue; |
2325 | 0 | } |
2326 | | |
2327 | 0 | DVLOG(2) << "Building index for fetched row: " << row.ToString(); |
2328 | 2.77k | RETURN_NOT_OK(UpdateIndexInBatches( |
2329 | 2.77k | row, indexes, read_time, backfill_params.deadline, &index_requests, |
2330 | 2.77k | failed_indexes)); |
2331 | | |
2332 | 2.77k | if (++(*number_of_rows_processed) % kProgressInterval == 0) { |
2333 | 0 | VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; |
2334 | 0 | } |
2335 | 2.77k | } |
2336 | | |
2337 | 2.38k | if (FLAGS_TEST_backfill_sabotage_frequency > 0) { |
2338 | 0 | LOG(INFO) << "In total, " << TEST_number_rows_corrupted |
2339 | 0 | << " rows were corrupted in index backfill."; |
2340 | 0 | } |
2341 | | |
2342 | 2.38k | if (FLAGS_TEST_backfill_drop_frequency > 0) { |
2343 | 0 | LOG(INFO) << "In total, " << TEST_number_rows_dropped |
2344 | 0 | << " rows were dropped in index backfill."; |
2345 | 0 | } |
2346 | | |
2347 | 5 | VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; |
2348 | 2.38k | RETURN_NOT_OK(FlushWriteIndexBatch( |
2349 | 2.38k | read_time, backfill_params.deadline, &index_requests, failed_indexes)); |
2350 | 2.38k | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2351 | 2.38k | *backfilled_until = resume_backfill_from; |
2352 | 2.38k | LOG(INFO) << "Done BackfillIndexes at " << read_time << " for " << AsString(index_ids) |
2353 | 2.38k | << " until " |
2354 | 2.28k | << (backfilled_until->empty() ? "<end of the tablet>" : b2a_hex(*backfilled_until)); |
2355 | 2.38k | return Status::OK(); |
2356 | 2.38k | } |
2357 | | |
2358 | | Status Tablet::UpdateIndexInBatches( |
2359 | | const QLTableRow& row, |
2360 | | const std::vector<IndexInfo>& indexes, |
2361 | | const HybridTime write_time, |
2362 | | const CoarseTimePoint deadline, |
2363 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2364 | 2.77k | std::unordered_set<TableId>* failed_indexes) { |
2365 | 2.77k | const QLTableRow& kEmptyRow = QLTableRow::empty_row(); |
2366 | 2.77k | QLExprExecutor expr_executor; |
2367 | | |
2368 | 9.59k | for (const IndexInfo& index : indexes) { |
2369 | 9.59k | QLWriteRequestPB* const index_request = VERIFY_RESULT( |
2370 | 9.59k | docdb::CreateAndSetupIndexInsertRequest( |
2371 | 9.59k | &expr_executor, /* index_has_write_permission */ true, |
2372 | 9.59k | kEmptyRow, row, &index, index_requests)); |
2373 | 9.59k | if (index_request) |
2374 | 9.54k | index_request->set_is_backfill(true); |
2375 | 9.59k | } |
2376 | | |
2377 | | // Update the index write op. |
2378 | 2.77k | return FlushWriteIndexBatchIfRequired(write_time, deadline, index_requests, failed_indexes); |
2379 | 2.77k | } |
2380 | | |
2381 | | Result<std::shared_ptr<YBSession>> Tablet::GetSessionForVerifyOrBackfill( |
2382 | 2.49k | const CoarseTimePoint deadline) { |
2383 | 2.49k | if (!client_future_.valid()) { |
2384 | 0 | return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id()); |
2385 | 0 | } |
2386 | | |
2387 | 2.49k | auto client = client_future_.get(); |
2388 | 2.49k | auto session = std::make_shared<YBSession>(client); |
2389 | 2.49k | session->SetDeadline(deadline); |
2390 | 2.49k | return session; |
2391 | 2.49k | } |
2392 | | |
2393 | | Status Tablet::FlushWriteIndexBatchIfRequired( |
2394 | | const HybridTime write_time, |
2395 | | const CoarseTimePoint deadline, |
2396 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2397 | 2.77k | std::unordered_set<TableId>* failed_indexes) { |
2398 | 2.77k | if (index_requests->size() < FLAGS_backfill_index_write_batch_size) { |
2399 | 2.66k | return Status::OK(); |
2400 | 2.66k | } |
2401 | 109 | return FlushWriteIndexBatch(write_time, deadline, index_requests, failed_indexes); |
2402 | 109 | } |
2403 | | |
2404 | | Status Tablet::FlushWriteIndexBatch( |
2405 | | const HybridTime write_time, |
2406 | | const CoarseTimePoint deadline, |
2407 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2408 | 2.49k | std::unordered_set<TableId>* failed_indexes) { |
2409 | 2.49k | if (!client_future_.valid()) { |
2410 | 0 | return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id()); |
2411 | 2.49k | } else if (!YBMetaDataCache()) { |
2412 | 0 | return STATUS(IllegalState, "Table metadata cache is not present for index update"); |
2413 | 0 | } |
2414 | 2.49k | std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline)); |
2415 | | |
2416 | 2.49k | std::unordered_set< |
2417 | 2.49k | client::YBqlWriteOpPtr, client::YBqlWritePrimaryKeyComparator, |
2418 | 2.49k | client::YBqlWritePrimaryKeyComparator> |
2419 | 2.49k | ops_by_primary_key; |
2420 | 2.49k | std::vector<shared_ptr<client::YBqlWriteOp>> write_ops; |
2421 | | |
2422 | 2.49k | constexpr int kMaxNumRetries = 10; |
2423 | 2.49k | auto metadata_cache = YBMetaDataCache(); |
2424 | | |
2425 | 9.54k | for (auto& pair : *index_requests) { |
2426 | 9.54k | client::YBTablePtr index_table = |
2427 | 9.54k | VERIFY_RESULT(GetTable(pair.first->table_id(), metadata_cache)); |
2428 | | |
2429 | 9.54k | shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite()); |
2430 | 9.54k | index_op->set_write_time_for_backfill(write_time); |
2431 | 9.54k | index_op->mutable_request()->Swap(&pair.second); |
2432 | 9.54k | if (index_table->IsUniqueIndex()) { |
2433 | 545 | if (ops_by_primary_key.count(index_op) > 0) { |
2434 | 0 | VLOG(2) << "Splitting the batch of writes because " << index_op->ToString() |
2435 | 0 | << " collides with an existing update in this batch."; |
2436 | 0 | VLOG(1) << "Flushing " << ops_by_primary_key.size() << " ops to the index"; |
2437 | 4 | RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes)); |
2438 | 0 | VLOG(3) << "Done flushing ops to the index"; |
2439 | 3 | ops_by_primary_key.clear(); |
2440 | 3 | } |
2441 | 544 | ops_by_primary_key.insert(index_op); |
2442 | 544 | } |
2443 | 9.54k | session->Apply(index_op); |
2444 | 9.54k | write_ops.push_back(index_op); |
2445 | 9.54k | } |
2446 | | |
2447 | 4 | VLOG(1) << Format("Flushing $0 ops to the index", |
2448 | 0 | (!ops_by_primary_key.empty() ? ops_by_primary_key.size() |
2449 | 4 | : write_ops.size())); |
2450 | 2.49k | RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes)); |
2451 | 2.49k | index_requests->clear(); |
2452 | | |
2453 | 2.49k | return Status::OK(); |
2454 | 2.49k | } |
2455 | | |
2456 | | template <typename SomeYBqlOp> |
2457 | | Status Tablet::FlushWithRetries( |
2458 | | shared_ptr<YBSession> session, |
2459 | | const std::vector<shared_ptr<SomeYBqlOp>>& index_ops, |
2460 | | int num_retries, |
2461 | 2.48k | std::unordered_set<TableId>* failed_indexes) { |
2462 | 2.48k | auto retries_left = num_retries; |
2463 | 2.48k | std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops; |
2464 | 2.48k | std::unordered_map<string, int32_t> error_msg_cnts; |
2465 | 2.48k | do { |
2466 | 2.48k | std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops; |
2467 | 2.48k | RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed."); |
2468 | 18.4E | VLOG(3) << "Done flushing ops to the index"; |
2469 | 9.55k | for (auto index_op : pending_ops) { |
2470 | 9.55k | if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) { |
2471 | 9.54k | continue; |
2472 | 9.54k | } |
2473 | | |
2474 | 0 | VLOG(2) << "Got response " << AsString(index_op->response()) << " for " |
2475 | 0 | << AsString(index_op->request()); |
2476 | 5 | if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { |
2477 | 5 | failed_indexes->insert(index_op->table()->id()); |
2478 | 5 | const string& error_message = index_op->response().error_message(); |
2479 | 5 | error_msg_cnts[error_message]++; |
2480 | 0 | VLOG_WITH_PREFIX(3) << "Failing index " << index_op->table()->id() |
2481 | 0 | << " due to non-retryable errors " << error_message; |
2482 | 5 | continue; |
2483 | 5 | } |
2484 | | |
2485 | 0 | failed_ops.push_back(index_op); |
2486 | 0 | session->Apply(index_op); |
2487 | 0 | } |
2488 | | |
2489 | 2.48k | if (!failed_ops.empty()) { |
2490 | 0 | VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size()); |
2491 | 0 | } |
2492 | 2.48k | pending_ops = std::move(failed_ops); |
2493 | 2.48k | } while (!pending_ops.empty() && --retries_left > 0); |
2494 | | |
2495 | 2.48k | if (!failed_indexes->empty()) { |
2496 | 0 | VLOG_WITH_PREFIX(1) << "Failed due to non-retryable errors " << AsString(*failed_indexes); |
2497 | 5 | } |
2498 | 2.48k | if (!pending_ops.empty()) { |
2499 | 0 | for (auto index_op : pending_ops) { |
2500 | 0 | failed_indexes->insert(index_op->table()->id()); |
2501 | 0 | const string& error_message = index_op->response().error_message(); |
2502 | 0 | error_msg_cnts[error_message]++; |
2503 | 0 | } |
2504 | 0 | VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are " |
2505 | 0 | << AsString(*failed_indexes); |
2506 | 0 | } |
2507 | 2.48k | return ( |
2508 | 2.48k | failed_indexes->empty() |
2509 | 2.49k | ? Status::OK() |
2510 | 18.4E | : STATUS_SUBSTITUTE( |
2511 | 2.48k | IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2", |
2512 | 2.48k | pending_ops.size(), num_retries, AsString(error_msg_cnts))); |
2513 | 2.48k | } _ZN2yb6tablet6Tablet16FlushWithRetriesINS_6client11YBqlWriteOpEEENS_6StatusENSt3__110shared_ptrINS3_9YBSessionEEERKNS6_6vectorINS7_IT_EENS6_9allocatorISC_EEEEiPNS6_13unordered_setINS6_12basic_stringIcNS6_11char_traitsIcEENSD_IcEEEENS6_4hashISN_EENS6_8equal_toISN_EENSD_ISN_EEEE Line | Count | Source | 2461 | 2.48k | std::unordered_set<TableId>* failed_indexes) { | 2462 | 2.48k | auto retries_left = num_retries; | 2463 | 2.48k | std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops; | 2464 | 2.48k | std::unordered_map<string, int32_t> error_msg_cnts; | 2465 | 2.48k | do { | 2466 | 2.48k | std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops; | 2467 | 2.48k | RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed."); | 2468 | 18.4E | VLOG(3) << "Done flushing ops to the index"; | 2469 | 9.55k | for (auto index_op : pending_ops) { | 2470 | 9.55k | if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) { | 2471 | 9.54k | continue; | 2472 | 9.54k | } | 2473 | | | 2474 | 0 | VLOG(2) << "Got response " << AsString(index_op->response()) << " for " | 2475 | 0 | << AsString(index_op->request()); | 2476 | 5 | if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { | 2477 | 5 | failed_indexes->insert(index_op->table()->id()); | 2478 | 5 | const string& error_message = index_op->response().error_message(); | 2479 | 5 | error_msg_cnts[error_message]++; | 2480 | 0 | VLOG_WITH_PREFIX(3) << "Failing index " << index_op->table()->id() | 2481 | 0 | << " due to non-retryable errors " << error_message; | 2482 | 5 | continue; | 2483 | 5 | } | 2484 | | | 2485 | 0 | failed_ops.push_back(index_op); | 2486 | 0 | session->Apply(index_op); | 2487 | 0 | } | 2488 | | | 2489 | 2.48k | if (!failed_ops.empty()) { | 2490 | 0 | VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size()); | 2491 | 0 | } | 2492 | 2.48k | pending_ops = std::move(failed_ops); | 2493 | 2.48k | } while (!pending_ops.empty() && 
--retries_left > 0); | 2494 | | | 2495 | 2.48k | if (!failed_indexes->empty()) { | 2496 | 0 | VLOG_WITH_PREFIX(1) << "Failed due to non-retryable errors " << AsString(*failed_indexes); | 2497 | 5 | } | 2498 | 2.48k | if (!pending_ops.empty()) { | 2499 | 0 | for (auto index_op : pending_ops) { | 2500 | 0 | failed_indexes->insert(index_op->table()->id()); | 2501 | 0 | const string& error_message = index_op->response().error_message(); | 2502 | 0 | error_msg_cnts[error_message]++; | 2503 | 0 | } | 2504 | 0 | VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are " | 2505 | 0 | << AsString(*failed_indexes); | 2506 | 0 | } | 2507 | 2.48k | return ( | 2508 | 2.48k | failed_indexes->empty() | 2509 | 2.49k | ? Status::OK() | 2510 | 18.4E | : STATUS_SUBSTITUTE( | 2511 | 2.48k | IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2", | 2512 | 2.48k | pending_ops.size(), num_retries, AsString(error_msg_cnts))); | 2513 | 2.48k | } |
Unexecuted instantiation: _ZN2yb6tablet6Tablet16FlushWithRetriesINS_6client10YBqlReadOpEEENS_6StatusENSt3__110shared_ptrINS3_9YBSessionEEERKNS6_6vectorINS7_IT_EENS6_9allocatorISC_EEEEiPNS6_13unordered_setINS6_12basic_stringIcNS6_11char_traitsIcEENSD_IcEEEENS6_4hashISN_EENS6_8equal_toISN_EENSD_ISN_EEEE |
2514 | | |
2515 | | Status Tablet::VerifyIndexTableConsistencyForCQL( |
2516 | | const std::vector<IndexInfo>& indexes, |
2517 | | const std::string& start_key, |
2518 | | const int num_rows, |
2519 | | const CoarseTimePoint deadline, |
2520 | | const HybridTime read_time, |
2521 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2522 | 0 | std::string* verified_until) { |
2523 | 0 | std::vector<TableId> index_ids = GetIndexIds(indexes); |
2524 | 0 | std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes); |
2525 | 0 | return VerifyTableConsistencyForCQL( |
2526 | 0 | index_ids, columns, start_key, num_rows, deadline, read_time, false, consistency_stats, |
2527 | 0 | verified_until); |
2528 | 0 | } |
2529 | | |
2530 | | Status Tablet::VerifyMainTableConsistencyForCQL( |
2531 | | const TableId& main_table_id, |
2532 | | const std::string& start_key, |
2533 | | const int num_rows, |
2534 | | const CoarseTimePoint deadline, |
2535 | | const HybridTime read_time, |
2536 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2537 | 0 | std::string* verified_until) { |
2538 | 0 | const std::vector<yb::ColumnSchema>& columns = schema()->columns(); |
2539 | 0 | const std::vector<TableId>& table_ids = {main_table_id}; |
2540 | 0 | return VerifyTableConsistencyForCQL( |
2541 | 0 | table_ids, columns, start_key, num_rows, deadline, read_time, true, consistency_stats, |
2542 | 0 | verified_until); |
2543 | 0 | } |
2544 | | |
2545 | | Status Tablet::VerifyTableConsistencyForCQL( |
2546 | | const std::vector<TableId>& table_ids, |
2547 | | const std::vector<yb::ColumnSchema>& columns, |
2548 | | const std::string& start_key, |
2549 | | const int num_rows, |
2550 | | const CoarseTimePoint deadline, |
2551 | | const HybridTime read_time, |
2552 | | const bool is_main_table, |
2553 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2554 | 0 | std::string* verified_until) { |
2555 | 0 | Schema projection(columns, {}, schema()->num_key_columns()); |
2556 | 0 | auto iter = VERIFY_RESULT(NewRowIterator( |
2557 | 0 | projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline)); |
2558 | |
|
2559 | 0 | if (!start_key.empty()) { |
2560 | 0 | VLOG(2) << "Starting verify index from " << b2a_hex(start_key); |
2561 | 0 | RETURN_NOT_OK(iter->SeekTuple(Slice(start_key))); |
2562 | 0 | } |
2563 | |
|
2564 | 0 | constexpr int kProgressInterval = 1000; |
2565 | 0 | CoarseTimePoint last_flushed_at; |
2566 | |
|
2567 | 0 | QLTableRow row; |
2568 | 0 | std::vector<std::pair<const TableId, QLReadRequestPB>> requests; |
2569 | 0 | std::unordered_set<TableId> failed_indexes; |
2570 | 0 | std::string resume_verified_from; |
2571 | |
|
2572 | 0 | int rows_verified = 0; |
2573 | 0 | while (VERIFY_RESULT(iter->HasNext()) && rows_verified < num_rows && |
2574 | 0 | CoarseMonoClock::Now() < deadline) { |
2575 | 0 | resume_verified_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2576 | 0 | RETURN_NOT_OK(iter->NextRow(&row)); |
2577 | 0 | VLOG(1) << "Verifying index for main table row: " << row.ToString(); |
2578 | |
|
2579 | 0 | RETURN_NOT_OK(VerifyTableInBatches( |
2580 | 0 | row, table_ids, read_time, deadline, is_main_table, &requests, &last_flushed_at, |
2581 | 0 | &failed_indexes, consistency_stats)); |
2582 | 0 | if (++rows_verified % kProgressInterval == 0) { |
2583 | 0 | VLOG(1) << "Verified " << rows_verified << " rows"; |
2584 | 0 | } |
2585 | 0 | *verified_until = resume_verified_from; |
2586 | 0 | } |
2587 | 0 | return FlushVerifyBatch( |
2588 | 0 | read_time, deadline, &requests, &last_flushed_at, &failed_indexes, consistency_stats); |
2589 | 0 | } |
2590 | | |
2591 | | namespace { |
2592 | | |
2593 | 0 | QLConditionPB* InitWhereOp(QLReadRequestPB* req) { |
2594 | | // Add the hash column values |
2595 | 0 | DCHECK(req->hashed_column_values().empty()); |
2596 | | |
2597 | | // Add the range column values to the where clause |
2598 | 0 | QLConditionPB* where_pb = req->mutable_where_expr()->mutable_condition(); |
2599 | 0 | if (!where_pb->has_op()) { |
2600 | 0 | where_pb->set_op(QL_OP_AND); |
2601 | 0 | } |
2602 | 0 | DCHECK_EQ(where_pb->op(), QL_OP_AND); |
2603 | 0 | return where_pb; |
2604 | 0 | } |
2605 | | |
2606 | 0 | void SetSelectedExprToTrue(QLReadRequestPB* req) { |
2607 | | // Set TRUE as selected exprs helps reduce |
2608 | | // the need for row retrieval in the index read request |
2609 | 0 | req->add_selected_exprs()->mutable_value()->set_bool_value(true); |
2610 | 0 | QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc(); |
2611 | 0 | QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs(); |
2612 | 0 | rscol_desc->set_name("1"); |
2613 | 0 | rscol_desc->mutable_ql_type()->set_main(yb::DataType::BOOL); |
2614 | 0 | } |
2615 | | |
2616 | | Status WhereMainTableToPB( |
2617 | | const QLTableRow& key, |
2618 | | const IndexInfo& index_info, |
2619 | | const Schema& main_table_schema, |
2620 | 0 | QLReadRequestPB* req) { |
2621 | 0 | std::unordered_map<ColumnId, ColumnId> column_id_map; |
2622 | 0 | for (const auto& col : index_info.columns()) { |
2623 | 0 | column_id_map.insert({col.indexed_column_id, col.column_id}); |
2624 | 0 | } |
2625 | |
|
2626 | 0 | auto column_refs = req->mutable_column_refs(); |
2627 | 0 | QLConditionPB* where_pb = InitWhereOp(req); |
2628 | |
|
2629 | 0 | for (const auto& col_id : main_table_schema.column_ids()) { |
2630 | 0 | if (main_table_schema.is_hash_key_column(col_id)) { |
2631 | 0 | *req->add_hashed_column_values()->mutable_value() = *key.GetValue(column_id_map[col_id]); |
2632 | 0 | column_refs->add_ids(col_id); |
2633 | 0 | } else { |
2634 | 0 | auto it = column_id_map.find(col_id); |
2635 | 0 | if (it != column_id_map.end()) { |
2636 | 0 | QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition(); |
2637 | 0 | col_cond_pb->set_op(QL_OP_EQUAL); |
2638 | 0 | col_cond_pb->add_operands()->set_column_id(col_id); |
2639 | 0 | *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(it->second); |
2640 | 0 | column_refs->add_ids(col_id); |
2641 | 0 | } |
2642 | 0 | } |
2643 | 0 | } |
2644 | |
|
2645 | 0 | SetSelectedExprToTrue(req); |
2646 | 0 | return Status::OK(); |
2647 | 0 | } |
2648 | | |
2649 | | // Schema is index schema while key is row from main table |
2650 | | Status WhereIndexToPB( |
2651 | | const QLTableRow& key, |
2652 | | const IndexInfo& index_info, |
2653 | | const Schema& schema, |
2654 | 0 | QLReadRequestPB* req) { |
2655 | 0 | QLConditionPB* where_pb = InitWhereOp(req); |
2656 | 0 | auto column_refs = req->mutable_column_refs(); |
2657 | |
|
2658 | 0 | for (size_t idx = 0; idx < index_info.columns().size(); idx++) { |
2659 | 0 | const ColumnId& column_id = index_info.column(idx).column_id; |
2660 | 0 | const ColumnId& indexed_column_id = index_info.column(idx).indexed_column_id; |
2661 | 0 | if (schema.is_hash_key_column(column_id)) { |
2662 | 0 | *req->add_hashed_column_values()->mutable_value() = *key.GetValue(indexed_column_id); |
2663 | 0 | } else { |
2664 | 0 | QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition(); |
2665 | 0 | col_cond_pb->set_op(QL_OP_EQUAL); |
2666 | 0 | col_cond_pb->add_operands()->set_column_id(column_id); |
2667 | 0 | *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(indexed_column_id); |
2668 | 0 | } |
2669 | 0 | column_refs->add_ids(column_id); |
2670 | 0 | } |
2671 | |
|
2672 | 0 | SetSelectedExprToTrue(req); |
2673 | 0 | return Status::OK(); |
2674 | 0 | } |
2675 | | |
2676 | | } // namespace |
2677 | | |
2678 | | Status Tablet::VerifyTableInBatches( |
2679 | | const QLTableRow& row, |
2680 | | const std::vector<TableId>& table_ids, |
2681 | | const HybridTime read_time, |
2682 | | const CoarseTimePoint deadline, |
2683 | | const bool is_main_table, |
2684 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2685 | | CoarseTimePoint* last_flushed_at, |
2686 | | std::unordered_set<TableId>* failed_indexes, |
2687 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2688 | 0 | auto client = client_future_.get(); |
2689 | 0 | auto local_index_info = metadata_->primary_table_info()->index_info.get(); |
2690 | 0 | for (const TableId& table_id : table_ids) { |
2691 | 0 | std::shared_ptr<client::YBTable> table; |
2692 | 0 | RETURN_NOT_OK(client->OpenTable(table_id, &table)); |
2693 | 0 | std::shared_ptr<client::YBqlReadOp> read_op(table->NewQLSelect()); |
2694 | |
|
2695 | 0 | QLReadRequestPB* req = read_op->mutable_request(); |
2696 | 0 | if (is_main_table) { |
2697 | 0 | RETURN_NOT_OK(WhereMainTableToPB(row, *local_index_info, table->InternalSchema(), req)); |
2698 | 0 | } else { |
2699 | 0 | RETURN_NOT_OK(WhereIndexToPB(row, table->index_info(), table->InternalSchema(), req)); |
2700 | 0 | } |
2701 | |
|
2702 | 0 | requests->emplace_back(table_id, *req); |
2703 | 0 | } |
2704 | |
|
2705 | 0 | return FlushVerifyBatchIfRequired( |
2706 | 0 | read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats); |
2707 | 0 | } |
2708 | | |
2709 | | Status Tablet::FlushVerifyBatchIfRequired( |
2710 | | const HybridTime read_time, |
2711 | | const CoarseTimePoint deadline, |
2712 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2713 | | CoarseTimePoint* last_flushed_at, |
2714 | | std::unordered_set<TableId>* failed_indexes, |
2715 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2716 | 0 | if (requests->size() < FLAGS_verify_index_read_batch_size) { |
2717 | 0 | return Status::OK(); |
2718 | 0 | } |
2719 | 0 | return FlushVerifyBatch( |
2720 | 0 | read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats); |
2721 | 0 | } |
2722 | | |
2723 | | Status Tablet::FlushVerifyBatch( |
2724 | | const HybridTime read_time, |
2725 | | const CoarseTimePoint deadline, |
2726 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2727 | | CoarseTimePoint* last_flushed_at, |
2728 | | std::unordered_set<TableId>* failed_indexes, |
2729 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2730 | 0 | std::vector<client::YBqlReadOpPtr> read_ops; |
2731 | 0 | std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline)); |
2732 | |
|
2733 | 0 | auto client = client_future_.get(); |
2734 | 0 | for (auto& pair : *requests) { |
2735 | 0 | client::YBTablePtr table; |
2736 | 0 | RETURN_NOT_OK(client->OpenTable(pair.first, &table)); |
2737 | |
|
2738 | 0 | client::YBqlReadOpPtr read_op(table->NewQLRead()); |
2739 | 0 | read_op->mutable_request()->Swap(&pair.second); |
2740 | 0 | read_op->SetReadTime(ReadHybridTime::SingleTime(read_time)); |
2741 | |
|
2742 | 0 | session->Apply(read_op); |
2743 | | |
2744 | | // Note: always emplace at tail because row keys must |
2745 | | // correspond sequentially with the read_ops in the vector |
2746 | 0 | read_ops.push_back(read_op); |
2747 | 0 | } |
2748 | |
|
2749 | 0 | RETURN_NOT_OK(FlushWithRetries(session, read_ops, 0, failed_indexes)); |
2750 | |
|
2751 | 0 | for (size_t idx = 0; idx < requests->size(); idx++) { |
2752 | 0 | const client::YBqlReadOpPtr& read_op = read_ops[idx]; |
2753 | 0 | auto row_block = read_op->MakeRowBlock(); |
2754 | 0 | if (row_block && row_block->row_count() == 1) continue; |
2755 | 0 | (*consistency_stats)[read_op->table()->id()]++; |
2756 | 0 | } |
2757 | |
|
2758 | 0 | SleepToThrottleRate(requests, FLAGS_verify_index_rate_rows_per_sec, last_flushed_at); |
2759 | 0 | *last_flushed_at = CoarseMonoClock::Now(); |
2760 | 0 | requests->clear(); |
2761 | |
|
2762 | 0 | return Status::OK(); |
2763 | 0 | } |
2764 | | |
2765 | | ScopedRWOperationPause Tablet::PauseReadWriteOperations( |
2766 | 465k | const Abortable abortable, const Stop stop) { |
2767 | 465k | VTRACE(1, LogPrefix()); |
2768 | 465k | LOG_SLOW_EXECUTION(WARNING, 1000, |
2769 | 466k | Substitute("$0Waiting for pending ops to complete", LogPrefix())) { |
2770 | 466k | return ScopedRWOperationPause( |
2771 | 257k | abortable ? &pending_abortable_op_counter_ : &pending_non_abortable_op_counter_, |
2772 | 466k | CoarseMonoClock::Now() + |
2773 | 466k | MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms), |
2774 | 466k | stop); |
2775 | 466k | } |
2776 | 18.4E | FATAL_ERROR("Unreachable code -- the previous block must always return"); |
2777 | 18.4E | } |
2778 | | |
2779 | 90 | ScopedRWOperation Tablet::CreateAbortableScopedRWOperation(const CoarseTimePoint deadline) const { |
2780 | 90 | return ScopedRWOperation(&pending_abortable_op_counter_, deadline); |
2781 | 90 | } |
2782 | | |
2783 | | ScopedRWOperation Tablet::CreateNonAbortableScopedRWOperation( |
2784 | 43.5M | const CoarseTimePoint deadline) const { |
2785 | 43.5M | return ScopedRWOperation(&pending_non_abortable_op_counter_, deadline); |
2786 | 43.5M | } |
2787 | | |
2788 | | Status Tablet::ModifyFlushedFrontier( |
2789 | | const docdb::ConsensusFrontier& frontier, |
2790 | 160k | rocksdb::FrontierModificationMode mode) { |
2791 | 160k | const Status s = regular_db_->ModifyFlushedFrontier(frontier.Clone(), mode); |
2792 | 160k | if (PREDICT_FALSE(!s.ok())) { |
2793 | 0 | auto status = STATUS(IllegalState, "Failed to set flushed frontier", s.ToString()); |
2794 | 0 | LOG_WITH_PREFIX(WARNING) << status; |
2795 | 0 | return status; |
2796 | 0 | } |
2797 | 160k | { |
2798 | 160k | auto flushed_frontier = regular_db_->GetFlushedFrontier(); |
2799 | 160k | const auto& consensus_flushed_frontier = *down_cast<docdb::ConsensusFrontier*>( |
2800 | 160k | flushed_frontier.get()); |
2801 | 160k | DCHECK_EQ(frontier.op_id(), consensus_flushed_frontier.op_id()); |
2802 | 160k | DCHECK_EQ(frontier.hybrid_time(), consensus_flushed_frontier.hybrid_time()); |
2803 | 160k | } |
2804 | | |
2805 | 160k | if (FLAGS_TEST_tablet_verify_flushed_frontier_after_modifying && |
2806 | 0 | mode == rocksdb::FrontierModificationMode::kForce) { |
2807 | 0 | LOG(INFO) << "Verifying that flushed frontier was force-set successfully"; |
2808 | 0 | string test_data_dir = VERIFY_RESULT(Env::Default()->GetTestDirectory()); |
2809 | 0 | const string checkpoint_dir_for_test = Format( |
2810 | 0 | "$0/test_checkpoint_$1_$2", test_data_dir, tablet_id(), MonoTime::Now().ToUint64()); |
2811 | 0 | RETURN_NOT_OK( |
2812 | 0 | rocksdb::checkpoint::CreateCheckpoint(regular_db_.get(), checkpoint_dir_for_test)); |
2813 | 0 | auto se = ScopeExit([checkpoint_dir_for_test] { |
2814 | 0 | CHECK_OK(Env::Default()->DeleteRecursively(checkpoint_dir_for_test)); |
2815 | 0 | }); |
2816 | 0 | rocksdb::Options rocksdb_options; |
2817 | 0 | docdb::InitRocksDBOptions( |
2818 | 0 | &rocksdb_options, LogPrefix(), /* statistics */ nullptr, tablet_options_); |
2819 | 0 | rocksdb_options.create_if_missing = false; |
2820 | 0 | LOG_WITH_PREFIX(INFO) << "Opening the test RocksDB at " << checkpoint_dir_for_test |
2821 | 0 | << ", expecting to see flushed frontier of " << frontier.ToString(); |
2822 | 0 | std::unique_ptr<rocksdb::DB> test_db = VERIFY_RESULT( |
2823 | 0 | rocksdb::DB::Open(rocksdb_options, checkpoint_dir_for_test)); |
2824 | 0 | LOG_WITH_PREFIX(INFO) << "Getting flushed frontier from test RocksDB at " |
2825 | 0 | << checkpoint_dir_for_test; |
2826 | 0 | auto restored_flushed_frontier = test_db->GetFlushedFrontier(); |
2827 | 0 | if (!restored_flushed_frontier) { |
2828 | 0 | LOG_WITH_PREFIX(FATAL) << LogPrefix() << "Restored flushed frontier not present"; |
2829 | 0 | } |
2830 | 0 | CHECK_EQ( |
2831 | 0 | frontier, |
2832 | 0 | down_cast<docdb::ConsensusFrontier&>(*restored_flushed_frontier)); |
2833 | 0 | LOG_WITH_PREFIX(INFO) << "Successfully verified persistently stored flushed frontier: " |
2834 | 0 | << frontier.ToString(); |
2835 | 0 | } |
2836 | | |
2837 | 160k | if (intents_db_) { |
2838 | | // It is OK to flush intents even if the regular DB is not yet flushed, |
2839 | | // because it would wait for flush of regular DB if we have unflushed intents. |
2840 | | // Otherwise it does not matter which flushed op id is stored. |
2841 | 82.1k | RETURN_NOT_OK(intents_db_->ModifyFlushedFrontier(frontier.Clone(), mode)); |
2842 | 82.1k | } |
2843 | | |
2844 | 160k | return Flush(FlushMode::kAsync); |
2845 | 160k | } |
2846 | | |
2847 | 159k | Status Tablet::Truncate(TruncateOperation* operation) { |
2848 | 159k | if (metadata_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
2849 | | // We use only Raft log for transaction status table. |
2850 | 0 | return Status::OK(); |
2851 | 0 | } |
2852 | | |
2853 | 159k | auto op_pauses = VERIFY_RESULT(StartShutdownRocksDBs(DisableFlushOnShutdown::kTrue)); |
2854 | | |
2855 | | // Check if tablet is in shutdown mode. |
2856 | 159k | if (IsShutdownRequested()) { |
2857 | 1 | return STATUS(IllegalState, "Tablet was shut down"); |
2858 | 1 | } |
2859 | | |
2860 | 159k | const rocksdb::SequenceNumber sequence_number = regular_db_->GetLatestSequenceNumber(); |
2861 | 159k | const string db_dir = regular_db_->GetName(); |
2862 | | |
2863 | 159k | auto s = CompleteShutdownRocksDBs(Destroy::kTrue, &op_pauses); |
2864 | 159k | if (PREDICT_FALSE(!s.ok())) { |
2865 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to clean up db dir " << db_dir << ": " << s; |
2866 | 0 | return STATUS(IllegalState, "Failed to clean up db dir", s.ToString()); |
2867 | 0 | } |
2868 | | |
2869 | | // Create a new database. |
2870 | | // Note: db_dir == metadata()->rocksdb_dir() is still valid db dir. |
2871 | 159k | s = OpenKeyValueTablet(); |
2872 | 159k | if (PREDICT_FALSE(!s.ok())) { |
2873 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to create a new db: " << s; |
2874 | 0 | return s; |
2875 | 0 | } |
2876 | | |
2877 | 159k | docdb::ConsensusFrontier frontier; |
2878 | 159k | frontier.set_op_id(operation->op_id()); |
2879 | 159k | frontier.set_hybrid_time(operation->hybrid_time()); |
2880 | | // We use the kUpdate mode here, because unlike the case of restoring a snapshot to a completely |
2881 | | // different tablet in an arbitrary Raft group, here there is no possibility of the flushed |
2882 | | // frontier needing to go backwards. |
2883 | 159k | RETURN_NOT_OK(ModifyFlushedFrontier(frontier, rocksdb::FrontierModificationMode::kUpdate)); |
2884 | | |
2885 | 159k | LOG_WITH_PREFIX(INFO) << "Created new db for truncated tablet"; |
2886 | 159k | LOG_WITH_PREFIX(INFO) << "Sequence numbers: old=" << sequence_number |
2887 | 159k | << ", new=" << regular_db_->GetLatestSequenceNumber(); |
2888 | | // Ensure that op_pauses stays in scope throughout this function. |
2889 | 319k | for (auto* op_pause : op_pauses.AsArray()) { |
2890 | 319k | DFATAL_OR_RETURN_NOT_OK(op_pause->status()); |
2891 | 319k | } |
2892 | 159k | return DoEnableCompactions(); |
2893 | 159k | } |
2894 | | |
2895 | 6.07M | void Tablet::UpdateMonotonicCounter(int64_t value) { |
2896 | 6.07M | int64_t counter = monotonic_counter_; |
2897 | 6.07M | while (true) { |
2898 | 6.07M | if (counter >= value) { |
2899 | 6.07M | break; |
2900 | 6.07M | } |
2901 | 120 | if (monotonic_counter_.compare_exchange_weak(counter, value)) { |
2902 | 120 | break; |
2903 | 120 | } |
2904 | 72 | } |
2905 | 6.07M | } |
2906 | | |
2907 | | //////////////////////////////////////////////////////////// |
2908 | | // Tablet |
2909 | | //////////////////////////////////////////////////////////// |
2910 | | |
2911 | 89.1k | Result<bool> Tablet::HasSSTables() const { |
2912 | 89.1k | if (!regular_db_) { |
2913 | 29.1k | return false; |
2914 | 29.1k | } |
2915 | | |
2916 | 60.0k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2917 | 60.0k | RETURN_NOT_OK(scoped_read_operation); |
2918 | | |
2919 | 60.0k | std::vector<rocksdb::LiveFileMetaData> live_files_metadata; |
2920 | 60.0k | regular_db_->GetLiveFilesMetaData(&live_files_metadata); |
2921 | 60.0k | return !live_files_metadata.empty(); |
2922 | 60.0k | } |
2923 | | |
2924 | 7.43M | yb::OpId MaxPersistentOpIdForDb(rocksdb::DB* db, bool invalid_if_no_new_data) { |
2925 | | // A possible race condition could happen, when data is written between this query and |
2926 | | // actual log gc. But it is not a problem as long as we are reading committed op id |
2927 | | // before MaxPersistentOpId, since we always keep last committed entry in the log during garbage |
2928 | | // collection. |
2929 | | // See TabletPeer::GetEarliestNeededLogIndex |
2930 | 7.43M | if (db == nullptr || |
2931 | 5.44M | (invalid_if_no_new_data && |
2932 | 5.44M | db->GetFlushAbility() == rocksdb::FlushAbility::kNoNewData)) { |
2933 | 5.44M | return yb::OpId::Invalid(); |
2934 | 5.44M | } |
2935 | | |
2936 | 1.99M | rocksdb::UserFrontierPtr frontier = db->GetFlushedFrontier(); |
2937 | 1.99M | if (!frontier) { |
2938 | 1.57M | return yb::OpId(); |
2939 | 1.57M | } |
2940 | | |
2941 | 413k | return down_cast<docdb::ConsensusFrontier*>(frontier.get())->op_id(); |
2942 | 413k | } |
2943 | | |
2944 | 3.71M | Result<DocDbOpIds> Tablet::MaxPersistentOpId(bool invalid_if_no_new_data) const { |
2945 | 3.71M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2946 | 3.71M | RETURN_NOT_OK(scoped_read_operation); |
2947 | | |
2948 | 3.71M | return DocDbOpIds{ |
2949 | 3.71M | MaxPersistentOpIdForDb(regular_db_.get(), invalid_if_no_new_data), |
2950 | 3.71M | MaxPersistentOpIdForDb(intents_db_.get(), invalid_if_no_new_data) |
2951 | 3.71M | }; |
2952 | 3.71M | } |
2953 | | |
2954 | 3.71M | void Tablet::FlushIntentsDbIfNecessary(const yb::OpId& lastest_log_entry_op_id) { |
2955 | 3.71M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2956 | 3.71M | if (!scoped_read_operation.ok()) { |
2957 | 1 | return; |
2958 | 1 | } |
2959 | | |
2960 | 3.71M | auto intents_frontier = intents_db_ |
2961 | 1.99M | ? MemTableFrontierFromDb(intents_db_.get(), rocksdb::UpdateUserValueType::kLargest) : nullptr; |
2962 | 3.71M | if (intents_frontier) { |
2963 | 426k | auto index_delta = |
2964 | 426k | lastest_log_entry_op_id.index - |
2965 | 426k | down_cast<docdb::ConsensusFrontier*>(intents_frontier.get())->op_id().index; |
2966 | 426k | if (index_delta > FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush) { |
2967 | 2 | auto intents_flush_ability = intents_db_->GetFlushAbility(); |
2968 | 2 | if (intents_flush_ability == rocksdb::FlushAbility::kHasNewData) { |
2969 | 2 | LOG_WITH_PREFIX(INFO) |
2970 | 2 | << "Force flushing intents DB since it was not flushed for " << index_delta |
2971 | 2 | << " operations, while only " |
2972 | 2 | << FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush << " is allowed"; |
2973 | 2 | rocksdb::FlushOptions options; |
2974 | 2 | options.wait = false; |
2975 | 2 | WARN_NOT_OK(intents_db_->Flush(options), "Flush intents db failed"); |
2976 | 2 | } |
2977 | 2 | } |
2978 | 426k | } |
2979 | 3.71M | } |
2980 | | |
2981 | 8.22M | bool Tablet::IsTransactionalRequest(bool is_ysql_request) const { |
2982 | | // We consider all YSQL tables within the sys catalog transactional. |
2983 | 8.22M | return txns_enabled_ && ( |
2984 | 8.22M | schema()->table_properties().is_transactional() || |
2985 | 7.78M | (is_sys_catalog_ && is_ysql_request)); |
2986 | 8.22M | } |
2987 | | |
2988 | 1.73k | Result<HybridTime> Tablet::MaxPersistentHybridTime() const { |
2989 | 1.73k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2990 | 1.73k | RETURN_NOT_OK(scoped_read_operation); |
2991 | | |
2992 | 1.73k | if (!regular_db_) { |
2993 | 192 | return HybridTime::kMin; |
2994 | 192 | } |
2995 | | |
2996 | 1.53k | HybridTime result = HybridTime::kMin; |
2997 | 1.53k | auto temp = regular_db_->GetFlushedFrontier(); |
2998 | 1.53k | if (temp) { |
2999 | 686 | result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time()); |
3000 | 686 | } |
3001 | 1.53k | if (intents_db_) { |
3002 | 133 | temp = intents_db_->GetFlushedFrontier(); |
3003 | 133 | if (temp) { |
3004 | 64 | result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time()); |
3005 | 64 | } |
3006 | 133 | } |
3007 | 1.53k | return result; |
3008 | 1.53k | } |
3009 | | |
3010 | 2.53k | Result<HybridTime> Tablet::OldestMutableMemtableWriteHybridTime() const { |
3011 | 2.53k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3012 | 2.53k | RETURN_NOT_OK(scoped_read_operation); |
3013 | | |
3014 | 2.53k | HybridTime result = HybridTime::kMax; |
3015 | 5.07k | for (auto* db : { regular_db_.get(), intents_db_.get() }) { |
3016 | 5.07k | if (db) { |
3017 | 1.70k | auto mem_frontier = MemTableFrontierFromDb(db, rocksdb::UpdateUserValueType::kSmallest); |
3018 | 1.70k | if (mem_frontier) { |
3019 | 820 | const auto hybrid_time = |
3020 | 820 | static_cast<const docdb::ConsensusFrontier&>(*mem_frontier).hybrid_time(); |
3021 | 820 | result = std::min(result, hybrid_time); |
3022 | 820 | } |
3023 | 1.70k | } |
3024 | 5.07k | } |
3025 | 2.53k | return result; |
3026 | 2.53k | } |
3027 | | |
3028 | 8.52M | const yb::SchemaPtr Tablet::schema() const { |
3029 | 8.52M | return metadata_->schema(); |
3030 | 8.52M | } |
3031 | | |
3032 | 0 | Status Tablet::DebugDump(vector<string> *lines) { |
3033 | 0 | switch (table_type_) { |
3034 | 0 | case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
3035 | 0 | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
3036 | 0 | case TableType::REDIS_TABLE_TYPE: |
3037 | 0 | DocDBDebugDump(lines); |
3038 | 0 | return Status::OK(); |
3039 | 0 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
3040 | 0 | return Status::OK(); |
3041 | 0 | } |
3042 | 0 | FATAL_INVALID_ENUM_VALUE(TableType, table_type_); |
3043 | 0 | } |
3044 | | |
3045 | 0 | void Tablet::DocDBDebugDump(vector<string> *lines) { |
3046 | 0 | LOG_STRING(INFO, lines) << "Dumping tablet:"; |
3047 | 0 | LOG_STRING(INFO, lines) << "---------------------------"; |
3048 | 0 | docdb::DocDBDebugDump(regular_db_.get(), LOG_STRING(INFO, lines), docdb::StorageDbType::kRegular); |
3049 | 0 | } |
3050 | | |
3051 | 0 | Status Tablet::TEST_SwitchMemtable() { |
3052 | 0 | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3053 | 0 | RETURN_NOT_OK(scoped_operation); |
3054 | |
|
3055 | 0 | if (regular_db_) { |
3056 | 0 | regular_db_->TEST_SwitchMemtable(); |
3057 | 0 | } else { |
3058 | 0 | LOG_WITH_PREFIX(INFO) << "Ignoring TEST_SwitchMemtable: no regular RocksDB"; |
3059 | 0 | } |
3060 | 0 | return Status::OK(); |
3061 | 0 | } |
3062 | | |
3063 | | Result<HybridTime> Tablet::DoGetSafeTime( |
3064 | 4.90M | RequireLease require_lease, HybridTime min_allowed, CoarseTimePoint deadline) const { |
3065 | 4.90M | if (require_lease == RequireLease::kFalse) { |
3066 | 118k | return mvcc_.SafeTimeForFollower(min_allowed, deadline); |
3067 | 118k | } |
3068 | 4.78M | FixedHybridTimeLease ht_lease; |
3069 | 4.78M | if (ht_lease_provider_) { |
3070 | | // This will block until a leader lease reaches the given value or a timeout occurs. |
3071 | 4.76M | auto ht_lease_result = ht_lease_provider_(min_allowed, deadline); |
3072 | 4.76M | if (!ht_lease_result.ok()) { |
3073 | 5 | if (require_lease == RequireLease::kFallbackToFollower && |
3074 | 0 | ht_lease_result.status().IsIllegalState()) { |
3075 | 0 | return mvcc_.SafeTimeForFollower(min_allowed, deadline); |
3076 | 0 | } |
3077 | 5 | return ht_lease_result.status(); |
3078 | 5 | } |
3079 | 4.76M | ht_lease = *ht_lease_result; |
3080 | 4.76M | if (min_allowed > ht_lease.time) { |
3081 | 0 | return STATUS_FORMAT( |
3082 | 0 | InternalError, "Read request hybrid time after current time: $0, lease: $1", |
3083 | 0 | min_allowed, ht_lease); |
3084 | 0 | } |
3085 | 14.6k | } else if (min_allowed) { |
3086 | 1.01k | RETURN_NOT_OK(WaitUntil(clock_.get(), min_allowed, deadline)); |
3087 | 1.01k | } |
3088 | 4.78M | if (min_allowed > ht_lease.lease) { |
3089 | 0 | return STATUS_FORMAT( |
3090 | 0 | InternalError, "Read request hybrid time after leader lease: $0, lease: $1", |
3091 | 0 | min_allowed, ht_lease); |
3092 | 0 | } |
3093 | 4.78M | return mvcc_.SafeTime(min_allowed, deadline, ht_lease); |
3094 | 4.78M | } |
3095 | | |
3096 | 18.0k | ScopedRWOperationPause Tablet::PauseWritePermits(CoarseTimePoint deadline) { |
3097 | 18.0k | TRACE("Blocking write permit(s)"); |
3098 | 18.2k | auto se = ScopeExit([] { TRACE("Blocking write permit(s) done"); }); |
3099 | | // Prevent new write ops from being submitted. |
3100 | 18.0k | return ScopedRWOperationPause(&write_ops_being_submitted_counter_, deadline, Stop::kFalse); |
3101 | 18.0k | } |
3102 | | |
3103 | 1.74M | ScopedRWOperation Tablet::GetPermitToWrite(CoarseTimePoint deadline) { |
3104 | 1.74M | TRACE("Acquiring write permit"); |
3105 | 1.75M | auto se = ScopeExit([] { TRACE("Acquiring write permit done"); }); |
3106 | 1.74M | return ScopedRWOperation(&write_ops_being_submitted_counter_); |
3107 | 1.74M | } |
3108 | | |
3109 | 722k | Result<bool> Tablet::StillHasOrphanedPostSplitData() { |
3110 | 722k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3111 | 722k | RETURN_NOT_OK(scoped_operation); |
3112 | 722k | return doc_db().key_bounds->IsInitialized() && !metadata()->has_been_fully_compacted(); |
3113 | 722k | } |
3114 | | |
3115 | 295k | bool Tablet::MayHaveOrphanedPostSplitData() { |
3116 | 295k | auto res = StillHasOrphanedPostSplitData(); |
3117 | 295k | if (!res.ok()) { |
3118 | 223 | LOG(WARNING) << "Failed to call StillHasOrphanedPostSplitData: " << res.ToString(); |
3119 | 223 | return true; |
3120 | 223 | } |
3121 | 295k | return res.get(); |
3122 | 295k | } |
3123 | | |
3124 | 344k | bool Tablet::ShouldDisableLbMove() { |
3125 | 344k | auto still_has_parent_data_result = StillHasOrphanedPostSplitData(); |
3126 | 344k | if (still_has_parent_data_result.ok()) { |
3127 | 344k | return still_has_parent_data_result.get(); |
3128 | 344k | } |
3129 | | // If this call failed, one of three things may be true: |
3130 | | // 1. We are in the middle of a tablet shutdown. |
3131 | | // |
3132 | | // In this case, what we report is not of much consequence, as the load balancer shouldn't try to |
3133 | | // move us anyways. We choose to return false. |
3134 | | // |
3135 | | // 2. We are in the middle of a TRUNCATE. |
3136 | | // |
3137 | | // In this case, any concurrent attempted LB move should fail before trying to move data, |
3138 | | // since the RocksDB instances are destroyed. On top of that, we do want to allow the LB to move |
3139 | | // this tablet after the TRUNCATE completes, so we should return false. |
3140 | | // |
3141 | | // 3. We are in the middle of an AlterSchema operation. This is only true for tablets belonging to |
3142 | | // colocated tables. |
3143 | | // |
3144 | | // In this case, we want to disable tablet moves. We conservatively return true for any failure |
3145 | | // if the tablet is part of a colocated table. |
3146 | 3 | return metadata_->schema()->has_pgtable_id(); |
3147 | 3 | } |
3148 | | |
3149 | 6 | void Tablet::ForceRocksDBCompactInTest() { |
3150 | 6 | CHECK_OK(ForceFullRocksDBCompact()); |
3151 | 6 | } |
3152 | | |
3153 | 90 | Status Tablet::ForceFullRocksDBCompact() { |
3154 | 90 | auto scoped_operation = CreateAbortableScopedRWOperation(); |
3155 | 90 | RETURN_NOT_OK(scoped_operation); |
3156 | | |
3157 | 90 | if (regular_db_) { |
3158 | 90 | RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get())); |
3159 | 90 | } |
3160 | 90 | if (intents_db_) { |
3161 | 44 | RETURN_NOT_OK_PREPEND( |
3162 | 44 | intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed"); |
3163 | 44 | RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get())); |
3164 | 44 | } |
3165 | 90 | return Status::OK(); |
3166 | 90 | } |
3167 | | |
3168 | 7 | std::string Tablet::TEST_DocDBDumpStr(IncludeIntents include_intents) { |
3169 | 7 | if (!regular_db_) return ""; |
3170 | | |
3171 | 7 | if (!include_intents) { |
3172 | 0 | return docdb::DocDBDebugDumpToStr(doc_db().WithoutIntents()); |
3173 | 0 | } |
3174 | | |
3175 | 7 | return docdb::DocDBDebugDumpToStr(doc_db()); |
3176 | 7 | } |
3177 | | |
3178 | | void Tablet::TEST_DocDBDumpToContainer( |
3179 | 7 | IncludeIntents include_intents, std::unordered_set<std::string>* out) { |
3180 | 7 | if (!regular_db_) return; |
3181 | | |
3182 | 7 | if (!include_intents) { |
3183 | 0 | return docdb::DocDBDebugDumpToContainer(doc_db().WithoutIntents(), out); |
3184 | 0 | } |
3185 | | |
3186 | 7 | return docdb::DocDBDebugDumpToContainer(doc_db(), out); |
3187 | 7 | } |
3188 | | |
3189 | 0 | void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) { |
3190 | 0 | if (!regular_db_) { |
3191 | 0 | LOG_WITH_PREFIX(INFO) << "No RocksDB to dump"; |
3192 | 0 | return; |
3193 | 0 | } |
3194 | | |
3195 | 0 | docdb::DumpRocksDBToLog(regular_db_.get(), StorageDbType::kRegular, LogPrefix()); |
3196 | |
|
3197 | 0 | if (include_intents && intents_db_) { |
3198 | 0 | docdb::DumpRocksDBToLog(intents_db_.get(), StorageDbType::kIntents, LogPrefix()); |
3199 | 0 | } |
3200 | 0 | } |
3201 | | |
3202 | 0 | size_t Tablet::TEST_CountRegularDBRecords() { |
3203 | 0 | if (!regular_db_) return 0; |
3204 | 0 | rocksdb::ReadOptions read_opts; |
3205 | 0 | read_opts.query_id = rocksdb::kDefaultQueryId; |
3206 | 0 | docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_); |
3207 | |
|
3208 | 0 | size_t result = 0; |
3209 | 0 | for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { |
3210 | 0 | ++result; |
3211 | 0 | } |
3212 | 0 | return result; |
3213 | 0 | } |
3214 | | |
3215 | | template <class F> |
3216 | 24.0M | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { |
3217 | 24.0M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3218 | 24.0M | std::lock_guard<rw_spinlock> lock(component_lock_); |
3219 | | |
3220 | | // In order to get actual stats we would have to wait. |
3221 | | // This would give us correct stats but would make this request slower. |
3222 | 24.0M | if (!scoped_operation.ok() || !regular_db_) { |
3223 | 5.73M | return default_value; |
3224 | 5.73M | } |
3225 | 18.2M | return func(); |
3226 | 18.2M | } tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_29GetCurrentVersionSstFilesSizeEvE3$_3EEDaRKT_RKDTclfL0p_EE Line | Count | Source | 3216 | 7.29k | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3217 | 7.29k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3218 | 7.29k | std::lock_guard<rw_spinlock> lock(component_lock_); | 3219 | | | 3220 | | // In order to get actual stats we would have to wait. | 3221 | | // This would give us correct stats but would make this request slower. | 3222 | 7.29k | if (!scoped_operation.ok() || !regular_db_) { | 3223 | 2.75k | return default_value; | 3224 | 2.75k | } | 3225 | 4.54k | return func(); | 3226 | 4.54k | } |
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_41GetCurrentVersionSstFilesUncompressedSizeEvE3$_4EEDaRKT_RKDTclfL0p_EE Line | Count | Source | 3216 | 7.29k | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3217 | 7.29k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3218 | 7.29k | std::lock_guard<rw_spinlock> lock(component_lock_); | 3219 | | | 3220 | | // In order to get actual stats we would have to wait. | 3221 | | // This would give us correct stats but would make this request slower. | 3222 | 7.29k | if (!scoped_operation.ok() || !regular_db_) { | 3223 | 2.75k | return default_value; | 3224 | 2.75k | } | 3225 | 4.54k | return func(); | 3226 | 4.54k | } |
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_33GetCurrentVersionSstFilesAllSizesEvE3$_5EEDaRKT_RKDTclfL0p_EE Line | Count | Source | 3216 | 295k | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3217 | 295k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3218 | 295k | std::lock_guard<rw_spinlock> lock(component_lock_); | 3219 | | | 3220 | | // In order to get actual stats we would have to wait. | 3221 | | // This would give us correct stats but would make this request slower. | 3222 | 295k | if (!scoped_operation.ok() || !regular_db_) { | 3223 | 132k | return default_value; | 3224 | 132k | } | 3225 | 163k | return func(); | 3226 | 163k | } |
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_28GetCurrentVersionNumSSTFilesEvE3$_6EEDaRKT_RKDTclfL0p_EE Line | Count | Source | 3216 | 23.7M | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3217 | 23.7M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3218 | 23.7M | std::lock_guard<rw_spinlock> lock(component_lock_); | 3219 | | | 3220 | | // In order to get actual stats we would have to wait. | 3221 | | // This would give us correct stats but would make this request slower. | 3222 | 23.7M | if (!scoped_operation.ok() || !regular_db_) { | 3223 | 5.59M | return default_value; | 3224 | 5.59M | } | 3225 | 18.1M | return func(); | 3226 | 18.1M | } |
|
3227 | | |
3228 | 7.29k | uint64_t Tablet::GetCurrentVersionSstFilesSize() const { |
3229 | 4.54k | return GetRegularDbStat([this] { |
3230 | 4.54k | return regular_db_->GetCurrentVersionSstFilesSize(); |
3231 | 4.54k | }, 0); |
3232 | 7.29k | } |
3233 | | |
3234 | 7.29k | uint64_t Tablet::GetCurrentVersionSstFilesUncompressedSize() const { |
3235 | 4.54k | return GetRegularDbStat([this] { |
3236 | 4.54k | return regular_db_->GetCurrentVersionSstFilesUncompressedSize(); |
3237 | 4.54k | }, 0); |
3238 | 7.29k | } |
3239 | | |
3240 | 295k | std::pair<uint64_t, uint64_t> Tablet::GetCurrentVersionSstFilesAllSizes() const { |
3241 | 163k | return GetRegularDbStat([this] { |
3242 | 163k | return regular_db_->GetCurrentVersionSstFilesAllSizes(); |
3243 | 163k | }, std::pair<uint64_t, uint64_t>(0, 0)); |
3244 | 295k | } |
3245 | | |
3246 | 23.7M | uint64_t Tablet::GetCurrentVersionNumSSTFiles() const { |
3247 | 18.1M | return GetRegularDbStat([this] { |
3248 | 18.1M | return regular_db_->GetCurrentVersionNumSSTFiles(); |
3249 | 18.1M | }, 0); |
3250 | 23.7M | } |
3251 | | |
3252 | 3.74k | std::pair<int, int> Tablet::GetNumMemtables() const { |
3253 | 3.74k | int intents_num_memtables = 0; |
3254 | 3.74k | int regular_num_memtables = 0; |
3255 | | |
3256 | 3.74k | { |
3257 | 3.74k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3258 | 3.74k | std::lock_guard<rw_spinlock> lock(component_lock_); |
3259 | 3.74k | if (intents_db_) { |
3260 | | // NOTE: 1 is added on behalf of cfd->mem(). |
3261 | 1.04k | intents_num_memtables = 1 + intents_db_->GetCfdImmNumNotFlushed(); |
3262 | 1.04k | } |
3263 | 3.74k | if (regular_db_) { |
3264 | | // NOTE: 1 is added on behalf of cfd->mem(). |
3265 | 1.34k | regular_num_memtables = 1 + regular_db_->GetCfdImmNumNotFlushed(); |
3266 | 1.34k | } |
3267 | 3.74k | } |
3268 | | |
3269 | 3.74k | return std::make_pair(intents_num_memtables, regular_num_memtables); |
3270 | 3.74k | } |
3271 | | |
3272 | | // ------------------------------------------------------------------------------------------------ |
3273 | | |
3274 | | Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext( |
3275 | | const TransactionMetadataPB& transaction_metadata, |
3276 | | bool is_ysql_catalog_table, |
3277 | 6.86M | const SubTransactionMetadataPB* subtransaction_metadata) const { |
3278 | 6.86M | if (!txns_enabled_) |
3279 | 101k | return TransactionOperationContext(); |
3280 | | |
3281 | 6.76M | if (transaction_metadata.has_transaction_id()) { |
3282 | 1.70M | Result<TransactionId> txn_id = FullyDecodeTransactionId( |
3283 | 1.70M | transaction_metadata.transaction_id()); |
3284 | 1.70M | RETURN_NOT_OK(txn_id); |
3285 | 1.70M | return CreateTransactionOperationContext( |
3286 | 1.70M | boost::make_optional(*txn_id), is_ysql_catalog_table, subtransaction_metadata); |
3287 | 5.06M | } else { |
3288 | 5.06M | return CreateTransactionOperationContext( |
3289 | 5.06M | /* transaction_id */ boost::none, is_ysql_catalog_table, subtransaction_metadata); |
3290 | 5.06M | } |
3291 | 6.76M | } |
3292 | | |
3293 | | Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext( |
3294 | | const boost::optional<TransactionId>& transaction_id, |
3295 | | bool is_ysql_catalog_table, |
3296 | 6.89M | const SubTransactionMetadataPB* subtransaction_metadata) const { |
3297 | 6.89M | if (!txns_enabled_) { |
3298 | 263 | return TransactionOperationContext(); |
3299 | 263 | } |
3300 | | |
3301 | 6.89M | const TransactionId* txn_id = nullptr; |
3302 | | |
3303 | 6.89M | if (transaction_id.is_initialized()) { |
3304 | 1.70M | txn_id = transaction_id.get_ptr(); |
3305 | 5.19M | } else if (metadata_->schema()->table_properties().is_transactional() || is_ysql_catalog_table) { |
3306 | | // deadbeef-dead-beef-dead-beef00000075 |
3307 | 530k | static const TransactionId kArbitraryTxnIdForNonTxnReads( |
3308 | 530k | 17275436393656397278ULL, 8430738506459819486ULL); |
3309 | | // We still need context with transaction participant in order to resolve intents during |
3310 | | // possible reads. |
3311 | 530k | txn_id = &kArbitraryTxnIdForNonTxnReads; |
3312 | 4.66M | } else { |
3313 | 4.66M | return TransactionOperationContext(); |
3314 | 4.66M | } |
3315 | | |
3316 | 2.23M | if (!subtransaction_metadata) { |
3317 | 129k | return TransactionOperationContext(*txn_id, transaction_participant()); |
3318 | 129k | } |
3319 | | |
3320 | 2.10M | auto subtxn = VERIFY_RESULT(SubTransactionMetadata::FromPB(*subtransaction_metadata)); |
3321 | 2.10M | return TransactionOperationContext(*txn_id, std::move(subtxn), transaction_participant()); |
3322 | 2.10M | } |
3323 | | |
3324 | | Status Tablet::CreateReadIntents( |
3325 | | const TransactionMetadataPB& transaction_metadata, |
3326 | | const SubTransactionMetadataPB& subtransaction_metadata, |
3327 | | const google::protobuf::RepeatedPtrField<QLReadRequestPB>& ql_batch, |
3328 | | const google::protobuf::RepeatedPtrField<PgsqlReadRequestPB>& pgsql_batch, |
3329 | 140k | docdb::KeyValueWriteBatchPB* write_batch) { |
3330 | 140k | auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext( |
3331 | 140k | transaction_metadata, |
3332 | 140k | /* is_ysql_catalog_table */ pgsql_batch.size() > 0 && is_sys_catalog_, |
3333 | 140k | &subtransaction_metadata)); |
3334 | | |
3335 | 0 | for (const auto& ql_read : ql_batch) { |
3336 | 0 | docdb::QLReadOperation doc_op(ql_read, txn_op_ctx); |
3337 | 0 | RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(), write_batch)); |
3338 | 0 | } |
3339 | | |
3340 | 1.33M | for (const auto& pgsql_read : pgsql_batch) { |
3341 | 1.33M | docdb::PgsqlReadOperation doc_op(pgsql_read, txn_op_ctx); |
3342 | 1.33M | RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(pgsql_read.table_id()), write_batch)); |
3343 | 1.33M | } |
3344 | | |
3345 | 140k | return Status::OK(); |
3346 | 140k | } |
3347 | | |
3348 | 2.93M | bool Tablet::ShouldApplyWrite() { |
3349 | 2.93M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3350 | 2.93M | if (!scoped_read_operation.ok()) { |
3351 | 0 | return false; |
3352 | 0 | } |
3353 | | |
3354 | 2.93M | return !regular_db_->NeedsDelay(); |
3355 | 2.93M | } |
3356 | | |
3357 | 676k | Result<IsolationLevel> Tablet::GetIsolationLevel(const TransactionMetadataPB& transaction) { |
3358 | 676k | if (transaction.has_isolation()) { |
3359 | 443k | return transaction.isolation(); |
3360 | 443k | } |
3361 | 232k | return VERIFY_RESULT(transaction_participant_->PrepareMetadata(transaction)).isolation; |
3362 | 232k | } |
3363 | | |
3364 | | Result<RaftGroupMetadataPtr> Tablet::CreateSubtablet( |
3365 | | const TabletId& tablet_id, const Partition& partition, const docdb::KeyBounds& key_bounds, |
3366 | 93 | const yb::OpId& split_op_id, const HybridTime& split_op_hybrid_time) { |
3367 | 93 | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3368 | 93 | RETURN_NOT_OK(scoped_read_operation); |
3369 | | |
3370 | 93 | RETURN_NOT_OK(Flush(FlushMode::kSync)); |
3371 | | |
3372 | 93 | auto metadata = VERIFY_RESULT(metadata_->CreateSubtabletMetadata( |
3373 | 93 | tablet_id, partition, key_bounds.lower.ToStringBuffer(), key_bounds.upper.ToStringBuffer())); |
3374 | | |
3375 | 93 | RETURN_NOT_OK(snapshots_->CreateCheckpoint( |
3376 | 93 | metadata->rocksdb_dir(), CreateIntentsCheckpointIn::kSubDir)); |
3377 | | |
3378 | | // We want flushed frontier to cover split_op_id, so during bootstrap of after-split tablets |
3379 | | // we don't replay split operation. |
3380 | 93 | docdb::ConsensusFrontier frontier; |
3381 | 93 | frontier.set_op_id(split_op_id); |
3382 | 93 | frontier.set_hybrid_time(split_op_hybrid_time); |
3383 | | |
3384 | 93 | struct RocksDbDirWithType { |
3385 | 93 | std::string db_dir; |
3386 | 93 | docdb::StorageDbType db_type; |
3387 | 93 | }; |
3388 | 93 | boost::container::static_vector<RocksDbDirWithType, 2> subtablet_rocksdbs( |
3389 | 93 | {{ metadata->rocksdb_dir(), docdb::StorageDbType::kRegular }}); |
3390 | 93 | if (intents_db_) { |
3391 | 47 | subtablet_rocksdbs.push_back( |
3392 | 47 | { metadata->intents_rocksdb_dir(), docdb::StorageDbType::kIntents }); |
3393 | 47 | } |
3394 | 140 | for (auto rocksdb : subtablet_rocksdbs) { |
3395 | 140 | rocksdb::Options rocksdb_options; |
3396 | 140 | docdb::InitRocksDBOptions( |
3397 | 140 | &rocksdb_options, MakeTabletLogPrefix(tablet_id, log_prefix_suffix_, rocksdb.db_type), |
3398 | 140 | /* statistics */ nullptr, tablet_options_); |
3399 | 140 | rocksdb_options.create_if_missing = false; |
3400 | | // Disable background compactions, we only need to update flushed frontier. |
3401 | 140 | rocksdb_options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; |
3402 | 140 | std::unique_ptr<rocksdb::DB> db = |
3403 | 140 | VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, rocksdb.db_dir)); |
3404 | 140 | RETURN_NOT_OK( |
3405 | 140 | db->ModifyFlushedFrontier(frontier.Clone(), rocksdb::FrontierModificationMode::kUpdate)); |
3406 | 140 | } |
3407 | 93 | return metadata; |
3408 | 93 | } |
3409 | | |
3410 | 0 | Result<int64_t> Tablet::CountIntents() { |
3411 | 0 | auto pending_op = CreateNonAbortableScopedRWOperation(); |
3412 | 0 | RETURN_NOT_OK(pending_op); |
3413 | |
|
3414 | 0 | if (!intents_db_) { |
3415 | 0 | return 0; |
3416 | 0 | } |
3417 | 0 | rocksdb::ReadOptions read_options; |
3418 | 0 | auto intent_iter = std::unique_ptr<rocksdb::Iterator>( |
3419 | 0 | intents_db_->NewIterator(read_options)); |
3420 | 0 | int64_t num_intents = 0; |
3421 | 0 | intent_iter->SeekToFirst(); |
3422 | 0 | while (intent_iter->Valid()) { |
3423 | 0 | num_intents++; |
3424 | 0 | intent_iter->Next(); |
3425 | 0 | } |
3426 | 0 | return num_intents; |
3427 | 0 | } |
3428 | | |
3429 | 136k | void Tablet::ListenNumSSTFilesChanged(std::function<void()> listener) { |
3430 | 136k | std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_); |
3431 | 136k | bool has_new_listener = listener != nullptr; |
3432 | 136k | bool has_old_listener = num_sst_files_changed_listener_ != nullptr; |
3433 | 28 | LOG_IF_WITH_PREFIX(DFATAL, has_new_listener == has_old_listener) |
3434 | 28 | << __func__ << " in wrong state, has_old_listener: " << has_old_listener; |
3435 | 136k | num_sst_files_changed_listener_ = std::move(listener); |
3436 | 136k | } |
3437 | | |
3438 | | void Tablet::InitRocksDBOptions( |
3439 | | rocksdb::Options* options, const std::string& log_prefix, |
3440 | 382k | rocksdb::BlockBasedTableOptions table_options) { |
3441 | 382k | docdb::InitRocksDBOptions( |
3442 | 382k | options, log_prefix, regulardb_statistics_, tablet_options_, std::move(table_options)); |
3443 | 382k | } |
3444 | | |
3445 | 1.72k | rocksdb::Env& Tablet::rocksdb_env() const { |
3446 | 1.72k | return *tablet_options_.rocksdb_env; |
3447 | 1.72k | } |
3448 | | |
3449 | 2.86M | const std::string& Tablet::tablet_id() const { |
3450 | 2.86M | return metadata_->raft_group_id(); |
3451 | 2.86M | } |
3452 | | |
3453 | 47 | Result<std::string> Tablet::GetEncodedMiddleSplitKey() const { |
3454 | 0 | auto error_prefix = [this]() { |
3455 | 0 | return Format( |
3456 | 0 | "Failed to detect middle key for tablet $0 (key_bounds: $1 - $2)", |
3457 | 0 | tablet_id(), |
3458 | 0 | Slice(key_bounds_.lower).ToDebugHexString(), |
3459 | 0 | Slice(key_bounds_.upper).ToDebugHexString()); |
3460 | 0 | }; |
3461 | | |
3462 | | // TODO(tsplit): should take key_bounds_ into account. |
3463 | 46 | auto middle_key = VERIFY_RESULT(regular_db_->GetMiddleKey()); |
3464 | | |
3465 | | // In some rare cases middle key can point to a special internal record which is not visible |
3466 | | // for a user, but tablet splitting routines expect the specific structure for partition keys |
3467 | | // that does not match the struct of the internally used records. Moreover, it is expected |
3468 | | // to have two child tablets with alive user records after the splitting, but the split |
3469 | | // by the internal record will lead to a case when one tablet will consist of internal records |
3470 | | // only and these records will be compacted out at some point making an empty tablet. |
3471 | 46 | if (PREDICT_FALSE(docdb::IsInternalRecordKeyType(docdb::DecodeValueType(middle_key[0])))) { |
3472 | 0 | return STATUS_FORMAT( |
3473 | 0 | IllegalState, "$0: got internal record \"$1\"", |
3474 | 0 | error_prefix(), Slice(middle_key).ToDebugHexString()); |
3475 | 0 | } |
3476 | | |
3477 | 46 | const auto key_part = metadata()->partition_schema()->IsHashPartitioning() |
3478 | 46 | ? docdb::DocKeyPart::kUpToHashCode |
3479 | 0 | : docdb::DocKeyPart::kWholeDocKey; |
3480 | 46 | const auto split_key_size = VERIFY_RESULT(DocKey::EncodedSize(middle_key, key_part)); |
3481 | 46 | if (PREDICT_FALSE(split_key_size == 0)) { |
3482 | | // Using this verification just to have a more sensible message. The below verification will |
3483 | | // not pass with split_key_size == 0 also, but its message is not accurate enough. This failure |
3484 | | // may happen when a key cannot be decoded with key_part inside DocKey::EncodedSize and the key |
3485 | | // still valid for any reason (e.g. gettining non-hash key for hash partitioning). |
3486 | 0 | return STATUS_FORMAT( |
3487 | 0 | IllegalState, "$0: got unexpected key \"$1\"", |
3488 | 0 | error_prefix(), Slice(middle_key).ToDebugHexString()); |
3489 | 0 | } |
3490 | | |
3491 | 46 | middle_key.resize(split_key_size); |
3492 | 46 | const Slice middle_key_slice(middle_key); |
3493 | 46 | if (middle_key_slice.compare(key_bounds_.lower) <= 0 || |
3494 | 46 | (!key_bounds_.upper.empty() && middle_key_slice.compare(key_bounds_.upper) >= 0)) { |
3495 | 0 | return STATUS_FORMAT( |
3496 | 0 | IllegalState, |
3497 | 0 | "$0: got \"$1\". This can happen if post-split tablet wasn't fully compacted after split", |
3498 | 0 | error_prefix(), middle_key_slice.ToDebugHexString()); |
3499 | 0 | } |
3500 | 46 | return middle_key; |
3501 | 46 | } |
3502 | | |
3503 | | Status Tablet::TriggerPostSplitCompactionIfNeeded( |
3504 | 83.3k | std::function<std::unique_ptr<ThreadPoolToken>()> get_token_for_compaction) { |
3505 | 83.3k | if (post_split_compaction_task_pool_token_) { |
3506 | 0 | return STATUS( |
3507 | 0 | IllegalState, "Already triggered post split compaction for this tablet instance."); |
3508 | 0 | } |
3509 | 83.3k | if (VERIFY_RESULT(StillHasOrphanedPostSplitData())) { |
3510 | 83 | post_split_compaction_task_pool_token_ = get_token_for_compaction(); |
3511 | 83 | return post_split_compaction_task_pool_token_->SubmitFunc( |
3512 | 83 | std::bind(&Tablet::TriggerPostSplitCompactionSync, this)); |
3513 | 83 | } |
3514 | 83.3k | return Status::OK(); |
3515 | 83.3k | } |
3516 | | |
3517 | 84 | void Tablet::TriggerPostSplitCompactionSync() { |
3518 | 84 | TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compaction); |
3519 | 84 | WARN_WITH_PREFIX_NOT_OK( |
3520 | 84 | ForceFullRocksDBCompact(), LogPrefix() + "Failed to compact post-split tablet."); |
3521 | 84 | } |
3522 | | |
3523 | 3 | Status Tablet::VerifyDataIntegrity() { |
3524 | 3 | LOG_WITH_PREFIX(INFO) << "Beginning data integrity checks on this tablet"; |
3525 | | |
3526 | | // Verify regular db. |
3527 | 3 | if (regular_db_) { |
3528 | 3 | const auto& db_dir = metadata()->rocksdb_dir(); |
3529 | 3 | RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir)); |
3530 | 3 | } |
3531 | | |
3532 | | // Verify intents db. |
3533 | 1 | if (intents_db_) { |
3534 | 0 | const auto& db_dir = metadata()->intents_rocksdb_dir(); |
3535 | 0 | RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir)); |
3536 | 0 | } |
3537 | | |
3538 | 1 | return Status::OK(); |
3539 | 1 | } |
3540 | | |
3541 | 3 | Status Tablet::OpenDbAndCheckIntegrity(const std::string& db_dir) { |
3542 | | // Similar to ldb's CheckConsistency, we open db as read-only with paranoid checks on. |
3543 | | // If any corruption is detected then the open will fail with a Corruption status. |
3544 | 3 | rocksdb::Options db_opts; |
3545 | 3 | InitRocksDBOptions(&db_opts, LogPrefix()); |
3546 | 3 | db_opts.paranoid_checks = true; |
3547 | | |
3548 | 3 | std::unique_ptr<rocksdb::DB> db; |
3549 | 3 | rocksdb::DB* db_raw = nullptr; |
3550 | 3 | rocksdb::Status st = rocksdb::DB::OpenForReadOnly(db_opts, db_dir, &db_raw); |
3551 | 3 | if (db_raw != nullptr) { |
3552 | 1 | db.reset(db_raw); |
3553 | 1 | } |
3554 | 3 | if (!st.ok()) { |
3555 | 2 | if (st.IsCorruption()) { |
3556 | 2 | LOG_WITH_PREFIX(WARNING) << "Detected rocksdb data corruption: " << st; |
3557 | | // TODO: should we bump metric here or in top-level validation or both? |
3558 | 2 | metrics()->tablet_data_corruptions->Increment(); |
3559 | 2 | return st; |
3560 | 2 | } |
3561 | | |
3562 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to open read-only RocksDB in directory " << db_dir |
3563 | 0 | << ": " << st; |
3564 | 0 | return Status::OK(); |
3565 | 0 | } |
3566 | | |
3567 | | // TODO: we can add more checks here to verify block contents/checksums |
3568 | | |
3569 | 1 | return Status::OK(); |
3570 | 1 | } |
3571 | | |
3572 | 44 | void Tablet::SplitDone() { |
3573 | 44 | { |
3574 | 44 | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3575 | 44 | if (completed_split_operation_filter_) { |
3576 | 0 | LOG_WITH_PREFIX(DFATAL) << "Already have split operation filter"; |
3577 | 0 | return; |
3578 | 0 | } |
3579 | | |
3580 | 44 | completed_split_operation_filter_ = MakeFunctorOperationFilter( |
3581 | 14 | [this](const OpId& op_id, consensus::OperationType op_type) -> Status { |
3582 | 14 | if (SplitOperation::ShouldAllowOpAfterSplitTablet(op_type)) { |
3583 | 3 | return Status::OK(); |
3584 | 3 | } |
3585 | | |
3586 | 11 | auto children = metadata_->split_child_tablet_ids(); |
3587 | 11 | return SplitOperation::RejectionStatus(OpId(), op_id, op_type, children[0], children[1]); |
3588 | 11 | }); |
3589 | 44 | operation_filters_.push_back(*completed_split_operation_filter_); |
3590 | | |
3591 | 44 | completed_split_log_anchor_ = std::make_unique<log::LogAnchor>(); |
3592 | | |
3593 | 44 | log_anchor_registry_->Register( |
3594 | 44 | metadata_->split_op_id().index, "Splitted tablet", completed_split_log_anchor_.get()); |
3595 | 44 | } |
3596 | 44 | } |
3597 | | |
3598 | 89.0k | void Tablet::SyncRestoringOperationFilter(ResetSplit reset_split) { |
3599 | 89.0k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3600 | | |
3601 | 89.0k | if (reset_split) { |
3602 | 0 | if (completed_split_log_anchor_) { |
3603 | 0 | WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()), |
3604 | 0 | "Unregister split anchor"); |
3605 | 0 | completed_split_log_anchor_ = nullptr; |
3606 | 0 | } |
3607 | |
|
3608 | 0 | if (completed_split_operation_filter_) { |
3609 | 0 | UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get()); |
3610 | 0 | completed_split_operation_filter_ = nullptr; |
3611 | 0 | } |
3612 | 0 | } |
3613 | | |
3614 | 89.0k | if (metadata_->has_active_restoration()) { |
3615 | 0 | if (restoring_operation_filter_) { |
3616 | 0 | return; |
3617 | 0 | } |
3618 | 0 | restoring_operation_filter_ = MakeFunctorOperationFilter( |
3619 | 0 | [](const OpId& op_id, consensus::OperationType op_type) -> Status { |
3620 | 0 | if (SnapshotOperation::ShouldAllowOpDuringRestore(op_type)) { |
3621 | 0 | return Status::OK(); |
3622 | 0 | } |
3623 | | |
3624 | 0 | return SnapshotOperation::RejectionStatus(op_id, op_type); |
3625 | 0 | }); |
3626 | 0 | operation_filters_.push_back(*restoring_operation_filter_); |
3627 | 89.0k | } else { |
3628 | 89.1k | if (!restoring_operation_filter_) { |
3629 | 89.1k | return; |
3630 | 89.1k | } |
3631 | | |
3632 | 18.4E | UnregisterOperationFilterUnlocked(restoring_operation_filter_.get()); |
3633 | 18.4E | restoring_operation_filter_ = nullptr; |
3634 | 18.4E | } |
3635 | 89.0k | } |
3636 | | |
3637 | 0 | Status Tablet::RestoreStarted(const TxnSnapshotRestorationId& restoration_id) { |
3638 | 0 | metadata_->RegisterRestoration(restoration_id); |
3639 | 0 | RETURN_NOT_OK(metadata_->Flush()); |
3640 | |
|
3641 | 0 | SyncRestoringOperationFilter(ResetSplit::kTrue); |
3642 | |
|
3643 | 0 | return Status::OK(); |
3644 | 0 | } |
3645 | | |
3646 | | Status Tablet::RestoreFinished( |
3647 | 0 | const TxnSnapshotRestorationId& restoration_id, HybridTime restoration_hybrid_time) { |
3648 | 0 | metadata_->UnregisterRestoration(restoration_id); |
3649 | 0 | if (restoration_hybrid_time) { |
3650 | 0 | metadata_->SetRestorationHybridTime(restoration_hybrid_time); |
3651 | 0 | if (transaction_participant_ && FLAGS_consistent_restore) { |
3652 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
3653 | 0 | } |
3654 | 0 | } |
3655 | 0 | RETURN_NOT_OK(metadata_->Flush()); |
3656 | |
|
3657 | 0 | SyncRestoringOperationFilter(ResetSplit::kFalse); |
3658 | |
|
3659 | 0 | return Status::OK(); |
3660 | 0 | } |
3661 | | |
3662 | 1.41k | Status Tablet::CheckRestorations(const RestorationCompleteTimeMap& restoration_complete_time) { |
3663 | 1.41k | auto restoration_hybrid_time = metadata_->CheckCompleteRestorations(restoration_complete_time); |
3664 | 1.41k | if (restoration_hybrid_time != HybridTime::kMin |
3665 | 0 | && transaction_participant_ |
3666 | 0 | && FLAGS_consistent_restore) { |
3667 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
3668 | 0 | } |
3669 | | |
3670 | | // We cannot do it in a single shot, because should update transaction participant before |
3671 | | // removing active transactions. |
3672 | 1.41k | if (!metadata_->CleanupRestorations(restoration_complete_time)) { |
3673 | 1.41k | return Status::OK(); |
3674 | 1.41k | } |
3675 | | |
3676 | 0 | RETURN_NOT_OK(metadata_->Flush()); |
3677 | 0 | SyncRestoringOperationFilter(ResetSplit::kFalse); |
3678 | |
|
3679 | 0 | return Status::OK(); |
3680 | 0 | } |
3681 | | |
3682 | 2.73M | Status Tablet::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) { |
3683 | 2.73M | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3684 | 405 | for (const auto& filter : operation_filters_) { |
3685 | 405 | RETURN_NOT_OK(filter.CheckOperationAllowed(op_id, op_type)); |
3686 | 405 | } |
3687 | | |
3688 | 2.73M | return Status::OK(); |
3689 | 2.73M | } |
3690 | | |
3691 | 912 | void Tablet::RegisterOperationFilter(OperationFilter* filter) { |
3692 | 912 | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3693 | 912 | operation_filters_.push_back(*filter); |
3694 | 912 | } |
3695 | | |
3696 | 906 | void Tablet::UnregisterOperationFilter(OperationFilter* filter) { |
3697 | 906 | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3698 | 906 | UnregisterOperationFilterUnlocked(filter); |
3699 | 906 | } |
3700 | | |
3701 | 927 | void Tablet::UnregisterOperationFilterUnlocked(OperationFilter* filter) { |
3702 | 927 | operation_filters_.erase(operation_filters_.iterator_to(*filter)); |
3703 | 927 | } |
3704 | | |
3705 | 6.65M | SchemaPtr Tablet::GetSchema(const std::string& table_id) const { |
3706 | 6.65M | if (table_id.empty()) { |
3707 | 3.66M | return metadata_->schema(); |
3708 | 3.66M | } |
3709 | 2.98M | auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id)); |
3710 | 2.98M | return SchemaPtr(table_info, table_info->schema.get()); |
3711 | 2.98M | } |
3712 | | |
3713 | 50.5k | Schema Tablet::GetKeySchema(const std::string& table_id) const { |
3714 | 50.5k | if (table_id.empty()) { |
3715 | 2 | return *key_schema_; |
3716 | 2 | } |
3717 | 50.5k | auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id)); |
3718 | 50.5k | return table_info->schema->CreateKeyProjection(); |
3719 | 50.5k | } |
3720 | | |
3721 | | // ------------------------------------------------------------------------------------------------ |
3722 | | |
3723 | | Result<ScopedReadOperation> ScopedReadOperation::Create( |
3724 | | AbstractTablet* tablet, |
3725 | | RequireLease require_lease, |
3726 | 4.87M | ReadHybridTime read_time) { |
3727 | 4.87M | if (!read_time) { |
3728 | 2.88k | read_time = ReadHybridTime::SingleTime(VERIFY_RESULT(tablet->SafeTime(require_lease))); |
3729 | 2.88k | } |
3730 | 4.87M | auto* retention_policy = tablet->RetentionPolicy(); |
3731 | 4.87M | if (retention_policy) { |
3732 | 4.61M | RETURN_NOT_OK(retention_policy->RegisterReaderTimestamp(read_time.read)); |
3733 | 4.61M | } |
3734 | 4.87M | return ScopedReadOperation(tablet, read_time); |
3735 | 4.87M | } |
3736 | | |
3737 | | ScopedReadOperation::ScopedReadOperation( |
3738 | | AbstractTablet* tablet, const ReadHybridTime& read_time) |
3739 | 4.87M | : tablet_(tablet), read_time_(read_time) { |
3740 | 4.87M | } |
3741 | | |
3742 | 20.6M | ScopedReadOperation::~ScopedReadOperation() { |
3743 | 20.6M | Reset(); |
3744 | 20.6M | } |
3745 | | |
3746 | 4.64M | void ScopedReadOperation::operator=(ScopedReadOperation&& rhs) { |
3747 | 4.64M | Reset(); |
3748 | 4.64M | tablet_ = rhs.tablet_; |
3749 | 4.64M | read_time_ = rhs.read_time_; |
3750 | 4.64M | rhs.tablet_ = nullptr; |
3751 | 4.64M | } |
3752 | | |
3753 | 25.3M | void ScopedReadOperation::Reset() { |
3754 | 25.3M | if (tablet_) { |
3755 | 4.86M | auto* retention_policy = tablet_->RetentionPolicy(); |
3756 | 4.86M | if (retention_policy) { |
3757 | 4.58M | retention_policy->UnregisterReaderTimestamp(read_time_.read); |
3758 | 4.58M | } |
3759 | 4.86M | tablet_ = nullptr; |
3760 | 4.86M | } |
3761 | 25.3M | } |
3762 | | |
3763 | | } // namespace tablet |
3764 | | } // namespace yb |