/Users/deen/code/yugabyte-db/src/yb/tablet/tablet.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/tablet.h" |
34 | | |
35 | | #include <boost/container/static_vector.hpp> |
36 | | |
37 | | #include "yb/client/client.h" |
38 | | #include "yb/client/error.h" |
39 | | #include "yb/client/meta_data_cache.h" |
40 | | #include "yb/client/session.h" |
41 | | #include "yb/client/table.h" |
42 | | #include "yb/client/transaction.h" |
43 | | #include "yb/client/transaction_manager.h" |
44 | | #include "yb/client/yb_op.h" |
45 | | |
46 | | #include "yb/common/index_column.h" |
47 | | #include "yb/common/pgsql_error.h" |
48 | | #include "yb/common/ql_rowblock.h" |
49 | | #include "yb/common/row_mark.h" |
50 | | #include "yb/common/schema.h" |
51 | | #include "yb/common/transaction_error.h" |
52 | | #include "yb/common/wire_protocol.h" |
53 | | |
54 | | #include "yb/consensus/consensus.pb.h" |
55 | | #include "yb/consensus/log_anchor_registry.h" |
56 | | #include "yb/consensus/opid_util.h" |
57 | | |
58 | | #include "yb/docdb/compaction_file_filter.h" |
59 | | #include "yb/docdb/conflict_resolution.h" |
60 | | #include "yb/docdb/consensus_frontier.h" |
61 | | #include "yb/docdb/cql_operation.h" |
62 | | #include "yb/docdb/doc_rowwise_iterator.h" |
63 | | #include "yb/docdb/doc_write_batch.h" |
64 | | #include "yb/docdb/docdb.h" |
65 | | #include "yb/docdb/docdb_compaction_filter.h" |
66 | | #include "yb/docdb/docdb_compaction_filter_intents.h" |
67 | | #include "yb/docdb/docdb_debug.h" |
68 | | #include "yb/docdb/docdb_rocksdb_util.h" |
69 | | #include "yb/docdb/pgsql_operation.h" |
70 | | #include "yb/docdb/ql_rocksdb_storage.h" |
71 | | #include "yb/docdb/redis_operation.h" |
72 | | #include "yb/docdb/rocksdb_writer.h" |
73 | | |
74 | | #include "yb/gutil/casts.h" |
75 | | |
76 | | #include "yb/rocksdb/db/memtable.h" |
77 | | #include "yb/rocksdb/utilities/checkpoint.h" |
78 | | |
79 | | #include "yb/rocksutil/yb_rocksdb.h" |
80 | | |
81 | | #include "yb/server/hybrid_clock.h" |
82 | | |
83 | | #include "yb/tablet/operations/change_metadata_operation.h" |
84 | | #include "yb/tablet/operations/operation.h" |
85 | | #include "yb/tablet/operations/snapshot_operation.h" |
86 | | #include "yb/tablet/operations/split_operation.h" |
87 | | #include "yb/tablet/operations/truncate_operation.h" |
88 | | #include "yb/tablet/operations/write_operation.h" |
89 | | #include "yb/tablet/read_result.h" |
90 | | #include "yb/tablet/snapshot_coordinator.h" |
91 | | #include "yb/tablet/tablet_bootstrap_if.h" |
92 | | #include "yb/tablet/tablet_metadata.h" |
93 | | #include "yb/tablet/tablet_metrics.h" |
94 | | #include "yb/tablet/tablet_retention_policy.h" |
95 | | #include "yb/tablet/tablet_snapshots.h" |
96 | | #include "yb/tablet/transaction_coordinator.h" |
97 | | #include "yb/tablet/transaction_participant.h" |
98 | | #include "yb/tablet/write_query.h" |
99 | | |
100 | | #include "yb/tserver/tserver.pb.h" |
101 | | |
102 | | #include "yb/util/debug-util.h" |
103 | | #include "yb/util/debug/trace_event.h" |
104 | | #include "yb/util/flag_tags.h" |
105 | | #include "yb/util/format.h" |
106 | | #include "yb/util/logging.h" |
107 | | #include "yb/util/mem_tracker.h" |
108 | | #include "yb/util/metrics.h" |
109 | | #include "yb/util/net/net_util.h" |
110 | | #include "yb/util/pg_util.h" |
111 | | #include "yb/util/scope_exit.h" |
112 | | #include "yb/util/status_format.h" |
113 | | #include "yb/util/status_log.h" |
114 | | #include "yb/util/stopwatch.h" |
115 | | #include "yb/util/trace.h" |
116 | | #include "yb/util/yb_pg_errcodes.h" |
117 | | |
118 | | #include "yb/yql/pgwrapper/libpq_utils.h" |
119 | | |
120 | | DEFINE_bool(tablet_do_dup_key_checks, true, |
121 | | "Whether to check primary keys for duplicate on insertion. " |
122 | | "Use at your own risk!"); |
123 | | TAG_FLAG(tablet_do_dup_key_checks, unsafe); |
124 | | |
125 | | DEFINE_bool(tablet_do_compaction_cleanup_for_intents, true, |
126 | | "Whether to clean up intents for aborted transactions in compaction."); |
127 | | |
128 | | DEFINE_int32(tablet_bloom_block_size, 4096, |
129 | | "Block size of the bloom filters used for tablet keys."); |
130 | | TAG_FLAG(tablet_bloom_block_size, advanced); |
131 | | |
132 | | DEFINE_double(tablet_bloom_target_fp_rate, 0.01f, |
133 | | "Target false-positive rate (between 0 and 1) to size tablet key bloom filters. " |
134 | | "A lower false positive rate may reduce the number of disk seeks required " |
135 | | "in heavy insert workloads, at the expense of more space and RAM " |
136 | | "required for bloom filters."); |
137 | | TAG_FLAG(tablet_bloom_target_fp_rate, advanced); |
138 | | |
139 | | METRIC_DEFINE_entity(table); |
140 | | METRIC_DEFINE_entity(tablet); |
141 | | |
142 | | // TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout |
143 | | // is probably OK for shutdown. |
144 | | DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000, |
145 | | "Max amount of time we can wait for read/write operations on RocksDB to finish " |
146 | | "so that we can perform exclusive-ownership operations on RocksDB, such as removing " |
147 | | "all data in the tablet by replacing the RocksDB instance with an empty one."); |
148 | | |
149 | | DEFINE_int32(intents_flush_max_delay_ms, 2000, |
150 | | "Max time to wait for regular db to flush during flush of intents. " |
151 | | "After this time flush of regular db will be forced."); |
152 | | |
153 | | DEFINE_int32(num_raft_ops_to_force_idle_intents_db_to_flush, 1000, |
154 | | "When writes to intents RocksDB are stopped and the number of Raft operations after " |
155 | | "the last write to the intents RocksDB " |
156 | | "is greater than this value, the intents RocksDB would be requested to flush."); |
157 | | |
158 | | DEFINE_bool(delete_intents_sst_files, true, |
159 | | "Delete whole intents .SST files when possible."); |
160 | | |
161 | | DEFINE_uint64(backfill_index_write_batch_size, 128, "The batch size for backfilling the index."); |
162 | | TAG_FLAG(backfill_index_write_batch_size, advanced); |
163 | | TAG_FLAG(backfill_index_write_batch_size, runtime); |
164 | | |
165 | | DEFINE_int32(backfill_index_rate_rows_per_sec, 0, "Rate of at which the " |
166 | | "indexed table's entries are populated into the index table during index " |
167 | | "backfill. This is a per-tablet flag, i.e. a tserver responsible for " |
168 | | "multiple tablets could be processing more than this."); |
169 | | TAG_FLAG(backfill_index_rate_rows_per_sec, advanced); |
170 | | TAG_FLAG(backfill_index_rate_rows_per_sec, runtime); |
171 | | |
172 | | DEFINE_uint64(verify_index_read_batch_size, 128, "The batch size for reading the index."); |
173 | | TAG_FLAG(verify_index_read_batch_size, advanced); |
174 | | TAG_FLAG(verify_index_read_batch_size, runtime); |
175 | | |
176 | | DEFINE_int32(verify_index_rate_rows_per_sec, 0, |
177 | | "Rate of at which the indexed table's entries are read during index consistency checks." |
178 | | "This is a per-tablet flag, i.e. a tserver responsible for multiple tablets could be " |
179 | | "processing more than this."); |
180 | | TAG_FLAG(verify_index_rate_rows_per_sec, advanced); |
181 | | TAG_FLAG(verify_index_rate_rows_per_sec, runtime); |
182 | | |
183 | | DEFINE_int32(backfill_index_timeout_grace_margin_ms, -1, |
184 | | "The time we give the backfill process to wrap up the current set " |
185 | | "of writes and return successfully the RPC with the information about " |
186 | | "how far we have processed the rows."); |
187 | | TAG_FLAG(backfill_index_timeout_grace_margin_ms, advanced); |
188 | | TAG_FLAG(backfill_index_timeout_grace_margin_ms, runtime); |
189 | | |
190 | | DEFINE_bool(yql_allow_compatible_schema_versions, true, |
191 | | "Allow YCQL requests to be accepted even if they originate from a client who is ahead " |
192 | | "of the server's schema, but is determined to be compatible with the current version."); |
193 | | TAG_FLAG(yql_allow_compatible_schema_versions, advanced); |
194 | | TAG_FLAG(yql_allow_compatible_schema_versions, runtime); |
195 | | |
196 | | DEFINE_bool(disable_alter_vs_write_mutual_exclusion, false, |
197 | | "A safety switch to disable the changes from D8710 which makes a schema " |
198 | | "operation take an exclusive lock making all write operations wait for it."); |
199 | | TAG_FLAG(disable_alter_vs_write_mutual_exclusion, advanced); |
200 | | TAG_FLAG(disable_alter_vs_write_mutual_exclusion, runtime); |
201 | | |
202 | | DEFINE_bool(cleanup_intents_sst_files, true, |
203 | | "Cleanup intents files that are no more relevant to any running transaction."); |
204 | | |
205 | | DEFINE_int32(ysql_transaction_abort_timeout_ms, 15 * 60 * 1000, // 15 minutes |
206 | | "Max amount of time we can wait for active transactions to abort on a tablet " |
207 | | "after DDL (eg. DROP TABLE) is executed. This deadline is same as " |
208 | | "unresponsive_ts_rpc_timeout_ms"); |
209 | | |
210 | | DEFINE_test_flag(int32, backfill_sabotage_frequency, 0, |
211 | | "If set to value greater than 0, every nth row will be corrupted in the backfill process " |
212 | | "to create an inconsistency between the index and the indexed tables where n is the " |
213 | | "input parameter given."); |
214 | | |
215 | | DEFINE_test_flag(int32, backfill_drop_frequency, 0, |
216 | | "If set to value greater than 0, every nth row will be dropped in the backfill process " |
217 | | "to create an inconsistency between the index and the indexed tables where n is the " |
218 | | "input parameter given."); |
219 | | |
220 | | DEFINE_bool(tablet_enable_ttl_file_filter, false, |
221 | | "Enables compaction to directly delete files that have expired based on TTL, " |
222 | | "rather than removing them via the normal compaction process."); |
223 | | |
224 | | DEFINE_test_flag(int32, slowdown_backfill_by_ms, 0, |
225 | | "If set > 0, slows down the backfill process by this amount."); |
226 | | |
227 | | DEFINE_test_flag(uint64, backfill_paging_size, 0, |
228 | | "If set > 0, returns early after processing this number of rows."); |
229 | | |
230 | | DEFINE_test_flag(bool, tablet_verify_flushed_frontier_after_modifying, false, |
231 | | "After modifying the flushed frontier in RocksDB, verify that the restored value " |
232 | | "of it is as expected. Used for testing."); |
233 | | |
234 | | DEFINE_test_flag(bool, docdb_log_write_batches, false, |
235 | | "Dump write batches being written to RocksDB"); |
236 | | |
237 | | DEFINE_test_flag(bool, export_intentdb_metrics, false, |
238 | | "Dump intentsdb statistics to prometheus metrics"); |
239 | | |
240 | | DEFINE_test_flag(bool, pause_before_post_split_compaction, false, |
241 | | "Pause before triggering post split compaction."); |
242 | | |
243 | | DEFINE_test_flag(bool, disable_adding_user_frontier_to_sst, false, |
244 | | "Prevents adding the UserFrontier to SST file in order to mimic older files."); |
245 | | |
246 | | // FLAGS_TEST_disable_getting_user_frontier_from_mem_table is used in conjunction with |
247 | | // FLAGS_TEST_disable_adding_user_frontier_to_sst. Two flags are needed for the case in which |
248 | | // we're writing a mixture of SST files with and without UserFrontiers, to ensure that we're |
249 | | // not attempting to read the UserFrontier from the MemTable in either case. |
250 | | DEFINE_test_flag(bool, disable_getting_user_frontier_from_mem_table, false, |
251 | | "Prevents checking the MemTable for a UserFrontier for test cases where we are " |
252 | | "generating SST files without UserFrontiers."); |
253 | | |
254 | | DECLARE_int32(client_read_write_timeout_ms); |
255 | | DECLARE_bool(consistent_restore); |
256 | | DECLARE_int32(rocksdb_level0_slowdown_writes_trigger); |
257 | | DECLARE_int32(rocksdb_level0_stop_writes_trigger); |
258 | | DECLARE_uint64(rocksdb_max_file_size_for_compaction); |
259 | | DECLARE_int64(apply_intents_task_injected_delay_ms); |
260 | | DECLARE_string(regular_tablets_data_block_key_value_encoding); |
261 | | |
262 | | DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0, |
263 | | "Sleep before applying intents to docdb after transaction commit"); |
264 | | |
265 | | using namespace std::placeholders; |
266 | | |
267 | | using std::shared_ptr; |
268 | | using std::make_shared; |
269 | | using std::string; |
270 | | using std::unordered_set; |
271 | | using std::vector; |
272 | | using std::unique_ptr; |
273 | | using namespace std::literals; // NOLINT |
274 | | |
275 | | using rocksdb::WriteBatch; |
276 | | using rocksdb::SequenceNumber; |
277 | | using yb::tserver::WriteRequestPB; |
278 | | using yb::tserver::WriteResponsePB; |
279 | | using yb::docdb::KeyValueWriteBatchPB; |
280 | | using yb::tserver::ReadRequestPB; |
281 | | using yb::docdb::DocOperation; |
282 | | using yb::docdb::RedisWriteOperation; |
283 | | using yb::docdb::QLWriteOperation; |
284 | | using yb::docdb::PgsqlWriteOperation; |
285 | | using yb::docdb::DocDBCompactionFilterFactory; |
286 | | using yb::docdb::InitMarkerBehavior; |
287 | | |
288 | | namespace yb { |
289 | | namespace tablet { |
290 | | |
291 | | using consensus::MaximumOpId; |
292 | | using log::LogAnchorRegistry; |
293 | | using strings::Substitute; |
294 | | using base::subtle::Barrier_AtomicIncrement; |
295 | | |
296 | | using client::ChildTransactionData; |
297 | | using client::TransactionManager; |
298 | | using client::YBSession; |
299 | | using client::YBTransaction; |
300 | | using client::YBTablePtr; |
301 | | |
302 | | using docdb::DocKey; |
303 | | using docdb::DocPath; |
304 | | using docdb::DocRowwiseIterator; |
305 | | using docdb::DocWriteBatch; |
306 | | using docdb::SubDocKey; |
307 | | using docdb::PrimitiveValue; |
308 | | using docdb::StorageDbType; |
309 | | |
310 | | //////////////////////////////////////////////////////////// |
311 | | // Tablet |
312 | | //////////////////////////////////////////////////////////// |
313 | | |
314 | | namespace { |
315 | | |
316 | | std::string MakeTabletLogPrefix( |
317 | 3.02M | const TabletId& tablet_id, const std::string& log_prefix_suffix) { |
318 | 3.02M | return Format("T $0$1: ", tablet_id, log_prefix_suffix); |
319 | 3.02M | } |
320 | | |
321 | | docdb::ConsensusFrontiers* InitFrontiers( |
322 | | const OpId op_id, |
323 | | const HybridTime log_ht, |
324 | 11.8M | docdb::ConsensusFrontiers* frontiers) { |
325 | 11.8M | if (FLAGS_TEST_disable_adding_user_frontier_to_sst) { |
326 | 0 | return nullptr; |
327 | 0 | } |
328 | 11.8M | set_op_id(op_id, frontiers); |
329 | 11.8M | set_hybrid_time(log_ht, frontiers); |
330 | 11.8M | return frontiers; |
331 | 11.8M | } |
332 | | |
333 | | template <class Data> |
334 | 2.97M | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { |
335 | 2.97M | return InitFrontiers(data.op_id, data.log_ht, frontiers); |
336 | 2.97M | } tablet.cc:rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>* yb::tablet::(anonymous namespace)::InitFrontiers<yb::tablet::TransactionApplyData>(yb::tablet::TransactionApplyData const&, rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>*) Line | Count | Source | 334 | 1.30M | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { | 335 | 1.30M | return InitFrontiers(data.op_id, data.log_ht, frontiers); | 336 | 1.30M | } |
tablet.cc:rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>* yb::tablet::(anonymous namespace)::InitFrontiers<yb::tablet::RemoveIntentsData>(yb::tablet::RemoveIntentsData const&, rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>*) Line | Count | Source | 334 | 1.67M | docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) { | 335 | 1.67M | return InitFrontiers(data.op_id, data.log_ht, frontiers); | 336 | 1.67M | } |
|
337 | | |
338 | | rocksdb::UserFrontierPtr MemTableFrontierFromDb( |
339 | | rocksdb::DB* db, |
340 | 19.0M | rocksdb::UpdateUserValueType type) { |
341 | 19.0M | if (FLAGS_TEST_disable_getting_user_frontier_from_mem_table) { |
342 | 0 | return nullptr; |
343 | 0 | } |
344 | 19.0M | return db->GetMutableMemTableFrontier(type); |
345 | 19.0M | } |
346 | | |
347 | | } // namespace |
348 | | |
349 | | class Tablet::RegularRocksDbListener : public rocksdb::EventListener { |
350 | | public: |
351 | | RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix) |
352 | | : tablet_(*CHECK_NOTNULL(tablet)), |
353 | 276k | log_prefix_(log_prefix) {} |
354 | | |
355 | 482 | void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override { |
356 | 482 | if (ci.is_full_compaction) { |
357 | 301 | auto& metadata = *CHECK_NOTNULL(tablet_.metadata()); |
358 | 301 | if (!metadata.has_been_fully_compacted()) { |
359 | 155 | metadata.set_has_been_fully_compacted(true); |
360 | 155 | ERROR_NOT_OK(metadata.Flush(), log_prefix_); |
361 | 155 | } |
362 | 301 | } |
363 | 482 | } |
364 | | |
365 | | private: |
366 | | Tablet& tablet_; |
367 | | const std::string log_prefix_; |
368 | | }; |
369 | | |
370 | | Tablet::Tablet(const TabletInitData& data) |
371 | | : key_schema_(std::make_unique<Schema>(data.metadata->schema()->CreateKeyProjection())), |
372 | | metadata_(data.metadata), |
373 | | table_type_(data.metadata->table_type()), |
374 | | log_anchor_registry_(data.log_anchor_registry), |
375 | | mem_tracker_(MemTracker::CreateTracker( |
376 | | Format("tablet-$0", tablet_id()), data.parent_mem_tracker, AddToParent::kTrue, |
377 | | CreateMetrics::kFalse)), |
378 | | block_based_table_mem_tracker_(data.block_based_table_mem_tracker), |
379 | | clock_(data.clock), |
380 | | mvcc_( |
381 | | MakeTabletLogPrefix(data.metadata->raft_group_id(), data.log_prefix_suffix), data.clock), |
382 | | tablet_options_(data.tablet_options), |
383 | | pending_non_abortable_op_counter_("RocksDB non-abortable read/write operations"), |
384 | | pending_abortable_op_counter_("RocksDB abortable read/write operations"), |
385 | | write_ops_being_submitted_counter_("Tablet schema"), |
386 | | client_future_(data.client_future), |
387 | | local_tablet_filter_(data.local_tablet_filter), |
388 | | log_prefix_suffix_(data.log_prefix_suffix), |
389 | | is_sys_catalog_(data.is_sys_catalog), |
390 | | txns_enabled_(data.txns_enabled), |
391 | | retention_policy_(std::make_shared<TabletRetentionPolicy>( |
392 | 150k | clock_, data.allowed_history_cutoff_provider, metadata_.get())) { |
393 | 150k | CHECK(schema()->has_column_ids()); |
394 | 150k | LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is " |
395 | 150k | << metadata_->schema_version(); |
396 | | |
397 | 150k | if (data.metric_registry) { |
398 | 150k | MetricEntity::AttributeMap attrs; |
399 | | // TODO(KUDU-745): table_id is apparently not set in the metadata. |
400 | 150k | attrs["table_id"] = metadata_->table_id(); |
401 | 150k | attrs["table_name"] = metadata_->table_name(); |
402 | 150k | attrs["namespace_name"] = metadata_->namespace_name(); |
403 | 150k | table_metrics_entity_ = |
404 | 150k | METRIC_ENTITY_table.Instantiate(data.metric_registry, metadata_->table_id(), attrs); |
405 | 150k | tablet_metrics_entity_ = |
406 | 150k | METRIC_ENTITY_tablet.Instantiate(data.metric_registry, tablet_id(), attrs); |
407 | | // If we are creating a KV table create the metrics callback. |
408 | 150k | regulardb_statistics_ = |
409 | 150k | rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_); |
410 | 150k | intentsdb_statistics_ = |
411 | 150k | (GetAtomicFlag(&FLAGS_TEST_export_intentdb_metrics) |
412 | 150k | ? rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_, true)186 |
413 | 150k | : rocksdb::CreateDBStatistics(table_metrics_entity_, nullptr, true)150k ); |
414 | | |
415 | 150k | metrics_.reset(new TabletMetrics(table_metrics_entity_, tablet_metrics_entity_)); |
416 | | |
417 | 150k | mem_tracker_->SetMetricEntity(tablet_metrics_entity_); |
418 | 150k | } |
419 | | |
420 | 150k | auto table_info = metadata_->primary_table_info(); |
421 | 150k | bool has_index = !table_info->index_map->empty(); |
422 | 150k | bool transactional = data.metadata->schema()->table_properties().is_transactional(); |
423 | 150k | if (transactional) { |
424 | 48.9k | server::HybridClock::EnableClockSkewControl(); |
425 | 48.9k | } |
426 | 150k | if (txns_enabled_ && |
427 | 150k | data.transaction_participant_context150k && |
428 | 150k | (149k is_sys_catalog_149k || transactional142k )) { |
429 | 56.8k | transaction_participant_ = std::make_unique<TransactionParticipant>( |
430 | 56.8k | data.transaction_participant_context, this, tablet_metrics_entity_); |
431 | | // Create transaction manager for secondary index update. |
432 | 56.8k | if (has_index) { |
433 | 10 | transaction_manager_ = std::make_unique<client::TransactionManager>( |
434 | 10 | client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_); |
435 | 10 | } |
436 | 56.8k | } |
437 | | |
438 | | // Create index table metadata cache for secondary index update. |
439 | 150k | if (has_index) { |
440 | 10 | CreateNewYBMetaDataCache(); |
441 | 10 | } |
442 | | |
443 | | // If this is a unique index tablet, set up the index primary key schema. |
444 | 150k | if (table_info->index_info && table_info->index_info->is_unique()14.7k ) { |
445 | 3.87k | unique_index_key_schema_ = std::make_unique<Schema>(); |
446 | 3.87k | const auto ids = table_info->index_info->index_key_column_ids(); |
447 | 3.87k | CHECK_OK(table_info->schema->CreateProjectionByIdsIgnoreMissing( |
448 | 3.87k | ids, unique_index_key_schema_.get())); |
449 | 3.87k | } |
450 | | |
451 | 150k | if (data.transaction_coordinator_context && |
452 | 150k | table_info->table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE150k ) { |
453 | 48.0k | transaction_coordinator_ = std::make_unique<TransactionCoordinator>( |
454 | 48.0k | metadata_->fs_manager()->uuid(), |
455 | 48.0k | data.transaction_coordinator_context, |
456 | 48.0k | metrics_->expired_transactions.get()); |
457 | 48.0k | } |
458 | | |
459 | 150k | snapshots_ = std::make_unique<TabletSnapshots>(this); |
460 | | |
461 | 150k | snapshot_coordinator_ = data.snapshot_coordinator; |
462 | | |
463 | 150k | if (metadata_->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
464 | 1 | SplitDone(); |
465 | 1 | } |
466 | 150k | auto restoration_hybrid_time = metadata_->restoration_hybrid_time(); |
467 | 150k | if (restoration_hybrid_time && transaction_participant_150k && FLAGS_consistent_restore56.9k ) { |
468 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
469 | 0 | } |
470 | 150k | SyncRestoringOperationFilter(ResetSplit::kFalse); |
471 | 150k | } |
472 | | |
473 | 76.0k | Tablet::~Tablet() { |
474 | 76.0k | if (StartShutdown()) { |
475 | 490 | CompleteShutdown(); |
476 | 75.5k | } else { |
477 | 75.5k | auto state = state_; |
478 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, state != kShutdown) |
479 | 18.4E | << "Destroying Tablet that did not complete shutdown: " << state; |
480 | 75.5k | } |
481 | 76.0k | if (block_based_table_mem_tracker_) { |
482 | 75.5k | block_based_table_mem_tracker_->UnregisterFromParent(); |
483 | 75.5k | } |
484 | 76.0k | mem_tracker_->UnregisterFromParent(); |
485 | 76.0k | } |
486 | | |
487 | 150k | Status Tablet::Open() { |
488 | 150k | TRACE_EVENT0("tablet", "Tablet::Open"); |
489 | 150k | std::lock_guard<rw_spinlock> lock(component_lock_); |
490 | 150k | CHECK_EQ(state_, kInitialized) << "already open"0 ; |
491 | 150k | CHECK(schema()->has_column_ids()); |
492 | | |
493 | 150k | switch (table_type_) { |
494 | 39.6k | case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
495 | 90.6k | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
496 | 102k | case TableType::REDIS_TABLE_TYPE: |
497 | 102k | RETURN_NOT_OK(OpenKeyValueTablet()); |
498 | 102k | state_ = kBootstrapping; |
499 | 102k | return Status::OK(); |
500 | 47.9k | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
501 | 47.9k | state_ = kBootstrapping; |
502 | 47.9k | return Status::OK(); |
503 | 150k | } |
504 | 0 | FATAL_INVALID_ENUM_VALUE(TableType, table_type_); |
505 | |
|
506 | 0 | return Status::OK(); |
507 | 150k | } |
508 | | |
509 | 276k | Status Tablet::CreateTabletDirectories(const string& db_dir, FsManager* fs) { |
510 | 276k | LOG_WITH_PREFIX(INFO) << "Creating RocksDB database in dir " << db_dir; |
511 | | |
512 | | // Create the directory table-uuid first. |
513 | 276k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(DirName(db_dir)), |
514 | 276k | Format("Failed to create RocksDB table directory $0", DirName(db_dir))); |
515 | | |
516 | 276k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir), |
517 | 276k | Format("Failed to create RocksDB tablet directory $0", db_dir)); |
518 | | |
519 | 276k | RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir + kIntentsDBSuffix), |
520 | 276k | Format("Failed to create RocksDB tablet intents directory $0", db_dir)); |
521 | | |
522 | 276k | RETURN_NOT_OK(snapshots_->CreateDirectories(db_dir, fs)); |
523 | | |
524 | 276k | return Status::OK(); |
525 | 276k | } |
526 | | |
527 | 66.9k | void Tablet::ResetYBMetaDataCache() { |
528 | 66.9k | std::atomic_store_explicit(&metadata_cache_, {}, std::memory_order_release); |
529 | 66.9k | } |
530 | | |
531 | 53.2k | void Tablet::CreateNewYBMetaDataCache() { |
532 | 53.2k | std::atomic_store_explicit(&metadata_cache_, |
533 | 53.2k | std::make_shared<client::YBMetaDataCache>(client_future_.get(), |
534 | 53.2k | false /* Update permissions cache */), |
535 | 53.2k | std::memory_order_release); |
536 | 53.2k | } |
537 | | |
538 | 41.3k | std::shared_ptr<client::YBMetaDataCache> Tablet::YBMetaDataCache() { |
539 | 41.3k | return std::atomic_load_explicit(&metadata_cache_, std::memory_order_acquire); |
540 | 41.3k | } |
541 | | |
542 | | template <class F> |
543 | 421k | auto MakeMemTableFlushFilterFactory(const F& f) { |
544 | | // Trick to get type of mem_table_flush_filter_factory field. |
545 | 421k | typedef typename decltype( |
546 | 421k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type |
547 | 421k | MemTableFlushFilterFactoryType; |
548 | 421k | return std::make_shared<MemTableFlushFilterFactoryType>(f); |
549 | 421k | } tablet.cc:auto yb::tablet::MakeMemTableFlushFilterFactory<yb::tablet::Tablet::OpenKeyValueTablet()::$_0>(yb::tablet::Tablet::OpenKeyValueTablet()::$_0 const&) Line | Count | Source | 543 | 276k | auto MakeMemTableFlushFilterFactory(const F& f) { | 544 | | // Trick to get type of mem_table_flush_filter_factory field. | 545 | 276k | typedef typename decltype( | 546 | 276k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type | 547 | 276k | MemTableFlushFilterFactoryType; | 548 | 276k | return std::make_shared<MemTableFlushFilterFactoryType>(f); | 549 | 276k | } |
tablet.cc:auto yb::tablet::MakeMemTableFlushFilterFactory<yb::tablet::Tablet::OpenKeyValueTablet()::$_2>(yb::tablet::Tablet::OpenKeyValueTablet()::$_2 const&) Line | Count | Source | 543 | 145k | auto MakeMemTableFlushFilterFactory(const F& f) { | 544 | | // Trick to get type of mem_table_flush_filter_factory field. | 545 | 145k | typedef typename decltype( | 546 | 145k | static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type | 547 | 145k | MemTableFlushFilterFactoryType; | 548 | 145k | return std::make_shared<MemTableFlushFilterFactoryType>(f); | 549 | 145k | } |
|
550 | | |
551 | | template <class F> |
552 | 276k | auto MakeMaxFileSizeWithTableTTLFunction(const F& f) { |
553 | | // Trick to get type of max_file_size_for_compaction field. |
554 | 276k | typedef typename decltype( |
555 | 276k | static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type |
556 | 276k | MaxFileSizeWithTableTTLFunction; |
557 | 276k | return std::make_shared<MaxFileSizeWithTableTTLFunction>(f); |
558 | 276k | } |
559 | | |
560 | 31.9k | Result<bool> Tablet::IntentsDbFlushFilter(const rocksdb::MemTable& memtable) { |
561 | 31.9k | VLOG_WITH_PREFIX0 (4) << __func__0 ; |
562 | | |
563 | 31.9k | auto frontiers = memtable.Frontiers(); |
564 | 31.9k | if (frontiers) { |
565 | 31.9k | const auto& intents_largest = |
566 | 31.9k | down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest()); |
567 | | |
568 | | // We allow to flush intents DB only after regular DB. |
569 | | // Otherwise we could lose applied intents when corresponding regular records were not |
570 | | // flushed. |
571 | 31.9k | auto regular_flushed_frontier = regular_db_->GetFlushedFrontier(); |
572 | 31.9k | if (regular_flushed_frontier) { |
573 | 15.9k | const auto& regular_flushed_largest = |
574 | 15.9k | static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier); |
575 | 15.9k | if (regular_flushed_largest.op_id().index >= intents_largest.op_id().index) { |
576 | 330 | VLOG_WITH_PREFIX0 (4) << __func__ << ", regular already flushed"0 ; |
577 | 330 | return true; |
578 | 330 | } |
579 | 15.9k | } |
580 | 31.9k | } else { |
581 | 5 | VLOG_WITH_PREFIX0 (4) << __func__ << ", no frontiers"0 ; |
582 | 5 | } |
583 | | |
584 | | // If regular db does not have anything to flush, it means that we have just added intents, |
585 | | // without apply, so it is OK to flush the intents RocksDB. |
586 | 31.6k | auto flush_intention = regular_db_->GetFlushAbility(); |
587 | 31.6k | if (flush_intention == rocksdb::FlushAbility::kNoNewData) { |
588 | 418 | VLOG_WITH_PREFIX0 (4) << __func__ << ", no new data"0 ; |
589 | 418 | return true; |
590 | 418 | } |
591 | | |
592 | | // Force flush of regular DB if we were not able to flush for too long. |
593 | 31.2k | auto timeout = std::chrono::milliseconds(FLAGS_intents_flush_max_delay_ms); |
594 | 31.2k | if (flush_intention != rocksdb::FlushAbility::kAlreadyFlushing && |
595 | 31.2k | (5.34k shutdown_requested_.load(std::memory_order_acquire)5.34k || |
596 | 5.34k | std::chrono::steady_clock::now() > memtable.FlushStartTime() + timeout5.34k )) { |
597 | 70 | VLOG_WITH_PREFIX0 (2) << __func__ << ", force flush"0 ; |
598 | | |
599 | 70 | rocksdb::FlushOptions options; |
600 | 70 | options.wait = false; |
601 | 70 | RETURN_NOT_OK(regular_db_->Flush(options)); |
602 | 70 | } |
603 | | |
604 | 31.2k | return false; |
605 | 31.2k | } |
606 | | |
607 | 2.44M | std::string Tablet::LogPrefix() const { |
608 | 2.44M | return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_); |
609 | 2.44M | } |
610 | | |
611 | | namespace { |
612 | | |
613 | 421k | std::string LogDbTypePrefix(docdb::StorageDbType db_type) { |
614 | 421k | switch (db_type) { |
615 | 276k | case docdb::StorageDbType::kRegular: |
616 | 276k | return "R"; |
617 | 145k | case docdb::StorageDbType::kIntents: |
618 | 145k | return "I"; |
619 | 421k | } |
620 | 0 | FATAL_INVALID_ENUM_VALUE(docdb::StorageDbType, db_type); |
621 | 0 | } |
622 | | |
623 | | std::string MakeTabletLogPrefix( |
624 | 421k | const TabletId& tablet_id, const std::string& log_prefix_suffix, docdb::StorageDbType db_type) { |
625 | 421k | return MakeTabletLogPrefix( |
626 | 421k | tablet_id, Format("$0 [$1]", log_prefix_suffix, LogDbTypePrefix(db_type))); |
627 | 421k | } |
628 | | |
629 | | } // namespace |
630 | | |
631 | 421k | std::string Tablet::LogPrefix(docdb::StorageDbType db_type) const { |
632 | 421k | return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_, db_type); |
633 | 421k | } |
634 | | |
635 | 276k | Status Tablet::OpenKeyValueTablet() { |
636 | 276k | static const std::string kRegularDB = "RegularDB"s; |
637 | 276k | static const std::string kIntentsDB = "IntentsDB"s; |
638 | | |
639 | 276k | rocksdb::BlockBasedTableOptions table_options; |
640 | 276k | if (!metadata()->primary_table_info()->index_info || metadata()->colocated()96.8k ) { |
641 | | // This tablet is not dedicated to the index table, so it should be effective to use |
642 | | // advanced key-value encoding algorithm optimized for docdb keys structure. |
643 | 179k | table_options.use_delta_encoding = true; |
644 | 179k | table_options.data_block_key_value_encoding_format = |
645 | 179k | VERIFY_RESULT(docdb::GetConfiguredKeyValueEncodingFormat( |
646 | 179k | FLAGS_regular_tablets_data_block_key_value_encoding)); |
647 | 179k | } |
648 | 276k | rocksdb::Options rocksdb_options; |
649 | 276k | InitRocksDBOptions( |
650 | 276k | &rocksdb_options, LogPrefix(docdb::StorageDbType::kRegular), std::move(table_options)); |
651 | 276k | rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kRegularDB, mem_tracker_); |
652 | 276k | rocksdb_options.block_based_table_mem_tracker = |
653 | 276k | MemTracker::FindOrCreateTracker( |
654 | 276k | Format("$0-$1", kRegularDB, tablet_id()), block_based_table_mem_tracker_, |
655 | 276k | AddToParent::kTrue, CreateMetrics::kFalse); |
656 | | // We may not have a metrics_entity_ instantiated in tests. |
657 | 276k | if (tablet_metrics_entity_) { |
658 | 275k | rocksdb_options.block_based_table_mem_tracker->SetMetricEntity( |
659 | 275k | tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kRegularDB)); |
660 | 275k | } |
661 | | |
662 | 276k | key_bounds_ = docdb::KeyBounds(metadata()->lower_bound_key(), metadata()->upper_bound_key()); |
663 | | |
664 | | // Install the history cleanup handler. Note that TabletRetentionPolicy is going to hold a raw ptr |
665 | | // to this tablet. So, we ensure that rocksdb_ is reset before this tablet gets destroyed. |
666 | 276k | rocksdb_options.compaction_filter_factory = make_shared<DocDBCompactionFilterFactory>( |
667 | 276k | retention_policy_, &key_bounds_); |
668 | | |
669 | 276k | rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] { |
670 | 3.62k | if (mem_table_flush_filter_factory_) { |
671 | 3.06k | return mem_table_flush_filter_factory_(); |
672 | 3.06k | } |
673 | 560 | return rocksdb::MemTableFilter(); |
674 | 3.62k | }); |
675 | 276k | if (FLAGS_tablet_enable_ttl_file_filter) { |
676 | 0 | rocksdb_options.compaction_file_filter_factory = |
677 | 0 | std::make_shared<docdb::DocDBCompactionFileFilterFactory>(retention_policy_, clock()); |
678 | 0 | } |
679 | | |
680 | | // Use a function that checks the table TTL before returning a value for max file size |
681 | | // for compactions. |
682 | 1.11M | rocksdb_options.max_file_size_for_compaction = MakeMaxFileSizeWithTableTTLFunction([this] { |
683 | 1.11M | if (FLAGS_rocksdb_max_file_size_for_compaction > 0 && |
684 | 1.11M | retention_policy_->GetRetentionDirective().table_ttl != |
685 | 0 | docdb::ValueControlFields::kMaxTtl) { |
686 | 0 | return FLAGS_rocksdb_max_file_size_for_compaction; |
687 | 0 | } |
688 | 1.11M | return std::numeric_limits<uint64_t>::max(); |
689 | 1.11M | }); |
690 | | |
691 | 276k | rocksdb_options.disable_auto_compactions = true; |
692 | 276k | rocksdb_options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); |
693 | 276k | rocksdb_options.level0_stop_writes_trigger = std::numeric_limits<int>::max(); |
694 | | |
695 | 276k | rocksdb::Options regular_rocksdb_options(rocksdb_options); |
696 | 276k | regular_rocksdb_options.listeners.push_back( |
697 | 276k | std::make_shared<RegularRocksDbListener>(this, regular_rocksdb_options.log_prefix)); |
698 | | |
699 | 276k | const string db_dir = metadata()->rocksdb_dir(); |
700 | 276k | RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager())); |
701 | | |
702 | 276k | LOG(INFO) << "Opening RocksDB at: " << db_dir; |
703 | 276k | rocksdb::DB* db = nullptr; |
704 | 276k | rocksdb::Status rocksdb_open_status = rocksdb::DB::Open(regular_rocksdb_options, db_dir, &db); |
705 | 276k | if (!rocksdb_open_status.ok()) { |
706 | 5 | LOG_WITH_PREFIX(ERROR) << "Failed to open a RocksDB database in directory " << db_dir << ": " |
707 | 5 | << rocksdb_open_status; |
708 | 5 | if (db != nullptr) { |
709 | 0 | delete db; |
710 | 0 | } |
711 | 5 | return STATUS(IllegalState, rocksdb_open_status.ToString()); |
712 | 5 | } |
713 | 276k | regular_db_.reset(db); |
714 | 276k | regular_db_->ListenFilesChanged(std::bind(&Tablet::RegularDbFilesChanged, this)); |
715 | | |
716 | 276k | if (transaction_participant_) { |
717 | 145k | LOG_WITH_PREFIX(INFO) << "Opening intents DB at: " << db_dir + kIntentsDBSuffix; |
718 | 145k | rocksdb::Options intents_rocksdb_options(rocksdb_options); |
719 | 145k | docdb::SetLogPrefix(&intents_rocksdb_options, LogPrefix(docdb::StorageDbType::kIntents)); |
720 | | |
721 | 145k | intents_rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] { |
722 | 31.8k | return std::bind(&Tablet::IntentsDbFlushFilter, this, _1); |
723 | 31.8k | }); |
724 | | |
725 | 145k | intents_rocksdb_options.compaction_filter_factory = |
726 | 145k | FLAGS_tablet_do_compaction_cleanup_for_intents ? |
727 | 18.4E | std::make_shared<docdb::DocDBIntentsCompactionFilterFactory>(this, &key_bounds_)145k : nullptr; |
728 | | |
729 | 145k | intents_rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kIntentsDB, mem_tracker_); |
730 | 145k | intents_rocksdb_options.block_based_table_mem_tracker = |
731 | 145k | MemTracker::FindOrCreateTracker( |
732 | 145k | Format("$0-$1", kIntentsDB, tablet_id()), block_based_table_mem_tracker_, |
733 | 145k | AddToParent::kTrue, CreateMetrics::kFalse); |
734 | | // We may not have a metrics_entity_ instantiated in tests. |
735 | 145k | if (tablet_metrics_entity_145k ) { |
736 | 145k | intents_rocksdb_options.block_based_table_mem_tracker->SetMetricEntity( |
737 | 145k | tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kIntentsDB)); |
738 | 145k | } |
739 | 145k | intents_rocksdb_options.statistics = intentsdb_statistics_; |
740 | | |
741 | 145k | rocksdb::DB* intents_db = nullptr; |
742 | 145k | RETURN_NOT_OK( |
743 | 145k | rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db)); |
744 | 145k | intents_db_.reset(intents_db); |
745 | 145k | intents_db_->ListenFilesChanged(std::bind(&Tablet::CleanupIntentFiles, this)); |
746 | 145k | } |
747 | | |
748 | 276k | ql_storage_.reset(new docdb::QLRocksDBStorage(doc_db())); |
749 | 276k | if (transaction_participant_) { |
750 | 145k | transaction_participant_->SetDB(doc_db(), &key_bounds_, &pending_non_abortable_op_counter_); |
751 | 145k | } |
752 | | |
753 | | // Don't allow reads at timestamps lower than the highest history cutoff of a past compaction. |
754 | 276k | auto regular_flushed_frontier = regular_db_->GetFlushedFrontier(); |
755 | 276k | if (regular_flushed_frontier) { |
756 | 3.05k | retention_policy_->UpdateCommittedHistoryCutoff( |
757 | 3.05k | static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier).history_cutoff()); |
758 | 3.05k | } |
759 | | |
760 | 276k | LOG_WITH_PREFIX(INFO) << "Successfully opened a RocksDB database at " << db_dir |
761 | 276k | << ", obj: " << db; |
762 | | |
763 | 276k | return Status::OK(); |
764 | 276k | } |
765 | | |
766 | 4.10k | void Tablet::RegularDbFilesChanged() { |
767 | 4.10k | std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_); |
768 | 4.10k | if (num_sst_files_changed_listener_) { |
769 | 3.41k | num_sst_files_changed_listener_(); |
770 | 3.41k | } |
771 | 4.10k | } |
772 | | |
773 | 150k | void Tablet::SetCleanupPool(ThreadPool* thread_pool) { |
774 | 150k | if (!transaction_participant_) { |
775 | 93.1k | return; |
776 | 93.1k | } |
777 | | |
778 | 57.0k | cleanup_intent_files_token_ = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL); |
779 | | |
780 | 57.0k | CleanupIntentFiles(); |
781 | 57.0k | } |
782 | | |
783 | 88.7k | void Tablet::CleanupIntentFiles() { |
784 | 88.7k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
785 | 88.7k | if (!scoped_read_operation.ok() || state_ != State::kOpen88.7k || !FLAGS_delete_intents_sst_files88.7k || |
786 | 88.7k | !cleanup_intent_files_token_88.7k ) { |
787 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip"; |
788 | 0 | return; |
789 | 0 | } |
790 | | |
791 | 88.7k | WARN_NOT_OK( |
792 | 88.7k | cleanup_intent_files_token_->SubmitFunc(std::bind(&Tablet::DoCleanupIntentFiles, this)), |
793 | 88.7k | "Submit cleanup intent files failed"); |
794 | 88.7k | } |
795 | | |
796 | 88.7k | void Tablet::DoCleanupIntentFiles() { |
797 | 88.7k | if (metadata_->is_under_twodc_replication()) { |
798 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Exit because of TwoDC replication"; |
799 | 0 | return; |
800 | 0 | } |
801 | 88.7k | HybridTime best_file_max_ht = HybridTime::kMax; |
802 | 88.7k | std::vector<rocksdb::LiveFileMetaData> files; |
803 | | // Stops when there are no more files to delete. |
804 | 88.7k | std::string previous_name; |
805 | 89.0k | while (GetAtomicFlag(&FLAGS_cleanup_intents_sst_files)) { |
806 | 88.9k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
807 | 88.9k | if (!scoped_read_operation.ok()) { |
808 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << "Failed to acquire scoped read operation"; |
809 | 0 | break; |
810 | 0 | } |
811 | | |
812 | 88.9k | best_file_max_ht = HybridTime::kMax; |
813 | 88.9k | const rocksdb::LiveFileMetaData* best_file = nullptr; |
814 | 88.9k | files.clear(); |
815 | 88.9k | intents_db_->GetLiveFilesMetaData(&files); |
816 | 88.9k | auto min_largest_seq_no = std::numeric_limits<rocksdb::SequenceNumber>::max(); |
817 | | |
818 | 88.9k | VLOG_WITH_PREFIX_AND_FUNC4 (5) << "Files: " << AsString(files)4 ; |
819 | | |
820 | 88.9k | for (const auto& file : files) { |
821 | 6.28k | if (file.largest.seqno < min_largest_seq_no) { |
822 | 6.28k | min_largest_seq_no = file.largest.seqno; |
823 | 6.28k | if (file.largest.user_frontier6.28k ) { |
824 | 6.28k | auto& frontier = down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier); |
825 | 6.28k | best_file_max_ht = frontier.hybrid_time(); |
826 | 18.4E | } else { |
827 | 18.4E | best_file_max_ht = HybridTime::kMax; |
828 | 18.4E | } |
829 | 6.28k | best_file = &file; |
830 | 6.28k | } |
831 | 6.28k | } |
832 | | |
833 | 88.9k | auto min_running_start_ht = transaction_participant_->MinRunningHybridTime(); |
834 | 88.9k | if (!min_running_start_ht.is_valid() || min_running_start_ht <= best_file_max_ht88.8k ) { |
835 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) |
836 | 18.4E | << "Cannot delete because of running transactions: " << min_running_start_ht |
837 | 18.4E | << ", best file max ht: " << best_file_max_ht; |
838 | 88.7k | break; |
839 | 88.7k | } |
840 | 260 | if (best_file->name == previous_name) { |
841 | 0 | LOG_WITH_PREFIX_AND_FUNC(INFO) |
842 | 0 | << "Attempt to delete same file: " << previous_name << ", stopping cleanup"; |
843 | 0 | break; |
844 | 0 | } |
845 | 260 | previous_name = best_file->name; |
846 | | |
847 | 260 | LOG_WITH_PREFIX_AND_FUNC(INFO) |
848 | 260 | << "Intents SST file will be deleted: " << best_file->ToString() |
849 | 260 | << ", max ht: " << best_file_max_ht << ", min running transaction start ht: " |
850 | 260 | << min_running_start_ht; |
851 | 260 | auto flush_status = regular_db_->Flush(rocksdb::FlushOptions()); |
852 | 260 | if (!flush_status.ok()) { |
853 | 0 | LOG_WITH_PREFIX_AND_FUNC(WARNING) << "Failed to flush regular db: " << flush_status; |
854 | 0 | break; |
855 | 0 | } |
856 | 260 | auto delete_status = intents_db_->DeleteFile(best_file->name); |
857 | 260 | if (!delete_status.ok()) { |
858 | 0 | LOG_WITH_PREFIX_AND_FUNC(WARNING) |
859 | 0 | << "Failed to delete " << best_file->ToString() << ", all files " << AsString(files) |
860 | 0 | << ": " << delete_status; |
861 | 0 | break; |
862 | 0 | } |
863 | 260 | } |
864 | | |
865 | 88.7k | if (best_file_max_ht != HybridTime::kMax) { |
866 | 3.53k | VLOG_WITH_PREFIX_AND_FUNC0 (4) << "Wait min running hybrid time: " << best_file_max_ht0 ; |
867 | 3.53k | transaction_participant_->WaitMinRunningHybridTime(best_file_max_ht); |
868 | 3.53k | } |
869 | 88.7k | } |
870 | | |
871 | 152k | Status Tablet::EnableCompactions(ScopedRWOperationPause* non_abortable_ops_pause) { |
872 | 152k | if (!non_abortable_ops_pause) { |
873 | 150k | auto operation = CreateNonAbortableScopedRWOperation(); |
874 | 150k | RETURN_NOT_OK(operation); |
875 | 150k | return DoEnableCompactions(); |
876 | 150k | } |
877 | | |
878 | 1.79k | return DoEnableCompactions(); |
879 | 152k | } |
880 | | |
881 | 323k | Status Tablet::DoEnableCompactions() { |
882 | 323k | Status regular_db_status; |
883 | 323k | std::unordered_map<std::string, std::string> new_options = { |
884 | 323k | { "level0_slowdown_writes_trigger"s, |
885 | 323k | std::to_string(max_if_negative(FLAGS_rocksdb_level0_slowdown_writes_trigger))}, |
886 | 323k | { "level0_stop_writes_trigger"s, |
887 | 323k | std::to_string(max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger))}, |
888 | 323k | }; |
889 | 323k | if (regular_db_) { |
890 | 275k | WARN_WITH_PREFIX_NOT_OK( |
891 | 275k | regular_db_->SetOptions(new_options, /* dump_options= */ false), |
892 | 275k | "Failed to set options on regular DB"); |
893 | 275k | regular_db_status = |
894 | 275k | regular_db_->EnableAutoCompaction({regular_db_->DefaultColumnFamily()}); |
895 | 275k | if (!regular_db_status.ok()) { |
896 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to enable compactions on regular DB: " |
897 | 0 | << regular_db_status; |
898 | 0 | } |
899 | 275k | } |
900 | 323k | if (intents_db_) { |
901 | 145k | WARN_WITH_PREFIX_NOT_OK( |
902 | 145k | intents_db_->SetOptions(new_options, /* dump_options= */ false), |
903 | 145k | "Failed to set options on provisional records DB"); |
904 | 145k | Status intents_db_status = |
905 | 145k | intents_db_->EnableAutoCompaction({intents_db_->DefaultColumnFamily()}); |
906 | 145k | if (!intents_db_status.ok()) { |
907 | 0 | LOG_WITH_PREFIX(WARNING) |
908 | 0 | << "Failed to enable compactions on provisional records DB: " << intents_db_status; |
909 | 0 | return intents_db_status; |
910 | 0 | } |
911 | 145k | } |
912 | 323k | return regular_db_status; |
913 | 323k | } |
914 | | |
915 | 150k | void Tablet::MarkFinishedBootstrapping() { |
916 | 150k | CHECK_EQ(state_, kBootstrapping); |
917 | 150k | state_ = kOpen; |
918 | 150k | } |
919 | | |
920 | 227k | bool Tablet::StartShutdown(const IsDropTable is_drop_table) { |
921 | 227k | LOG_WITH_PREFIX(INFO) << __func__; |
922 | | |
923 | 227k | bool expected = false; |
924 | 227k | if (!shutdown_requested_.compare_exchange_strong(expected, true)) { |
925 | 151k | return false; |
926 | 151k | } |
927 | | |
928 | 76.0k | if (transaction_participant_) { |
929 | 44.7k | transaction_participant_->StartShutdown(); |
930 | 44.7k | } |
931 | | |
932 | 76.0k | return true; |
933 | 227k | } |
934 | | |
935 | 76.0k | void Tablet::CompleteShutdown(IsDropTable is_drop_table) { |
936 | 76.0k | LOG_WITH_PREFIX(INFO) << __func__ << "(" << is_drop_table << ")"; |
937 | | |
938 | 76.0k | StartShutdown(); |
939 | | |
940 | 76.0k | auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(is_drop_table), Stop::kTrue); |
941 | 76.0k | if (!op_pauses.ok()) { |
942 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to shut down: " << op_pauses.status(); |
943 | 0 | return; |
944 | 0 | } |
945 | | |
946 | 76.0k | cleanup_intent_files_token_.reset(); |
947 | | |
948 | 76.0k | if (transaction_coordinator_) { |
949 | 432 | transaction_coordinator_->Shutdown(); |
950 | 432 | } |
951 | | |
952 | 76.0k | if (transaction_participant_) { |
953 | 44.7k | transaction_participant_->CompleteShutdown(); |
954 | 44.7k | } |
955 | | |
956 | 76.0k | { |
957 | 76.0k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
958 | | |
959 | 76.0k | if (completed_split_log_anchor_) { |
960 | 44 | WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()), |
961 | 44 | "Unregister split anchor"); |
962 | 44 | } |
963 | | |
964 | 76.0k | if (completed_split_operation_filter_) { |
965 | 44 | UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get()); |
966 | 44 | } |
967 | | |
968 | 76.0k | if (restoring_operation_filter_) { |
969 | 0 | UnregisterOperationFilterUnlocked(restoring_operation_filter_.get()); |
970 | 0 | } |
971 | 76.0k | } |
972 | | |
973 | 76.0k | std::lock_guard<rw_spinlock> lock(component_lock_); |
974 | | |
975 | | // Shutdown the RocksDB instance for this tablet, if present. |
976 | | // Destroy intents and regular DBs in reverse order to their creation. |
977 | | // Also it makes sure that regular DB is alive during flush filter of intents db. |
978 | 76.0k | WARN_NOT_OK(CompleteShutdownRocksDBs(Destroy::kFalse, &(*op_pauses)), |
979 | 76.0k | "Failed to reset rocksdb during shutdown"); |
980 | | |
981 | 76.0k | if (post_split_compaction_task_pool_token_) { |
982 | 49 | post_split_compaction_task_pool_token_->Shutdown(); |
983 | 49 | } |
984 | | |
985 | 76.0k | state_ = kShutdown; |
986 | | |
987 | 151k | for (auto* op_pause : op_pauses->AsArray()) { |
988 | | // Release the mutex that prevents snapshot restore / truncate operations from running. Such |
989 | | // operations are no longer possible because the tablet has shut down. When we start the |
990 | | // "read/write operation pause", we incremented the "exclusive operation" counter. This will |
991 | | // prevent us from decrementing that counter back, disabling read/write operations permanently. |
992 | 151k | op_pause->ReleaseMutexButKeepDisabled(); |
993 | | // Ensure that op_pause stays in scope throughout this function. |
994 | 18.4E | LOG_IF(DFATAL, !op_pause->status().ok()) << op_pause->status(); |
995 | 151k | } |
996 | 76.0k | } |
997 | | |
998 | | CHECKED_STATUS ResetRocksDB( |
999 | 498k | bool destroy, const rocksdb::Options& options, std::unique_ptr<rocksdb::DB>* db) { |
1000 | 498k | if (!*db) { |
1001 | 116k | return Status::OK(); |
1002 | 116k | } |
1003 | | |
1004 | 381k | auto dir = (**db).GetName(); |
1005 | 381k | db->reset(); |
1006 | 381k | if (!destroy) { |
1007 | 120k | return Status::OK(); |
1008 | 120k | } |
1009 | | |
1010 | 261k | return rocksdb::DestroyDB(dir, options); |
1011 | 381k | } |
1012 | | |
1013 | | Result<TabletScopedRWOperationPauses> Tablet::StartShutdownRocksDBs( |
1014 | 248k | DisableFlushOnShutdown disable_flush_on_shutdown, Stop stop) { |
1015 | 248k | TabletScopedRWOperationPauses op_pauses; |
1016 | | |
1017 | 498k | auto pause = [this, stop](const Abortable abortable) -> Result<ScopedRWOperationPause> { |
1018 | 498k | auto op_pause = PauseReadWriteOperations(abortable, stop); |
1019 | 498k | if (!op_pause.ok()) { |
1020 | 0 | return op_pause.status().CloneAndPrepend("Failed to stop read/write operations: "); |
1021 | 0 | } |
1022 | 498k | return std::move(op_pause); |
1023 | 498k | }; |
1024 | | |
1025 | 248k | op_pauses.non_abortable = VERIFY_RESULT(pause(Abortable::kFalse)); |
1026 | | |
1027 | 0 | bool expected = false; |
1028 | | // If shutdown has been already requested, we still might need to wait for all pending read/write |
1029 | | // operations to complete here, because caller is not holding ScopedRWOperationPause. |
1030 | 249k | if (rocksdb_shutdown_requested_.compare_exchange_strong(expected, true)248k ) { |
1031 | 498k | for (auto* db : {regular_db_.get(), intents_db_.get()}) { |
1032 | 498k | if (db) { |
1033 | 382k | db->SetDisableFlushOnShutdown(disable_flush_on_shutdown); |
1034 | 382k | db->StartShutdown(); |
1035 | 382k | } |
1036 | 498k | } |
1037 | 249k | } |
1038 | | |
1039 | 248k | op_pauses.abortable = VERIFY_RESULT(pause(Abortable::kTrue)); |
1040 | | |
1041 | 0 | return op_pauses; |
1042 | 248k | } |
1043 | | |
1044 | | Status Tablet::CompleteShutdownRocksDBs( |
1045 | 249k | Destroy destroy, TabletScopedRWOperationPauses* ops_pauses) { |
1046 | | // We need non-null ops_pauses just to guarantee that PauseReadWriteOperations has been called. |
1047 | 249k | RSTATUS_DCHECK( |
1048 | 249k | ops_pauses != nullptr, InvalidArgument, |
1049 | 249k | "ops_pauses could not be null, StartRocksDbShutdown should be called before " |
1050 | 249k | "ShutdownRocksDBs."); |
1051 | | |
1052 | 249k | if (intents_db_) { |
1053 | 133k | intents_db_->ListenFilesChanged(nullptr); |
1054 | 133k | } |
1055 | | |
1056 | 249k | rocksdb::Options rocksdb_options; |
1057 | 249k | if (destroy) { |
1058 | 173k | InitRocksDBOptions(&rocksdb_options, LogPrefix()); |
1059 | 173k | } |
1060 | | |
1061 | 249k | Status intents_status = ResetRocksDB(destroy, rocksdb_options, &intents_db_); |
1062 | 249k | Status regular_status = ResetRocksDB(destroy, rocksdb_options, ®ular_db_); |
1063 | 249k | key_bounds_ = docdb::KeyBounds(); |
1064 | | // Reset rocksdb_shutdown_requested_ to the initial state like RocksDBs were never opened, |
1065 | | // so we don't have to reset it on RocksDB open (we potentially can have several places in the |
1066 | | // code doing opening RocksDB while RocksDB shutdown is always going through |
1067 | | // Tablet::ShutdownRocksDBs). |
1068 | 249k | rocksdb_shutdown_requested_ = false; |
1069 | | |
1070 | 18.4E | return regular_status.ok()249k ? intents_status249k : regular_status; |
1071 | 249k | } |
1072 | | |
1073 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator( |
1074 | | const Schema &projection, |
1075 | | const ReadHybridTime read_hybrid_time, |
1076 | | const TableId& table_id, |
1077 | | CoarseTimePoint deadline, |
1078 | | AllowBootstrappingState allow_bootstrapping_state, |
1079 | 421k | const Slice& sub_doc_key) const { |
1080 | 421k | if (state_ != kOpen && (178 !allow_bootstrapping_state178 || state_ != kBootstrapping178 )) { |
1081 | 0 | return STATUS_FORMAT(IllegalState, "Tablet in wrong state: $0", state_); |
1082 | 0 | } |
1083 | | |
1084 | 421k | if (table_type_ != TableType::YQL_TABLE_TYPE && table_type_ != TableType::PGSQL_TABLE_TYPE12 ) { |
1085 | 0 | return STATUS_FORMAT(NotSupported, "Invalid table type: $0", table_type_); |
1086 | 0 | } |
1087 | | |
1088 | 421k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1089 | 421k | RETURN_NOT_OK(scoped_read_operation); |
1090 | | |
1091 | 421k | VLOG_WITH_PREFIX40 (2) << "Created new Iterator reading at " << read_hybrid_time.ToString()40 ; |
1092 | | |
1093 | 421k | const std::shared_ptr<tablet::TableInfo> table_info = |
1094 | 421k | VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1095 | 0 | const Schema& schema = *table_info->schema; |
1096 | 421k | auto mapped_projection = std::make_unique<Schema>(); |
1097 | 421k | RETURN_NOT_OK(schema.GetMappedReadProjection(projection, mapped_projection.get())); |
1098 | | |
1099 | 421k | auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext( |
1100 | 421k | /* transaction_id */ boost::none, |
1101 | 421k | schema.table_properties().is_ysql_catalog_table())); |
1102 | 421k | const auto read_time = read_hybrid_time |
1103 | 421k | ? read_hybrid_time43.9k |
1104 | 421k | : ReadHybridTime::SingleTime(377k VERIFY_RESULT377k (SafeTime(RequireLease::kFalse)))377k ; |
1105 | 0 | auto result = std::make_unique<DocRowwiseIterator>( |
1106 | 421k | std::move(mapped_projection), schema, txn_op_ctx, doc_db(), |
1107 | 421k | deadline, read_time, &pending_non_abortable_op_counter_); |
1108 | 421k | RETURN_NOT_OK(result->Init(table_type_, sub_doc_key)); |
1109 | 421k | return std::move(result); |
1110 | 421k | } |
1111 | | |
1112 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator( |
1113 | 0 | const TableId& table_id) const { |
1114 | 0 | const std::shared_ptr<tablet::TableInfo> table_info = |
1115 | 0 | VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1116 | 0 | return NewRowIterator(*table_info->schema, {}, table_id); |
1117 | 0 | } |
1118 | | |
1119 | | Status Tablet::ApplyRowOperations( |
1120 | 8.83M | WriteOperation* operation, AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1121 | 8.83M | const auto& write_request = |
1122 | 8.83M | operation->consensus_round() && operation->consensus_round()->replicate_msg()8.55M |
1123 | | // Online case. |
1124 | 8.83M | ? operation->consensus_round()->replicate_msg()->write()8.55M |
1125 | | // Bootstrap case. |
1126 | 8.83M | : *operation->request()277k ; |
1127 | 8.83M | const KeyValueWriteBatchPB& put_batch = write_request.write_batch(); |
1128 | 8.83M | if (metrics_) { |
1129 | 8.66M | VLOG(3) << "Applying write batch (write_pairs=" << put_batch.write_pairs().size() << "): " |
1130 | 112 | << put_batch.ShortDebugString(); |
1131 | 8.66M | metrics_->rows_inserted->IncrementBy(put_batch.write_pairs().size()); |
1132 | 8.66M | } |
1133 | | |
1134 | 8.83M | return ApplyOperation( |
1135 | 8.83M | *operation, write_request.batch_idx(), put_batch, already_applied_to_regular_db); |
1136 | 8.83M | } |
1137 | | |
1138 | | Status Tablet::ApplyOperation( |
1139 | | const Operation& operation, int64_t batch_idx, |
1140 | | const docdb::KeyValueWriteBatchPB& write_batch, |
1141 | 8.83M | AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1142 | 8.83M | auto hybrid_time = operation.WriteHybridTime(); |
1143 | | |
1144 | 8.83M | docdb::ConsensusFrontiers frontiers; |
1145 | | // Even if we have an external hybrid time, use the local commit hybrid time in the consensus |
1146 | | // frontier. |
1147 | 8.83M | auto frontiers_ptr = |
1148 | 8.83M | InitFrontiers(operation.op_id(), operation.hybrid_time(), &frontiers); |
1149 | 8.83M | if (frontiers_ptr) { |
1150 | 8.83M | auto ttl = write_batch.has_ttl() |
1151 | 8.83M | ? MonoDelta::FromNanoseconds(write_batch.ttl())611 |
1152 | 8.83M | : docdb::ValueControlFields::kMaxTtl8.82M ; |
1153 | 8.83M | frontiers_ptr->Largest().set_max_value_level_ttl_expiration_time( |
1154 | 8.83M | docdb::FileExpirationFromValueTTL(operation.hybrid_time(), ttl)); |
1155 | 8.83M | } |
1156 | 8.83M | return ApplyKeyValueRowOperations( |
1157 | 8.83M | batch_idx, write_batch, frontiers_ptr, hybrid_time, already_applied_to_regular_db); |
1158 | 8.83M | } |
1159 | | |
1160 | | Status Tablet::WriteTransactionalBatch( |
1161 | | int64_t batch_idx, |
1162 | | const KeyValueWriteBatchPB& put_batch, |
1163 | | HybridTime hybrid_time, |
1164 | 2.44M | const rocksdb::UserFrontiers* frontiers) { |
1165 | 2.44M | auto transaction_id = CHECK_RESULT( |
1166 | 2.44M | FullyDecodeTransactionId(put_batch.transaction().transaction_id())); |
1167 | | |
1168 | 2.44M | bool store_metadata = false; |
1169 | 2.44M | if (put_batch.transaction().has_isolation()) { |
1170 | | // Store transaction metadata (status tablet, isolation level etc.) |
1171 | 1.80M | auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(put_batch.transaction())); |
1172 | 0 | auto add_result = transaction_participant()->Add(metadata); |
1173 | 1.80M | if (!add_result.ok()) { |
1174 | 130k | return add_result.status(); |
1175 | 130k | } |
1176 | 1.67M | store_metadata = add_result.get(); |
1177 | 1.67M | } |
1178 | 2.31M | boost::container::small_vector<uint8_t, 16> encoded_replicated_batch_idx_set; |
1179 | 2.31M | auto prepare_batch_data = transaction_participant()->PrepareBatchData( |
1180 | 2.31M | transaction_id, batch_idx, &encoded_replicated_batch_idx_set); |
1181 | 2.31M | if (!prepare_batch_data) { |
1182 | | // If metadata is missing it could be caused by aborted and removed transaction. |
1183 | | // In this case we should not add new intents for it. |
1184 | 481 | return STATUS(TryAgain, |
1185 | 481 | Format("Transaction metadata missing: $0, looks like it was just aborted", |
1186 | 481 | transaction_id), Slice(), |
1187 | 481 | PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)); |
1188 | 481 | } |
1189 | | |
1190 | 2.31M | auto isolation_level = prepare_batch_data->first; |
1191 | 2.31M | auto& last_batch_data = prepare_batch_data->second; |
1192 | | |
1193 | 2.31M | docdb::TransactionalWriter writer( |
1194 | 2.31M | put_batch, hybrid_time, transaction_id, isolation_level, |
1195 | 2.31M | docdb::PartialRangeKeyIntents(metadata_->UsePartialRangeKeyIntents()), |
1196 | 2.31M | Slice(encoded_replicated_batch_idx_set.data(), encoded_replicated_batch_idx_set.size()), |
1197 | 2.31M | last_batch_data.next_write_id); |
1198 | 2.31M | if (store_metadata) { |
1199 | 1.67M | writer.SetMetadataToStore(&put_batch.transaction()); |
1200 | 1.67M | } |
1201 | 2.31M | rocksdb::WriteBatch write_batch; |
1202 | 2.31M | write_batch.SetDirectWriter(&writer); |
1203 | 2.31M | RequestScope request_scope(transaction_participant_.get()); |
1204 | | |
1205 | 2.31M | WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents); |
1206 | | |
1207 | 2.31M | last_batch_data.hybrid_time = hybrid_time; |
1208 | 2.31M | last_batch_data.next_write_id = writer.intra_txn_write_id(); |
1209 | 2.31M | transaction_participant()->BatchReplicated(transaction_id, last_batch_data); |
1210 | | |
1211 | 2.31M | return Status::OK(); |
1212 | 2.31M | } |
1213 | | |
1214 | | Status Tablet::ApplyKeyValueRowOperations( |
1215 | | int64_t batch_idx, |
1216 | | const KeyValueWriteBatchPB& put_batch, |
1217 | | const rocksdb::UserFrontiers* frontiers, |
1218 | | const HybridTime hybrid_time, |
1219 | 8.83M | AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1220 | 8.83M | if (put_batch.write_pairs().empty() && put_batch.read_pairs().empty()810k && |
1221 | 8.83M | put_batch.apply_external_transactions().empty()39.4k ) { |
1222 | 39.4k | return Status::OK(); |
1223 | 39.4k | } |
1224 | | |
1225 | | // Could return failure only for cases where it is safe to skip applying operations to DB. |
1226 | | // For instance where aborted transaction intents are written. |
1227 | | // In all other cases we should crash instead of skipping apply. |
1228 | | |
1229 | 8.79M | if (put_batch.has_transaction()) { |
1230 | 2.44M | RETURN_NOT_OK(WriteTransactionalBatch(batch_idx, put_batch, hybrid_time, frontiers)); |
1231 | 6.35M | } else { |
1232 | 6.35M | rocksdb::WriteBatch regular_write_batch; |
1233 | 6.35M | auto* regular_write_batch_ptr = !already_applied_to_regular_db ? ®ular_write_batch6.34M : nullptr1.68k ; |
1234 | | |
1235 | | // See comments for PrepareExternalWriteBatch. |
1236 | 6.35M | rocksdb::WriteBatch intents_write_batch; |
1237 | 6.35M | bool has_non_exteranl_records = PrepareExternalWriteBatch( |
1238 | 6.35M | put_batch, hybrid_time, intents_db_.get(), regular_write_batch_ptr, &intents_write_batch); |
1239 | | |
1240 | 6.35M | if (intents_write_batch.Count() != 0) { |
1241 | 0 | if (!metadata_->is_under_twodc_replication()) { |
1242 | 0 | RETURN_NOT_OK(metadata_->SetIsUnderTwodcReplicationAndFlush(true)); |
1243 | 0 | } |
1244 | 0 | WriteToRocksDB(frontiers, &intents_write_batch, StorageDbType::kIntents); |
1245 | 0 | } |
1246 | | |
1247 | 6.35M | docdb::NonTransactionalWriter writer(put_batch, hybrid_time); |
1248 | 6.35M | if (!already_applied_to_regular_db && has_non_exteranl_records6.34M ) { |
1249 | 6.34M | regular_write_batch.SetDirectWriter(&writer); |
1250 | 6.34M | } |
1251 | 6.35M | if (regular_write_batch.Count() != 0 || regular_write_batch.HasDirectWriter()6.34M ) { |
1252 | 6.34M | WriteToRocksDB(frontiers, ®ular_write_batch, StorageDbType::kRegular); |
1253 | 6.34M | } |
1254 | | |
1255 | 6.35M | if (snapshot_coordinator_) { |
1256 | 99.2M | for (const auto& pair : put_batch.write_pairs()) { |
1257 | 99.2M | WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()), |
1258 | 99.2M | "ApplyWritePair failed"); |
1259 | 99.2M | } |
1260 | 779k | } |
1261 | 6.35M | } |
1262 | | |
1263 | 8.66M | return Status::OK(); |
1264 | 8.79M | } |
1265 | | |
1266 | | void Tablet::WriteToRocksDB( |
1267 | | const rocksdb::UserFrontiers* frontiers, |
1268 | | rocksdb::WriteBatch* write_batch, |
1269 | 11.6M | docdb::StorageDbType storage_db_type) { |
1270 | 11.6M | rocksdb::DB* dest_db = nullptr; |
1271 | 11.6M | switch (storage_db_type) { |
1272 | 7.65M | case StorageDbType::kRegular: dest_db = regular_db_.get(); break; |
1273 | 3.97M | case StorageDbType::kIntents: dest_db = intents_db_.get(); break; |
1274 | 11.6M | } |
1275 | | |
1276 | | // Frontiers can be null for deferred apply operations. |
1277 | 11.6M | if (11.6M frontiers11.6M ) { |
1278 | 11.6M | write_batch->SetFrontiers(frontiers); |
1279 | 11.6M | } |
1280 | | |
1281 | | // We are using Raft replication index for the RocksDB sequence number for |
1282 | | // all members of this write batch. |
1283 | 11.6M | rocksdb::WriteOptions write_options; |
1284 | 11.6M | InitRocksDBWriteOptions(&write_options); |
1285 | | |
1286 | 11.6M | auto rocksdb_write_status = dest_db->Write(write_options, write_batch); |
1287 | 11.6M | if (!rocksdb_write_status.ok()) { |
1288 | 0 | LOG_WITH_PREFIX(FATAL) << "Failed to write a batch with " << write_batch->Count() |
1289 | 0 | << " operations into RocksDB: " << rocksdb_write_status; |
1290 | 0 | } |
1291 | | |
1292 | 11.6M | if (FLAGS_TEST_docdb_log_write_batches) { |
1293 | 0 | LOG_WITH_PREFIX(INFO) |
1294 | 0 | << "Wrote " << write_batch->Count() << " key/value pairs to " << storage_db_type |
1295 | 0 | << " RocksDB:\n" << docdb::WriteBatchToString( |
1296 | 0 | *write_batch, |
1297 | 0 | storage_db_type, |
1298 | 0 | BinaryOutputFormat::kEscapedAndHex, |
1299 | 0 | WriteBatchOutputFormat::kArrow, |
1300 | 0 | " " + LogPrefix(storage_db_type)); |
1301 | 0 | } |
1302 | 11.6M | } |
1303 | | |
1304 | | //-------------------------------------------------------------------------------------------------- |
1305 | | // Redis Request Processing. |
1306 | | Status Tablet::HandleRedisReadRequest(CoarseTimePoint deadline, |
1307 | | const ReadHybridTime& read_time, |
1308 | | const RedisReadRequestPB& redis_read_request, |
1309 | 84.6k | RedisResponsePB* response) { |
1310 | | // TODO: move this locking to the top-level read request handler in TabletService. |
1311 | 84.6k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1312 | 84.6k | RETURN_NOT_OK(scoped_read_operation); |
1313 | | |
1314 | 84.6k | ScopedTabletMetricsTracker metrics_tracker(metrics_->redis_read_latency); |
1315 | | |
1316 | 84.6k | docdb::RedisReadOperation doc_op(redis_read_request, doc_db(), deadline, read_time); |
1317 | 84.6k | RETURN_NOT_OK(doc_op.Execute()); |
1318 | 84.6k | *response = std::move(doc_op.response()); |
1319 | 84.6k | return Status::OK(); |
1320 | 84.6k | } |
1321 | | |
1322 | | bool IsSchemaVersionCompatible( |
1323 | 11.7M | uint32_t current_version, uint32_t request_version, bool compatible_with_previous_version) { |
1324 | 11.7M | if (request_version == current_version) { |
1325 | 11.7M | return true; |
1326 | 11.7M | } |
1327 | | |
1328 | 1.66k | if (compatible_with_previous_version && request_version == current_version + 11.11k ) { |
1329 | 79 | DVLOG(1) << (0 FLAGS_yql_allow_compatible_schema_versions0 ? "A"0 : "Not a"0 ) |
1330 | 0 | << "ccepting request that is ahead of us by 1 version"; |
1331 | 79 | return FLAGS_yql_allow_compatible_schema_versions; |
1332 | 79 | } |
1333 | | |
1334 | 1.58k | return false; |
1335 | 1.66k | } |
1336 | | |
1337 | | //-------------------------------------------------------------------------------------------------- |
1338 | | // CQL Request Processing. |
1339 | | Status Tablet::HandleQLReadRequest( |
1340 | | CoarseTimePoint deadline, |
1341 | | const ReadHybridTime& read_time, |
1342 | | const QLReadRequestPB& ql_read_request, |
1343 | | const TransactionMetadataPB& transaction_metadata, |
1344 | 7.28M | QLReadRequestResult* result) { |
1345 | 7.28M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1346 | 7.28M | RETURN_NOT_OK(scoped_read_operation); |
1347 | 7.28M | ScopedTabletMetricsTracker metrics_tracker(metrics_->ql_read_latency); |
1348 | | |
1349 | 7.28M | if (!IsSchemaVersionCompatible( |
1350 | 7.28M | metadata()->schema_version(), ql_read_request.schema_version(), |
1351 | 7.28M | ql_read_request.is_compatible_with_previous_version())) { |
1352 | 140 | DVLOG(1) << "Setting status for read as YQL_STATUS_SCHEMA_VERSION_MISMATCH"0 ; |
1353 | 140 | result->response.set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH); |
1354 | 140 | result->response.set_error_message(Format( |
1355 | 140 | "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)", |
1356 | 140 | metadata()->table_id(), |
1357 | 140 | metadata()->schema_version(), |
1358 | 140 | ql_read_request.schema_version(), |
1359 | 140 | ql_read_request.is_compatible_with_previous_version())); |
1360 | 140 | return Status::OK(); |
1361 | 140 | } |
1362 | | |
1363 | 7.28M | Result<TransactionOperationContext> txn_op_ctx = |
1364 | 7.28M | CreateTransactionOperationContext(transaction_metadata, /* is_ysql_catalog_table */ false); |
1365 | 7.28M | RETURN_NOT_OK(txn_op_ctx); |
1366 | 7.28M | return AbstractTablet::HandleQLReadRequest( |
1367 | 7.28M | deadline, read_time, ql_read_request, *txn_op_ctx, result); |
1368 | 7.28M | } |
1369 | | |
1370 | | CHECKED_STATUS Tablet::CreatePagingStateForRead(const QLReadRequestPB& ql_read_request, |
1371 | | const size_t row_count, |
1372 | 7.26M | QLResponsePB* response) const { |
1373 | | |
1374 | | // If the response does not have a next partition key, it means we are done reading the current |
1375 | | // tablet. But, if the request does not have the hash columns set, this must be a table-scan, |
1376 | | // so we need to decide if we are done or if we need to move to the next tablet. |
1377 | | // If we did not reach the: |
1378 | | // 1. max number of results (LIMIT clause -- if set) |
1379 | | // 2. end of the table (this was the last tablet) |
1380 | | // 3. max partition key (upper bound condition using 'token' -- if set) |
1381 | | // we set the paging state to point to the exclusive end partition key of this tablet, which is |
1382 | | // the start key of the next tablet). |
1383 | 7.26M | if (ql_read_request.hashed_column_values().empty() && |
1384 | 7.26M | !response->paging_state().has_next_partition_key()43.8k ) { |
1385 | | // Check we did not reach the results limit. |
1386 | | // If return_paging_state is set, it means the request limit is actually just the page size. |
1387 | 43.2k | if (!ql_read_request.has_limit() || |
1388 | 43.2k | row_count < ql_read_request.limit()42.3k || |
1389 | 43.2k | ql_read_request.return_paging_state()119 ) { |
1390 | | |
1391 | | // Check we did not reach the last tablet. |
1392 | 43.2k | const string& next_partition_key = metadata_->partition()->partition_key_end(); |
1393 | 43.2k | if (!next_partition_key.empty()) { |
1394 | 34.1k | uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(next_partition_key); |
1395 | | |
1396 | | // Check we did not reach the max partition key. |
1397 | 34.1k | if (!ql_read_request.has_max_hash_code() || |
1398 | 34.1k | next_hash_code <= ql_read_request.max_hash_code()138 ) { |
1399 | 34.0k | response->mutable_paging_state()->set_next_partition_key(next_partition_key); |
1400 | 34.0k | } |
1401 | 34.1k | } |
1402 | 43.2k | } |
1403 | 43.2k | } |
1404 | | |
1405 | | // If there is a paging state, update the total number of rows read so far. |
1406 | 7.26M | if (response->has_paging_state()) { |
1407 | 35.1k | response->mutable_paging_state()->set_total_num_rows_read( |
1408 | 35.1k | ql_read_request.paging_state().total_num_rows_read() + row_count); |
1409 | 35.1k | } |
1410 | 7.26M | return Status::OK(); |
1411 | 7.26M | } |
1412 | | |
1413 | | //-------------------------------------------------------------------------------------------------- |
1414 | | // PGSQL Request Processing. |
1415 | | //-------------------------------------------------------------------------------------------------- |
1416 | | Status Tablet::HandlePgsqlReadRequest( |
1417 | | CoarseTimePoint deadline, |
1418 | | const ReadHybridTime& read_time, |
1419 | | bool is_explicit_request_read_time, |
1420 | | const PgsqlReadRequestPB& pgsql_read_request, |
1421 | | const TransactionMetadataPB& transaction_metadata, |
1422 | | const SubTransactionMetadataPB& subtransaction_metadata, |
1423 | | PgsqlReadRequestResult* result, |
1424 | 3.47M | size_t* num_rows_read) { |
1425 | 3.47M | TRACE(LogPrefix()); |
1426 | 3.47M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline); |
1427 | 3.47M | RETURN_NOT_OK(scoped_read_operation); |
1428 | | // TODO(neil) Work on metrics for PGSQL. |
1429 | | // ScopedTabletMetricsTracker metrics_tracker(metrics_->pgsql_read_latency); |
1430 | | |
1431 | 3.47M | const shared_ptr<tablet::TableInfo> table_info = |
1432 | 3.47M | VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read_request.table_id())); |
1433 | | // Assert the table is a Postgres table. |
1434 | 0 | DCHECK_EQ(table_info->table_type, TableType::PGSQL_TABLE_TYPE); |
1435 | 3.47M | if (table_info->schema_version != pgsql_read_request.schema_version()) { |
1436 | 62 | result->response.set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH); |
1437 | 62 | result->response.set_error_message( |
1438 | 62 | Format("schema version mismatch for table $0: expected $1, got $2", |
1439 | 62 | table_info->table_id, |
1440 | 62 | table_info->schema_version, |
1441 | 62 | pgsql_read_request.schema_version())); |
1442 | 62 | return Status::OK(); |
1443 | 62 | } |
1444 | | |
1445 | 3.47M | Result<TransactionOperationContext> txn_op_ctx = |
1446 | 3.47M | CreateTransactionOperationContext( |
1447 | 3.47M | transaction_metadata, |
1448 | 3.47M | table_info->schema->table_properties().is_ysql_catalog_table(), |
1449 | 3.47M | &subtransaction_metadata); |
1450 | 3.47M | RETURN_NOT_OK(txn_op_ctx); |
1451 | 3.47M | return AbstractTablet::HandlePgsqlReadRequest( |
1452 | 3.47M | deadline, read_time, is_explicit_request_read_time, |
1453 | 3.47M | pgsql_read_request, *txn_op_ctx, result, num_rows_read); |
1454 | 3.47M | } |
1455 | | |
1456 | | // Returns true if the query can be satisfied by rows present in current tablet. |
1457 | | // Returns false if query requires other tablets to also be scanned. Examples of this include: |
1458 | | // (1) full table scan queries |
1459 | | // (2) queries that whose key conditions are such that the query will require a multi tablet |
1460 | | // scan. |
1461 | | // |
1462 | | // Requests that are of the form batched index lookups of ybctids are sent only to a single tablet. |
1463 | | // However there can arise situations where tablets splitting occurs after such requests are being |
1464 | | // prepared by the pggate layer (specifically pg_doc_op.cc). Under such circumstances, if tablets |
1465 | | // are split into two sub-tablets, then such batched index lookups of ybctid requests should be sent |
1466 | | // to multiple tablets (the two sub-tablets). Hence, the request ends up not being a single tablet |
1467 | | // request. |
1468 | | Result<bool> Tablet::IsQueryOnlyForTablet( |
1469 | 3.46M | const PgsqlReadRequestPB& pgsql_read_request, size_t row_count) const { |
1470 | 3.46M | if ((!pgsql_read_request.ybctid_column_value().value().binary_value().empty() && |
1471 | 3.46M | (16.3k implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) == row_count16.3k || |
1472 | 16.3k | pgsql_read_request.batch_arguments_size() == 010.3k )) || |
1473 | 3.46M | !pgsql_read_request.partition_column_values().empty()3.45M ) { |
1474 | 2.05M | return true; |
1475 | 2.05M | } |
1476 | | |
1477 | 1.41M | std::shared_ptr<const Schema> schema = metadata_->schema(); |
1478 | 1.41M | if (schema->has_cotable_id() || schema->has_colocation_id()1.41M ) { |
1479 | | // This is a colocated table. |
1480 | 0 | return true; |
1481 | 0 | } |
1482 | | |
1483 | 1.41M | if (schema->num_hash_key_columns() == 0 && |
1484 | 1.41M | schema->num_range_key_columns() == |
1485 | 1.24M | implicit_cast<size_t>(pgsql_read_request.range_column_values_size())) { |
1486 | | // PK is contained within this tablet. |
1487 | 81.1k | return true; |
1488 | 81.1k | } |
1489 | 1.33M | return false; |
1490 | 1.41M | } |
1491 | | |
1492 | | Result<bool> Tablet::HasScanReachedMaxPartitionKey( |
1493 | | const PgsqlReadRequestPB& pgsql_read_request, |
1494 | | const string& partition_key, |
1495 | 100k | size_t row_count) const { |
1496 | 100k | auto schema = metadata_->schema(); |
1497 | | |
1498 | 100k | if (schema->num_hash_key_columns() > 0) { |
1499 | 100k | uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1500 | | // For batched index lookup of ybctids, check if the current partition hash is lesser than |
1501 | | // upper bound. If it is, we can then avoid paging. Paging of batched index lookup of ybctids |
1502 | | // occur when tablets split after request is prepared. |
1503 | 100k | if (pgsql_read_request.has_ybctid_column_value() && |
1504 | 100k | implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) > row_count290 ) { |
1505 | 290 | if (!pgsql_read_request.upper_bound().has_key()) { |
1506 | 0 | return false; |
1507 | 0 | } |
1508 | 290 | uint16_t upper_bound_hash = |
1509 | 290 | PartitionSchema::DecodeMultiColumnHashValue(pgsql_read_request.upper_bound().key()); |
1510 | 290 | uint16_t partition_hash = |
1511 | 290 | PartitionSchema::DecodeMultiColumnHashValue(partition_key); |
1512 | 290 | return pgsql_read_request.upper_bound().is_inclusive() ? |
1513 | 0 | partition_hash > upper_bound_hash : |
1514 | 290 | partition_hash >= upper_bound_hash; |
1515 | 290 | } |
1516 | 100k | if (pgsql_read_request.has_max_hash_code() && |
1517 | 100k | next_hash_code > pgsql_read_request.max_hash_code()14.1k ) { |
1518 | 14.1k | return true; |
1519 | 14.1k | } |
1520 | 100k | } else if (260 pgsql_read_request.has_upper_bound()260 ) { |
1521 | 154 | docdb::DocKey partition_doc_key(*schema); |
1522 | 154 | VERIFY_RESULT(partition_doc_key.DecodeFrom( |
1523 | 0 | partition_key, docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue)); |
1524 | 0 | docdb::DocKey max_partition_doc_key(*schema); |
1525 | 154 | VERIFY_RESULT(max_partition_doc_key.DecodeFrom( |
1526 | 0 | pgsql_read_request.upper_bound().key(), docdb::DocKeyPart::kWholeDocKey, |
1527 | 0 | docdb::AllowSpecial::kTrue)); |
1528 | | |
1529 | 0 | auto cmp = partition_doc_key.CompareTo(max_partition_doc_key); |
1530 | 154 | return pgsql_read_request.upper_bound().is_inclusive() ? cmp > 0118 : cmp >= 036 ; |
1531 | 154 | } |
1532 | | |
1533 | 86.2k | return false; |
1534 | 100k | } |
1535 | | |
1536 | | namespace { |
1537 | | |
1538 | | void SetBackfillSpecForYsqlBackfill( |
1539 | | const PgsqlReadRequestPB& pgsql_read_request, |
1540 | | const size_t& row_count, |
1541 | 2.33k | PgsqlResponsePB* response) { |
1542 | 2.33k | PgsqlBackfillSpecPB in_spec; |
1543 | 2.33k | in_spec.ParseFromString(a2b_hex(pgsql_read_request.backfill_spec())); |
1544 | | |
1545 | 2.33k | auto limit = in_spec.limit(); |
1546 | 2.33k | PgsqlBackfillSpecPB out_spec; |
1547 | 2.33k | out_spec.set_limit(limit); |
1548 | 2.33k | out_spec.set_count(in_spec.count() + row_count); |
1549 | 2.33k | response->set_is_backfill_batch_done(!response->has_paging_state()); |
1550 | 18.4E | VLOG(2) << " limit is " << limit << " set_count to " << out_spec.count(); |
1551 | 2.33k | if (limit >= 02.33k && out_spec.count() >= limit) { |
1552 | | // Hint postgres to stop scanning now. And set up the |
1553 | | // next_row_key based on the paging state. |
1554 | 507 | if (response->has_paging_state()) { |
1555 | 390 | out_spec.set_next_row_key(response->paging_state().next_row_key()); |
1556 | 390 | } |
1557 | 507 | response->set_is_backfill_batch_done(true); |
1558 | 507 | } |
1559 | | |
1560 | 2.33k | VLOG(2) << "Got input spec " << yb::ToString(in_spec) |
1561 | 0 | << " set output spec " << yb::ToString(out_spec) |
1562 | 0 | << " batch_done=" << response->is_backfill_batch_done(); |
1563 | 2.33k | string serialized_pb; |
1564 | 2.33k | out_spec.SerializeToString(&serialized_pb); |
1565 | 2.33k | response->set_backfill_spec(b2a_hex(serialized_pb)); |
1566 | 2.33k | } |
1567 | | |
1568 | | } // namespace |
1569 | | |
1570 | | CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_read_request, |
1571 | | const size_t row_count, |
1572 | 3.47M | PgsqlResponsePB* response) const { |
1573 | | // If there is no hash column in the read request, this is a full-table query. And if there is no |
1574 | | // paging state in the response, we are done reading from the current tablet. In this case, we |
1575 | | // should return the exclusive end partition key of this tablet if not empty which is the start |
1576 | | // key of the next tablet. Do so only if the request has no row count limit, or there is and we |
1577 | | // haven't hit it, or we are asked to return paging state even when we have hit the limit. |
1578 | | // Otherwise, leave the paging state empty which means we are completely done reading for the |
1579 | | // whole SELECT statement. |
1580 | 3.47M | const bool single_tablet_query = |
1581 | 3.47M | VERIFY_RESULT(IsQueryOnlyForTablet(pgsql_read_request, row_count)); |
1582 | 3.47M | if (!single_tablet_query && |
1583 | 3.47M | !response->has_paging_state()1.33M && |
1584 | 3.47M | (1.27M !pgsql_read_request.has_limit()1.27M || row_count < pgsql_read_request.limit()1.27M || |
1585 | 1.27M | pgsql_read_request.return_paging_state()162 )) { |
1586 | | // For backward scans partition_key_start must be used as next_partition_key. |
1587 | | // Client level logic will check it and route next request to the preceding tablet. |
1588 | 1.27M | const auto& next_partition_key = |
1589 | 1.27M | pgsql_read_request.has_hash_code() || |
1590 | 1.27M | pgsql_read_request.is_forward_scan()1.18M |
1591 | 1.27M | ? metadata_->partition()->partition_key_end()1.27M |
1592 | 1.27M | : metadata_->partition()->partition_key_start()49 ; |
1593 | | // Check we did not reach the last tablet. |
1594 | 1.27M | const bool end_scan = next_partition_key.empty() || |
1595 | 1.27M | VERIFY_RESULT(HasScanReachedMaxPartitionKey( |
1596 | 1.27M | pgsql_read_request, next_partition_key, row_count)); |
1597 | 1.27M | if (!end_scan) { |
1598 | 86.3k | response->mutable_paging_state()->set_next_partition_key(next_partition_key); |
1599 | 86.3k | } |
1600 | 1.27M | } |
1601 | | |
1602 | | // If there is a paging state, update the total number of rows read so far. |
1603 | 3.47M | if (response->has_paging_state()) { |
1604 | 149k | response->mutable_paging_state()->set_total_num_rows_read( |
1605 | 149k | pgsql_read_request.paging_state().total_num_rows_read() + row_count); |
1606 | 149k | } |
1607 | | |
1608 | 3.47M | if (pgsql_read_request.is_for_backfill()) { |
1609 | | // BackfillSpec is used to implement "paging" across multiple BackfillIndex |
1610 | | // rpcs from the master. |
1611 | 2.33k | SetBackfillSpecForYsqlBackfill(pgsql_read_request, row_count, response); |
1612 | 2.33k | } |
1613 | 3.47M | return Status::OK(); |
1614 | 3.47M | } |
1615 | | |
1616 | | //-------------------------------------------------------------------------------------------------- |
1617 | | |
1618 | 3.26M | void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteQuery> query) { |
1619 | 3.26M | TRACE(__func__); |
1620 | 3.26M | if (table_type_ == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
1621 | 0 | query->Cancel( |
1622 | 0 | STATUS(NotSupported, "Transaction status table does not support write")); |
1623 | 0 | return; |
1624 | 0 | } |
1625 | | |
1626 | 3.26M | if (!GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) { |
1627 | 3.25M | auto write_permit = GetPermitToWrite(query->deadline()); |
1628 | 3.25M | if (!write_permit.ok()) { |
1629 | 16.3k | TRACE("Could not get the write permit."); |
1630 | 16.3k | WriteQuery::StartSynchronization(std::move(query), MoveStatus(write_permit)); |
1631 | 16.3k | return; |
1632 | 16.3k | } |
1633 | | // Save the write permit to be released after the operation is submitted |
1634 | | // to Raft queue. |
1635 | 3.24M | query->UseSubmitToken(std::move(write_permit)); |
1636 | 3.24M | } |
1637 | | |
1638 | 3.24M | WriteQuery::Execute(std::move(query)); |
1639 | 3.24M | } |
1640 | | |
1641 | 174k | Status Tablet::Flush(FlushMode mode, FlushFlags flags, int64_t ignore_if_flushed_after_tick) { |
1642 | 174k | TRACE_EVENT0("tablet", "Tablet::Flush"); |
1643 | | |
1644 | 174k | ScopedRWOperation pending_op; |
1645 | 174k | if (!HasFlags(flags, FlushFlags::kNoScopedOperation)) { |
1646 | 2.84k | pending_op = CreateNonAbortableScopedRWOperation(); |
1647 | 2.84k | LOG_IF(DFATAL, !pending_op.ok()) << "CreateNonAbortableScopedRWOperation failed"1 ; |
1648 | 2.84k | RETURN_NOT_OK(pending_op); |
1649 | 2.84k | } |
1650 | | |
1651 | 174k | rocksdb::FlushOptions options; |
1652 | 174k | options.ignore_if_flushed_after_tick = ignore_if_flushed_after_tick; |
1653 | 174k | bool flush_intents = intents_db_ && HasFlags(flags, FlushFlags::kIntents)89.1k ; |
1654 | 174k | if (flush_intents) { |
1655 | 89.1k | options.wait = false; |
1656 | 89.1k | WARN_NOT_OK(intents_db_->Flush(options), "Flush intents DB"); |
1657 | 89.1k | } |
1658 | | |
1659 | 174k | if (HasFlags(flags, FlushFlags::kRegular) && regular_db_174k ) { |
1660 | 174k | options.wait = mode == FlushMode::kSync; |
1661 | 174k | WARN_NOT_OK(regular_db_->Flush(options), "Flush regular DB"); |
1662 | 174k | } |
1663 | | |
1664 | 174k | if (flush_intents && mode == FlushMode::kSync89.1k ) { |
1665 | 95 | RETURN_NOT_OK(intents_db_->WaitForFlush()); |
1666 | 95 | } |
1667 | | |
1668 | 174k | return Status::OK(); |
1669 | 174k | } |
1670 | | |
1671 | 50 | Status Tablet::WaitForFlush() { |
1672 | 50 | TRACE_EVENT0("tablet", "Tablet::WaitForFlush"); |
1673 | | |
1674 | 50 | if (regular_db_) { |
1675 | 50 | RETURN_NOT_OK(regular_db_->WaitForFlush()); |
1676 | 50 | } |
1677 | 50 | if (intents_db_) { |
1678 | 29 | RETURN_NOT_OK(intents_db_->WaitForFlush()); |
1679 | 29 | } |
1680 | | |
1681 | 50 | return Status::OK(); |
1682 | 50 | } |
1683 | | |
1684 | 0 | Status Tablet::ImportData(const std::string& source_dir) { |
1685 | | // We import only regular records, so don't have to deal with intents here. |
1686 | 0 | return regular_db_->Import(source_dir); |
1687 | 0 | } |
1688 | | |
1689 | | // We apply intents by iterating over whole transaction reverse index. |
1690 | | // Using value of reverse index record we find original intent record and apply it. |
1691 | | // After that we delete both intent record and reverse index record. |
1692 | 1.30M | Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) { |
1693 | 1.30M | VLOG_WITH_PREFIX796 (4) << __func__ << ": " << data.transaction_id796 ; |
1694 | | |
1695 | | // This flag enables tests to induce a situation where a transaction has committed but its intents |
1696 | | // haven't yet moved to regular db for a sufficiently long period. For example, it can help a test |
1697 | | // to reliably assert that conflict resolution/ concurrency control with a conflicting committed |
1698 | | // transaction is done properly in the rare situation where the committed transaction's intents |
1699 | | // are still in intents db and not yet in regular db. |
1700 | 1.30M | AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms); |
1701 | 1.30M | docdb::ApplyIntentsContext context( |
1702 | 1.30M | data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht, |
1703 | 1.30M | &key_bounds_, intents_db_.get()); |
1704 | 1.30M | docdb::IntentsWriter intents_writer( |
1705 | 1.30M | data.apply_state ? data.apply_state->key173 : Slice()1.30M , intents_db_.get(), &context); |
1706 | 1.30M | rocksdb::WriteBatch regular_write_batch; |
1707 | 1.30M | regular_write_batch.SetDirectWriter(&intents_writer); |
1708 | | // data.hybrid_time contains transaction commit time. |
1709 | | // We don't set transaction field of put_batch, otherwise we would write another bunch of intents. |
1710 | 1.30M | docdb::ConsensusFrontiers frontiers; |
1711 | 1.30M | auto frontiers_ptr = data.op_id.empty() ? nullptr0 : InitFrontiers(data, &frontiers); |
1712 | 1.30M | WriteToRocksDB(frontiers_ptr, ®ular_write_batch, StorageDbType::kRegular); |
1713 | 1.30M | return context.apply_state(); |
1714 | 1.30M | } |
1715 | | |
1716 | | template <class Ids> |
1717 | 1.67M | CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) { |
1718 | 1.67M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1719 | 1.67M | RETURN_NOT_OK(scoped_read_operation); |
1720 | | |
1721 | 1.67M | rocksdb::WriteBatch intents_write_batch; |
1722 | 1.67M | for (const auto& id : ids) { |
1723 | 1.67M | boost::optional<docdb::ApplyTransactionState> apply_state; |
1724 | 1.67M | for (;;) { |
1725 | 1.67M | docdb::RemoveIntentsContext context(id); |
1726 | 1.67M | docdb::IntentsWriter writer( |
1727 | 1.67M | apply_state ? apply_state->key637 : Slice()1.67M , intents_db_.get(), &context); |
1728 | 1.67M | intents_write_batch.SetDirectWriter(&writer); |
1729 | 1.67M | docdb::ConsensusFrontiers frontiers; |
1730 | 1.67M | auto frontiers_ptr = InitFrontiers(data, &frontiers); |
1731 | 1.67M | WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents); |
1732 | | |
1733 | 1.67M | if (!context.apply_state().active()) { |
1734 | 1.67M | break; |
1735 | 1.67M | } |
1736 | | |
1737 | 1.05k | apply_state = std::move(context.apply_state()); |
1738 | 1.05k | intents_write_batch.Clear(); |
1739 | | |
1740 | 1.05k | AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms); |
1741 | 1.05k | } |
1742 | 1.67M | } |
1743 | | |
1744 | 1.67M | return Status::OK(); |
1745 | 1.67M | } yb::Status yb::tablet::Tablet::RemoveIntentsImpl<std::initializer_list<yb::StronglyTypedUuid<yb::TransactionId_Tag> > >(yb::tablet::RemoveIntentsData const&, std::initializer_list<yb::StronglyTypedUuid<yb::TransactionId_Tag> > const&) Line | Count | Source | 1717 | 1.67M | CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) { | 1718 | 1.67M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); | 1719 | 1.67M | RETURN_NOT_OK(scoped_read_operation); | 1720 | | | 1721 | 1.67M | rocksdb::WriteBatch intents_write_batch; | 1722 | 1.67M | for (const auto& id : ids) { | 1723 | 1.67M | boost::optional<docdb::ApplyTransactionState> apply_state; | 1724 | 1.67M | for (;;) { | 1725 | 1.67M | docdb::RemoveIntentsContext context(id); | 1726 | 1.67M | docdb::IntentsWriter writer( | 1727 | 1.67M | apply_state ? apply_state->key637 : Slice()1.67M , intents_db_.get(), &context); | 1728 | 1.67M | intents_write_batch.SetDirectWriter(&writer); | 1729 | 1.67M | docdb::ConsensusFrontiers frontiers; | 1730 | 1.67M | auto frontiers_ptr = InitFrontiers(data, &frontiers); | 1731 | 1.67M | WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents); | 1732 | | | 1733 | 1.67M | if (!context.apply_state().active()) { | 1734 | 1.67M | break; | 1735 | 1.67M | } | 1736 | | | 1737 | 1.05k | apply_state = std::move(context.apply_state()); | 1738 | 1.05k | intents_write_batch.Clear(); | 1739 | | | 1740 | 1.05k | AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms); | 1741 | 1.05k | } | 1742 | 1.67M | } | 1743 | | | 1744 | 1.67M | return Status::OK(); | 1745 | 1.67M | } |
Unexecuted instantiation: yb::Status yb::tablet::Tablet::RemoveIntentsImpl<std::__1::unordered_set<yb::StronglyTypedUuid<yb::TransactionId_Tag>, boost::hash<yb::StronglyTypedUuid<yb::TransactionId_Tag> >, std::__1::equal_to<yb::StronglyTypedUuid<yb::TransactionId_Tag> >, std::__1::allocator<yb::StronglyTypedUuid<yb::TransactionId_Tag> > > >(yb::tablet::RemoveIntentsData const&, std::__1::unordered_set<yb::StronglyTypedUuid<yb::TransactionId_Tag>, boost::hash<yb::StronglyTypedUuid<yb::TransactionId_Tag> >, std::__1::equal_to<yb::StronglyTypedUuid<yb::TransactionId_Tag> >, std::__1::allocator<yb::StronglyTypedUuid<yb::TransactionId_Tag> > > const&) |
1746 | | |
1747 | | |
1748 | 1.67M | Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionId& id) { |
1749 | 1.67M | return RemoveIntentsImpl(data, std::initializer_list<TransactionId>{id}); |
1750 | 1.67M | } |
1751 | | |
1752 | 0 | Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionIdSet& transactions) { |
1753 | 0 | return RemoveIntentsImpl(data, transactions); |
1754 | 0 | } |
1755 | | |
1756 | | // We batch this as some tx could be very large and may not fit in one batch |
1757 | | CHECKED_STATUS Tablet::GetIntents( |
1758 | | const TransactionId& id, |
1759 | | std::vector<docdb::IntentKeyValueForCDC>* key_value_intents, |
1760 | 147 | docdb::ApplyTransactionState* stream_state) { |
1761 | 147 | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
1762 | 147 | RETURN_NOT_OK(scoped_read_operation); |
1763 | | |
1764 | 147 | docdb::ApplyTransactionState new_stream_state; |
1765 | | |
1766 | 147 | new_stream_state = VERIFY_RESULT( |
1767 | 0 | docdb::GetIntentsBatch(id, &key_bounds_, stream_state, intents_db_.get(), key_value_intents)); |
1768 | 0 | stream_state->key = new_stream_state.key; |
1769 | 147 | stream_state->write_id = new_stream_state.write_id; |
1770 | | |
1771 | 147 | return Status::OK(); |
1772 | 147 | } |
1773 | | |
1774 | 0 | Result<HybridTime> Tablet::ApplierSafeTime(HybridTime min_allowed, CoarseTimePoint deadline) { |
1775 | | // We could not use mvcc_ directly, because correct lease should be passed to it. |
1776 | 0 | return SafeTime(RequireLease::kFalse, min_allowed, deadline); |
1777 | 0 | } |
1778 | | |
1779 | | Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::CreateCDCSnapshotIterator( |
1780 | 12 | const Schema& projection, const ReadHybridTime& time, const string& next_key) { |
1781 | 12 | VLOG_WITH_PREFIX0 (2) << "The nextKey is " << next_key0 ; |
1782 | | |
1783 | 12 | Slice next_slice; |
1784 | 12 | if (!next_key.empty()) { |
1785 | 0 | SubDocKey start_sub_doc_key; |
1786 | 0 | docdb::KeyBytes start_key_bytes(next_key); |
1787 | 0 | RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice())); |
1788 | 0 | next_slice = start_sub_doc_key.doc_key().Encode().AsSlice(); |
1789 | 0 | VLOG_WITH_PREFIX(2) << "The nextKey doc is " << next_key; |
1790 | 0 | } |
1791 | 12 | return NewRowIterator( |
1792 | 12 | projection, time, "", CoarseTimePoint::max(), AllowBootstrappingState::kFalse, next_slice); |
1793 | 12 | } |
1794 | | |
1795 | | Status Tablet::CreatePreparedChangeMetadata( |
1796 | 1.10M | ChangeMetadataOperation *operation, const Schema* schema) { |
1797 | 1.10M | if (schema) { |
1798 | 73.4k | auto key_schema = GetKeySchema(operation->has_table_id() ? operation->table_id()72.7k : ""726 ); |
1799 | 73.4k | if (!key_schema.KeyEquals(*schema)) { |
1800 | 0 | return STATUS_FORMAT( |
1801 | 0 | InvalidArgument, |
1802 | 0 | "Schema keys cannot be altered. New schema key: $0. Existing schema key: $1", |
1803 | 0 | schema->CreateKeyProjection(), |
1804 | 0 | key_schema); |
1805 | 0 | } |
1806 | | |
1807 | 73.4k | if (!schema->has_column_ids()) { |
1808 | | // this probably means that the request is not from the Master |
1809 | 0 | return STATUS(InvalidArgument, "Missing Column IDs"); |
1810 | 0 | } |
1811 | 73.4k | } |
1812 | | |
1813 | 1.10M | operation->set_schema(schema); |
1814 | 1.10M | return Status::OK(); |
1815 | 1.10M | } |
1816 | | |
1817 | 1.02M | Status Tablet::AddTable(const TableInfoPB& table_info) { |
1818 | 1.02M | Schema schema; |
1819 | 1.02M | RETURN_NOT_OK(SchemaFromPB(table_info.schema(), &schema)); |
1820 | | |
1821 | 1.02M | PartitionSchema partition_schema; |
1822 | 1.02M | RETURN_NOT_OK(PartitionSchema::FromPB(table_info.partition_schema(), schema, &partition_schema)); |
1823 | | |
1824 | 1.02M | metadata_->AddTable( |
1825 | 1.02M | table_info.table_id(), table_info.namespace_name(), table_info.table_name(), |
1826 | 1.02M | table_info.table_type(), schema, IndexMap(), partition_schema, boost::none, |
1827 | 1.02M | table_info.schema_version()); |
1828 | | |
1829 | 1.02M | RETURN_NOT_OK(metadata_->Flush()); |
1830 | | |
1831 | 1.02M | return Status::OK(); |
1832 | 1.02M | } |
1833 | | |
1834 | 237 | Status Tablet::RemoveTable(const std::string& table_id) { |
1835 | 237 | metadata_->RemoveTable(table_id); |
1836 | 237 | RETURN_NOT_OK(metadata_->Flush()); |
1837 | 237 | return Status::OK(); |
1838 | 237 | } |
1839 | | |
1840 | 11.1k | Status Tablet::MarkBackfillDone(const TableId& table_id) { |
1841 | 11.1k | auto table_info = table_id.empty() ? |
1842 | 11.1k | metadata_->primary_table_info()0 : VERIFY_RESULT(metadata_->GetTableInfo(table_id)); |
1843 | 11.1k | LOG_WITH_PREFIX(INFO) << "Setting backfill as done. Current schema " |
1844 | 11.1k | << table_info->schema->ToString(); |
1845 | 11.1k | const vector<DeletedColumn> empty_deleted_cols; |
1846 | 11.1k | Schema new_schema = *table_info->schema; |
1847 | 11.1k | new_schema.SetRetainDeleteMarkers(false); |
1848 | 11.1k | metadata_->SetSchema( |
1849 | 11.1k | new_schema, *table_info->index_map, empty_deleted_cols, table_info->schema_version, table_id); |
1850 | 11.1k | return metadata_->Flush(); |
1851 | 11.1k | } |
1852 | | |
1853 | 68.1k | Status Tablet::AlterSchema(ChangeMetadataOperation *operation) { |
1854 | 68.1k | auto current_table_info = VERIFY_RESULT(metadata_->GetTableInfo( |
1855 | 68.1k | operation->request()->has_alter_table_id() ? |
1856 | 68.1k | operation->request()->alter_table_id() : "")); |
1857 | 0 | auto key_schema = current_table_info->schema->CreateKeyProjection(); |
1858 | | |
1859 | 68.1k | RSTATUS_DCHECK_NE(operation->schema(), static_cast<void*>(nullptr), InvalidArgument, |
1860 | 68.1k | "Schema could not be null"); |
1861 | 68.1k | RSTATUS_DCHECK(key_schema.KeyEquals(*DCHECK_NOTNULL(operation->schema())), InvalidArgument, |
1862 | 68.1k | "Schema keys cannot be altered"); |
1863 | | |
1864 | | // Abortable read/write operations could be long and they shouldn't access metadata_ without |
1865 | | // locks, so no need to wait for them here. |
1866 | 68.1k | auto op_pause = PauseReadWriteOperations(Abortable::kFalse); |
1867 | 68.1k | RETURN_NOT_OK(op_pause); |
1868 | | |
1869 | | // If the current version >= new version, there is nothing to do. |
1870 | 68.1k | if (current_table_info->schema_version >= operation->schema_version()) { |
1871 | 1.49k | LOG_WITH_PREFIX(INFO) |
1872 | 1.49k | << "Already running schema version " << current_table_info->schema_version |
1873 | 1.49k | << " got alter request for version " << operation->schema_version(); |
1874 | 1.49k | return Status::OK(); |
1875 | 1.49k | } |
1876 | | |
1877 | 66.6k | LOG_WITH_PREFIX(INFO) << "Alter schema from " << current_table_info->schema->ToString() |
1878 | 66.6k | << " version " << current_table_info->schema_version |
1879 | 66.6k | << " to " << operation->schema()->ToString() |
1880 | 66.6k | << " version " << operation->schema_version(); |
1881 | | |
1882 | | // Find out which columns have been deleted in this schema change, and add them to metadata. |
1883 | 66.6k | vector<DeletedColumn> deleted_cols; |
1884 | 315k | for (const auto& col : current_table_info->schema->column_ids()) { |
1885 | 315k | if (operation->schema()->find_column_by_id(col) == Schema::kColumnNotFound) { |
1886 | 2.15k | deleted_cols.emplace_back(col, clock_->Now()); |
1887 | 2.15k | LOG_WITH_PREFIX(INFO) << "Column " << col << " recorded as deleted."; |
1888 | 2.15k | } |
1889 | 315k | } |
1890 | | |
1891 | 66.6k | metadata_->SetSchema(*operation->schema(), operation->index_map(), deleted_cols, |
1892 | 66.6k | operation->schema_version(), current_table_info->table_id); |
1893 | 66.7k | if (operation->has_new_table_name()66.6k ) { |
1894 | 66.7k | metadata_->SetTableName(current_table_info->namespace_name, operation->new_table_name()); |
1895 | 66.7k | if (table_metrics_entity_) { |
1896 | 66.7k | table_metrics_entity_->SetAttribute("table_name", operation->new_table_name()); |
1897 | 66.7k | table_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name); |
1898 | 66.7k | } |
1899 | 66.8k | if (tablet_metrics_entity_66.7k ) { |
1900 | 66.8k | tablet_metrics_entity_->SetAttribute("table_name", operation->new_table_name()); |
1901 | 66.8k | tablet_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name); |
1902 | 66.8k | } |
1903 | 66.7k | } |
1904 | | |
1905 | | // Clear old index table metadata cache. |
1906 | 66.6k | ResetYBMetaDataCache(); |
1907 | | |
1908 | | // Create transaction manager and index table metadata cache for secondary index update. |
1909 | 66.6k | if (!operation->index_map().empty()) { |
1910 | 53.2k | if (current_table_info->schema->table_properties().is_transactional() && |
1911 | 53.2k | !transaction_manager_43.2k ) { |
1912 | 8.73k | transaction_manager_ = std::make_unique<client::TransactionManager>( |
1913 | 8.73k | client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_); |
1914 | 8.73k | } |
1915 | 53.2k | CreateNewYBMetaDataCache(); |
1916 | 53.2k | } |
1917 | | |
1918 | | // Flush the updated schema metadata to disk. |
1919 | 66.6k | return metadata_->Flush(); |
1920 | 68.1k | } |
1921 | | |
1922 | 5.22k | Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperation* operation) { |
1923 | 5.22k | if (operation->has_wal_retention_secs()) { |
1924 | 5.22k | LOG_WITH_PREFIX(INFO) << "Altering metadata wal_retention_secs from " |
1925 | 5.22k | << metadata_->wal_retention_secs() |
1926 | 5.22k | << " to " << operation->wal_retention_secs(); |
1927 | 5.22k | metadata_->set_wal_retention_secs(operation->wal_retention_secs()); |
1928 | | // Flush the updated schema metadata to disk. |
1929 | 5.22k | return metadata_->Flush(); |
1930 | 5.22k | } |
1931 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid ChangeMetadataOperation: $0", |
1932 | 5.22k | operation->ToString()); |
1933 | 5.22k | } |
1934 | | |
1935 | | namespace { |
1936 | | |
1937 | | Result<pgwrapper::PGConnPtr> ConnectToPostgres( |
1938 | | const HostPort& pgsql_proxy_bind_address, |
1939 | | const std::string& database_name, |
1940 | 1.92k | const uint64_t postgres_auth_key) { |
1941 | | // Construct connection string. Note that the plain password in the connection string will be |
1942 | | // sent over the wire, but since it only goes over a unix-domain socket, there should be no |
1943 | | // eavesdropping/tampering issues. |
1944 | 1.92k | std::string conn_str = Format( |
1945 | 1.92k | "user=$0 password=$1 host=$2 port=$3 dbname=$4", |
1946 | 1.92k | "postgres", |
1947 | 1.92k | postgres_auth_key, |
1948 | 1.92k | PgDeriveSocketDir(pgsql_proxy_bind_address.host()), |
1949 | 1.92k | pgsql_proxy_bind_address.port(), |
1950 | 1.92k | pgwrapper::PqEscapeLiteral(database_name)); |
1951 | 1.92k | std::string conn_str_for_log = Format( |
1952 | 1.92k | "user=$0 password=$1 host=$2 port=$3 dbname=$4", |
1953 | 1.92k | "postgres", |
1954 | 1.92k | "<REDACTED>", |
1955 | 1.92k | PgDeriveSocketDir(pgsql_proxy_bind_address.host()), |
1956 | 1.92k | pgsql_proxy_bind_address.port(), |
1957 | 1.92k | pgwrapper::PqEscapeLiteral(database_name)); |
1958 | 1.92k | VLOG(1) << __func__ << ": libpq connection string: " << conn_str_for_log0 ; |
1959 | | |
1960 | | // Connect. |
1961 | 1.92k | pgwrapper::PGConnPtr conn(PQconnectdb(conn_str.c_str())); |
1962 | 1.92k | if (!conn) { |
1963 | 0 | return STATUS(IllegalState, "backfill failed to connect to DB"); |
1964 | 0 | } |
1965 | 1.92k | if (PQstatus(conn.get()) == CONNECTION_BAD) { |
1966 | 0 | std::string msg(PQerrorMessage(conn.get())); |
1967 | | |
1968 | | // Avoid double newline (postgres adds a newline after the error message). |
1969 | 0 | if (msg.back() == '\n') { |
1970 | 0 | msg.resize(msg.size() - 1); |
1971 | 0 | } |
1972 | 0 | LOG(WARNING) << "libpq connection \"" << conn_str_for_log << "\" failed: " << msg; |
1973 | 0 | return STATUS_FORMAT(IllegalState, "backfill connection to DB failed: $0", msg); |
1974 | 0 | } |
1975 | 1.92k | return conn; |
1976 | 1.92k | } |
1977 | | |
1978 | 2.27k | string GenerateSerializedBackfillSpec(size_t batch_size, const string& next_row_to_backfill) { |
1979 | 2.27k | PgsqlBackfillSpecPB backfill_spec; |
1980 | 2.27k | std::string serialized_backfill_spec; |
1981 | | // Note that although we set the desired batch_size as the limit, postgres |
1982 | | // has its own internal paging size of 1024 (controlled by --ysql_prefetch_limit). So the actual |
1983 | | // rows processed could be larger than the limit set here; unless it happens |
1984 | | // to be a multiple of FLAGS_ysql_prefetch_limit |
1985 | 2.27k | backfill_spec.set_limit(batch_size); |
1986 | 2.27k | backfill_spec.set_next_row_key(next_row_to_backfill); |
1987 | 2.27k | backfill_spec.SerializeToString(&serialized_backfill_spec); |
1988 | 2.27k | VLOG(2) << "Generating backfill_spec " << yb::ToString(backfill_spec) << " encoded as " |
1989 | 0 | << b2a_hex(serialized_backfill_spec) << " a string of length " |
1990 | 0 | << serialized_backfill_spec.length(); |
1991 | 2.27k | return serialized_backfill_spec; |
1992 | 2.27k | } |
1993 | | |
1994 | | Result<PgsqlBackfillSpecPB> QueryPostgresToDoBackfill( |
1995 | 2.27k | const pgwrapper::PGConnPtr& conn, const string& query_str) { |
1996 | | // Execute. |
1997 | 2.27k | pgwrapper::PGResultPtr res(PQexec(conn.get(), query_str.c_str())); |
1998 | 2.27k | if (!res) { |
1999 | 0 | std::string msg(PQerrorMessage(conn.get())); |
2000 | | |
2001 | | // Avoid double newline (postgres adds a newline after the error message). |
2002 | 0 | if (msg.back() == '\n') { |
2003 | 0 | msg.resize(msg.size() - 1); |
2004 | 0 | } |
2005 | 0 | LOG(WARNING) << "libpq query \"" << query_str << "\" was not sent: " << msg; |
2006 | 0 | return STATUS_FORMAT(IllegalState, "backfill query couldn't be sent: $0", msg); |
2007 | 0 | } |
2008 | | |
2009 | 2.27k | ExecStatusType status = PQresultStatus(res.get()); |
2010 | | // TODO(jason): more properly handle bad statuses |
2011 | 2.27k | if (status != PGRES_TUPLES_OK) { |
2012 | 24 | std::string msg(PQresultErrorMessage(res.get())); |
2013 | | |
2014 | | // Avoid double newline (postgres adds a newline after the error message). |
2015 | 24 | if (msg.back() == '\n') { |
2016 | 24 | msg.resize(msg.size() - 1); |
2017 | 24 | } |
2018 | 24 | LOG(WARNING) << "libpq query \"" << query_str << "\" returned " << PQresStatus(status) << ": " |
2019 | 24 | << msg; |
2020 | 24 | return STATUS(IllegalState, msg); |
2021 | 24 | } |
2022 | | |
2023 | 2.24k | CHECK_EQ(PQntuples(res.get()), 1); |
2024 | 2.24k | CHECK_EQ(PQnfields(res.get()), 1); |
2025 | 2.24k | const std::string returned_spec = CHECK_RESULT(pgwrapper::GetString(res.get(), 0, 0)); |
2026 | 2.24k | VLOG(3) << "Got back " << returned_spec << " of length " << returned_spec.length()1 ; |
2027 | | |
2028 | 2.24k | PgsqlBackfillSpecPB spec; |
2029 | 2.24k | spec.ParseFromString(a2b_hex(returned_spec)); |
2030 | 2.24k | return spec; |
2031 | 2.27k | } |
2032 | | |
2033 | | struct BackfillParams { |
2034 | | explicit BackfillParams(const CoarseTimePoint deadline) |
2035 | | : start_time(CoarseMonoClock::Now()), |
2036 | | deadline(deadline), |
2037 | | rate_per_sec(GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec)), |
2038 | 4.34k | batch_size(GetAtomicFlag(&FLAGS_backfill_index_write_batch_size)) { |
2039 | 4.34k | auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms); |
2040 | 4.34k | if (grace_margin_ms < 04.34k ) { |
2041 | | // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec; |
2042 | | // By default, we will set it to twice the minimum value + 1s. |
2043 | 4.34k | grace_margin_ms = (rate_per_sec > 0 ? 1000 * (1 + 2.0 * batch_size / rate_per_sec)42 : 10004.30k ); |
2044 | 4.34k | YB_LOG_EVERY_N_SECS(INFO, 10) |
2045 | 1.44k | << "Using grace margin of " << grace_margin_ms << "ms, original deadline: " |
2046 | 1.44k | << MonoDelta(deadline - start_time); |
2047 | 4.34k | } |
2048 | 4.34k | modified_deadline = deadline - grace_margin_ms * 1ms; |
2049 | 4.34k | } |
2050 | | |
2051 | | CoarseTimePoint start_time; |
2052 | | CoarseTimePoint deadline; |
2053 | | size_t rate_per_sec; |
2054 | | size_t batch_size; |
2055 | | CoarseTimePoint modified_deadline; |
2056 | | }; |
2057 | | |
2058 | | // Slow down before the next batch to throttle the rate of processing. |
2059 | | void MaybeSleepToThrottleBackfill( |
2060 | | const CoarseTimePoint& start_time, |
2061 | 5.06k | size_t number_of_rows_processed) { |
2062 | 5.06k | if (FLAGS_backfill_index_rate_rows_per_sec <= 0) { |
2063 | 4.78k | return; |
2064 | 4.78k | } |
2065 | | |
2066 | 275 | auto now = CoarseMonoClock::Now(); |
2067 | 275 | auto duration_for_rows_processed = MonoDelta(now - start_time); |
2068 | 275 | auto expected_time_for_processing_rows = MonoDelta::FromMilliseconds( |
2069 | 275 | number_of_rows_processed * 1000 / FLAGS_backfill_index_rate_rows_per_sec); |
2070 | 275 | DVLOG(3) << "Duration since last batch " << duration_for_rows_processed << " expected duration " |
2071 | 3 | << expected_time_for_processing_rows << " extra time to sleep: " |
2072 | 3 | << expected_time_for_processing_rows - duration_for_rows_processed; |
2073 | 275 | if (duration_for_rows_processed < expected_time_for_processing_rows) { |
2074 | 266 | SleepFor(expected_time_for_processing_rows - duration_for_rows_processed); |
2075 | 266 | } |
2076 | 275 | } |
2077 | | |
2078 | | bool CanProceedToBackfillMoreRows( |
2079 | | const BackfillParams& backfill_params, |
2080 | 3.01k | size_t number_of_rows_processed) { |
2081 | 3.01k | auto now = CoarseMonoClock::Now(); |
2082 | 3.01k | if (now > backfill_params.modified_deadline || |
2083 | 3.01k | (3.00k FLAGS_TEST_backfill_paging_size > 03.00k && |
2084 | 3.00k | number_of_rows_processed >= FLAGS_TEST_backfill_paging_size320 )) { |
2085 | | // We are done if we are out of time. |
2086 | | // Or, if for testing purposes we have a bound on the size of batches to process. |
2087 | 132 | return false; |
2088 | 132 | } |
2089 | 2.88k | return true; |
2090 | 3.01k | } |
2091 | | |
2092 | | bool CanProceedToBackfillMoreRows( |
2093 | | const BackfillParams& backfill_params, |
2094 | | const string& backfilled_until, |
2095 | 2.24k | size_t number_of_rows_processed) { |
2096 | 2.24k | if (backfilled_until.empty()) { |
2097 | | // The backfill is done for this tablet. No need to do another batch. |
2098 | 1.85k | return false; |
2099 | 1.85k | } |
2100 | | |
2101 | 390 | return CanProceedToBackfillMoreRows(backfill_params, number_of_rows_processed); |
2102 | 2.24k | } |
2103 | | |
2104 | | } // namespace |
2105 | | |
2106 | | // Assume that we are already in the Backfilling mode. |
2107 | | Status Tablet::BackfillIndexesForYsql( |
2108 | | const std::vector<IndexInfo>& indexes, |
2109 | | const std::string& backfill_from, |
2110 | | const CoarseTimePoint deadline, |
2111 | | const HybridTime read_time, |
2112 | | const HostPort& pgsql_proxy_bind_address, |
2113 | | const std::string& database_name, |
2114 | | const uint64_t postgres_auth_key, |
2115 | | size_t* number_of_rows_processed, |
2116 | 1.95k | std::string* backfilled_until) { |
2117 | 1.95k | if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { |
2118 | 136 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); |
2119 | 136 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); |
2120 | 136 | } |
2121 | 1.95k | LOG(INFO) << "Begin " << __func__ << " at " << read_time << " from " |
2122 | 1.95k | << (backfill_from.empty() ? "<start-of-the-tablet>"1.89k : strings::b2a_hex(backfill_from)67 ) |
2123 | 1.95k | << " for " << AsString(indexes); |
2124 | 1.95k | *backfilled_until = backfill_from; |
2125 | 1.95k | pgwrapper::PGConnPtr conn = |
2126 | 1.95k | VERIFY_RESULT(ConnectToPostgres(pgsql_proxy_bind_address, database_name, postgres_auth_key)); |
2127 | | |
2128 | | // Construct query string. |
2129 | 0 | std::string index_oids; |
2130 | 1.95k | { |
2131 | 1.95k | std::stringstream ss; |
2132 | 1.95k | for (auto& index : indexes) { |
2133 | | // Cannot use Oid type because for large OID such as 2147500041, it overflows Postgres |
2134 | | // lexer <ival> type. Use int to output as -2147467255 that is accepted by <ival>. |
2135 | 1.91k | int index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id())); |
2136 | 0 | ss << index_oid << ","; |
2137 | 1.91k | } |
2138 | 1.95k | index_oids = ss.str(); |
2139 | 1.95k | index_oids.pop_back(); |
2140 | 1.95k | } |
2141 | 0 | std::string partition_key = metadata_->partition()->partition_key_start(); |
2142 | | |
2143 | 1.95k | BackfillParams backfill_params(deadline); |
2144 | 1.95k | *number_of_rows_processed = 0; |
2145 | 2.31k | do { |
2146 | 2.31k | std::string serialized_backfill_spec = |
2147 | 2.31k | GenerateSerializedBackfillSpec(backfill_params.batch_size, *backfilled_until); |
2148 | | |
2149 | | // This should be safe from injection attacks because the parameters only consist of characters |
2150 | | // [-,0-9a-f]. |
2151 | 2.31k | std::string query_str = Format( |
2152 | 2.31k | "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';", |
2153 | 2.31k | index_oids, |
2154 | 2.31k | b2a_hex(serialized_backfill_spec), |
2155 | 2.31k | read_time.ToUint64(), |
2156 | 2.31k | b2a_hex(partition_key)); |
2157 | 2.31k | VLOG(1) << __func__ << ": libpq query string: " << query_str40 ; |
2158 | | |
2159 | 2.31k | PgsqlBackfillSpecPB spec = VERIFY_RESULT2.28k (QueryPostgresToDoBackfill(conn, query_str));2.28k |
2160 | 0 | *number_of_rows_processed += spec.count(); |
2161 | 2.28k | *backfilled_until = spec.next_row_key(); |
2162 | | |
2163 | 2.28k | VLOG(2) << "Backfilled " << *number_of_rows_processed << " rows. " |
2164 | 41 | << "Setting backfilled_until to " << b2a_hex(*backfilled_until) << " of length " |
2165 | 41 | << backfilled_until->length(); |
2166 | | |
2167 | 2.28k | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2168 | 2.28k | } while (CanProceedToBackfillMoreRows( |
2169 | 2.28k | backfill_params, *backfilled_until, *number_of_rows_processed)); |
2170 | | |
2171 | 1.93k | VLOG(1) << "Backfilled " << *number_of_rows_processed << " rows. " |
2172 | 41 | << "Set backfilled_until to " |
2173 | 41 | << (backfilled_until->empty() ? "(empty)"0 : b2a_hex(*backfilled_until)); |
2174 | 1.93k | return Status::OK(); |
2175 | 1.95k | } |
2176 | | |
2177 | | std::vector<yb::ColumnSchema> Tablet::GetColumnSchemasForIndex( |
2178 | 2.43k | const std::vector<IndexInfo>& indexes) { |
2179 | 2.43k | std::unordered_set<yb::ColumnId> col_ids_set; |
2180 | 2.43k | std::vector<yb::ColumnSchema> columns; |
2181 | | |
2182 | 12.6k | for (auto idx : schema()->column_ids()) { |
2183 | 12.6k | if (schema()->is_key_column(idx)) { |
2184 | 6.75k | col_ids_set.insert(idx); |
2185 | 6.75k | auto res = schema()->column_by_id(idx); |
2186 | 6.75k | if (res) { |
2187 | 6.73k | columns.push_back(*res); |
2188 | 6.73k | } else { |
2189 | 18 | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " |
2190 | 18 | << idx; |
2191 | 18 | } |
2192 | 6.75k | } |
2193 | 12.6k | } |
2194 | 2.46k | for (const IndexInfo& idx : indexes) { |
2195 | 10.0k | for (const auto& idx_col : idx.columns()) { |
2196 | 10.0k | if (col_ids_set.find(idx_col.indexed_column_id) == col_ids_set.end()) { |
2197 | 3.22k | col_ids_set.insert(idx_col.indexed_column_id); |
2198 | 3.22k | auto res = schema()->column_by_id(idx_col.indexed_column_id); |
2199 | 3.23k | if (res3.22k ) { |
2200 | 3.23k | columns.push_back(*res); |
2201 | 18.4E | } else { |
2202 | 18.4E | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " |
2203 | 18.4E | << idx_col.indexed_column_id; |
2204 | 18.4E | } |
2205 | 3.22k | } |
2206 | 10.0k | } |
2207 | 2.46k | if (idx.where_predicate_spec()) { |
2208 | 1.42k | for (const auto col_in_pred : idx.where_predicate_spec()->column_ids()) { |
2209 | 1.42k | ColumnId col_id_in_pred(col_in_pred); |
2210 | 1.42k | if (col_ids_set.find(col_id_in_pred) == col_ids_set.end()) { |
2211 | 214 | col_ids_set.insert(col_id_in_pred); |
2212 | 214 | auto res = schema()->column_by_id(col_id_in_pred); |
2213 | 216 | if (res214 ) { |
2214 | 216 | columns.push_back(*res); |
2215 | 18.4E | } else { |
2216 | 18.4E | LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " << |
2217 | 18.4E | col_id_in_pred; |
2218 | 18.4E | } |
2219 | 214 | } |
2220 | 1.42k | } |
2221 | 1.13k | } |
2222 | 2.46k | } |
2223 | 2.43k | return columns; |
2224 | 2.43k | } |
2225 | | |
2226 | | namespace { |
2227 | | |
2228 | 2.43k | std::vector<TableId> GetIndexIds(const std::vector<IndexInfo>& indexes) { |
2229 | 2.43k | std::vector<TableId> index_ids; |
2230 | 2.45k | for (const IndexInfo& idx : indexes) { |
2231 | 2.45k | index_ids.push_back(idx.table_id()); |
2232 | 2.45k | } |
2233 | 2.43k | return index_ids; |
2234 | 2.43k | } |
2235 | | |
2236 | | template <typename SomeVector> |
2237 | | void SleepToThrottleRate( |
2238 | 0 | SomeVector* index_requests, int32 row_access_rate_per_sec, CoarseTimePoint* last_flushed_at) { |
2239 | 0 | auto now = CoarseMonoClock::Now(); |
2240 | 0 | if (row_access_rate_per_sec > 0) { |
2241 | 0 | auto duration_since_last_batch = MonoDelta(now - *last_flushed_at); |
2242 | 0 | auto expected_duration_ms = |
2243 | 0 | MonoDelta::FromMilliseconds(index_requests->size() * 1000 / row_access_rate_per_sec); |
2244 | 0 | DVLOG(3) << "Duration since last batch " << duration_since_last_batch << " expected duration " |
2245 | 0 | << expected_duration_ms |
2246 | 0 | << " extra time so sleep: " << expected_duration_ms - duration_since_last_batch; |
2247 | 0 | if (duration_since_last_batch < expected_duration_ms) { |
2248 | 0 | SleepFor(expected_duration_ms - duration_since_last_batch); |
2249 | 0 | } |
2250 | 0 | } |
2251 | 0 | } |
2252 | | |
2253 | | Result<client::YBTablePtr> GetTable( |
2254 | 4.91k | const TableId& table_id, const std::shared_ptr<client::YBMetaDataCache>& metadata_cache) { |
2255 | | // TODO create async version of GetTable. |
2256 | | // It is ok to have sync call here, because we use cache and it should not take too long. |
2257 | 4.91k | client::YBTablePtr index_table; |
2258 | 4.91k | bool cache_used_ignored = false; |
2259 | 4.91k | RETURN_NOT_OK(metadata_cache->GetTable(table_id, &index_table, &cache_used_ignored)); |
2260 | 4.91k | return index_table; |
2261 | 4.91k | } |
2262 | | |
2263 | | } // namespace |
2264 | | |
2265 | | // Should backfill the index with the information contained in this tablet. |
2266 | | // Assume that we are already in the Backfilling mode. |
2267 | | Status Tablet::BackfillIndexes( |
2268 | | const std::vector<IndexInfo>& indexes, |
2269 | | const std::string& backfill_from, |
2270 | | const CoarseTimePoint deadline, |
2271 | | const HybridTime read_time, |
2272 | | size_t* number_of_rows_processed, |
2273 | | std::string* backfilled_until, |
2274 | 2.43k | std::unordered_set<TableId>* failed_indexes) { |
2275 | 2.43k | TRACE(__func__); |
2276 | 2.43k | if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) { |
2277 | 155 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms); |
2278 | 155 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms)); |
2279 | 155 | } |
2280 | 2.43k | VLOG(2) << "Begin BackfillIndexes at " << read_time << " for " << AsString(indexes)2 ; |
2281 | | |
2282 | 2.43k | std::vector<TableId> index_ids = GetIndexIds(indexes); |
2283 | 2.43k | std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes); |
2284 | | |
2285 | 2.43k | Schema projection(columns, {}, schema()->num_key_columns()); |
2286 | 2.43k | auto iter = VERIFY_RESULT(NewRowIterator( |
2287 | 2.43k | projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline)); |
2288 | 0 | QLTableRow row; |
2289 | 2.43k | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>> index_requests; |
2290 | | |
2291 | 2.43k | BackfillParams backfill_params{deadline}; |
2292 | 2.43k | constexpr auto kProgressInterval = 1000; |
2293 | | |
2294 | 2.43k | if (!backfill_from.empty()) { |
2295 | 97 | VLOG(1) << "Resuming backfill from " << b2a_hex(backfill_from)0 ; |
2296 | 97 | *backfilled_until = backfill_from; |
2297 | 97 | RETURN_NOT_OK(iter->SeekTuple(Slice(backfill_from))); |
2298 | 97 | } |
2299 | | |
2300 | 2.43k | string resume_backfill_from; |
2301 | 2.43k | *number_of_rows_processed = 0; |
2302 | 2.43k | int TEST_number_rows_corrupted = 0; |
2303 | 2.43k | int TEST_number_rows_dropped = 0; |
2304 | | |
2305 | 2.62k | while (VERIFY_RESULT(iter->HasNext())) { |
2306 | 2.62k | if (index_requests.empty()) { |
2307 | 390 | *backfilled_until = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2308 | 0 | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2309 | 390 | } |
2310 | | |
2311 | 2.62k | if (!CanProceedToBackfillMoreRows(backfill_params, *number_of_rows_processed)) { |
2312 | 96 | resume_backfill_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2313 | 0 | break; |
2314 | 96 | } |
2315 | | |
2316 | 2.52k | RETURN_NOT_OK(iter->NextRow(&row)); |
2317 | 2.52k | if (FLAGS_TEST_backfill_sabotage_frequency > 0 && |
2318 | 2.52k | *number_of_rows_processed % FLAGS_TEST_backfill_sabotage_frequency == 00 ) { |
2319 | 0 | VLOG(1) << "Corrupting fetched row: " << row.ToString(); |
2320 | | // Corrupt first key column, since index should not be built on primary key |
2321 | 0 | row.MarkTombstoned(schema()->column_id(0)); |
2322 | 0 | TEST_number_rows_corrupted++; |
2323 | 0 | } |
2324 | | |
2325 | 2.52k | if (FLAGS_TEST_backfill_drop_frequency > 0 && |
2326 | 2.52k | *number_of_rows_processed % FLAGS_TEST_backfill_drop_frequency == 00 ) { |
2327 | 0 | (*number_of_rows_processed)++; |
2328 | 0 | VLOG(1) << "Dropping fetched row: " << row.ToString(); |
2329 | 0 | TEST_number_rows_dropped++; |
2330 | 0 | continue; |
2331 | 0 | } |
2332 | | |
2333 | 18.4E | DVLOG(2) << "Building index for fetched row: " << row.ToString(); |
2334 | 2.52k | RETURN_NOT_OK(UpdateIndexInBatches( |
2335 | 2.52k | row, indexes, read_time, backfill_params.deadline, &index_requests, |
2336 | 2.52k | failed_indexes)); |
2337 | | |
2338 | 2.52k | if (++(*number_of_rows_processed) % kProgressInterval == 0) { |
2339 | 0 | VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; |
2340 | 0 | } |
2341 | 2.52k | } |
2342 | | |
2343 | 2.43k | if (FLAGS_TEST_backfill_sabotage_frequency > 0) { |
2344 | 0 | LOG(INFO) << "In total, " << TEST_number_rows_corrupted |
2345 | 0 | << " rows were corrupted in index backfill."; |
2346 | 0 | } |
2347 | | |
2348 | 2.43k | if (FLAGS_TEST_backfill_drop_frequency > 0) { |
2349 | 0 | LOG(INFO) << "In total, " << TEST_number_rows_dropped |
2350 | 0 | << " rows were dropped in index backfill."; |
2351 | 0 | } |
2352 | | |
2353 | 18.4E | VLOG(1) << "Processed " << *number_of_rows_processed << " rows"; |
2354 | 2.43k | RETURN_NOT_OK(FlushWriteIndexBatch( |
2355 | 2.43k | read_time, backfill_params.deadline, &index_requests, failed_indexes)); |
2356 | 2.42k | MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed); |
2357 | 2.42k | *backfilled_until = resume_backfill_from; |
2358 | 2.42k | LOG(INFO) << "Done BackfillIndexes at " << read_time << " for " << AsString(index_ids) |
2359 | 2.42k | << " until " |
2360 | 2.42k | << (backfilled_until->empty() ? "<end of the tablet>"2.33k : b2a_hex(*backfilled_until)95 ); |
2361 | 2.42k | return Status::OK(); |
2362 | 2.43k | } |
2363 | | |
2364 | | Status Tablet::UpdateIndexInBatches( |
2365 | | const QLTableRow& row, |
2366 | | const std::vector<IndexInfo>& indexes, |
2367 | | const HybridTime write_time, |
2368 | | const CoarseTimePoint deadline, |
2369 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2370 | 2.53k | std::unordered_set<TableId>* failed_indexes) { |
2371 | 2.53k | const QLTableRow& kEmptyRow = QLTableRow::empty_row(); |
2372 | 2.53k | QLExprExecutor expr_executor; |
2373 | | |
2374 | 4.96k | for (const IndexInfo& index : indexes) { |
2375 | 4.96k | QLWriteRequestPB* const index_request = VERIFY_RESULT( |
2376 | 4.96k | docdb::CreateAndSetupIndexInsertRequest( |
2377 | 4.96k | &expr_executor, /* index_has_write_permission */ true, |
2378 | 4.96k | kEmptyRow, row, &index, index_requests)); |
2379 | 4.96k | if (index_request) |
2380 | 4.91k | index_request->set_is_backfill(true); |
2381 | 4.96k | } |
2382 | | |
2383 | | // Update the index write op. |
2384 | 2.53k | return FlushWriteIndexBatchIfRequired(write_time, deadline, index_requests, failed_indexes); |
2385 | 2.53k | } |
2386 | | |
2387 | | Result<std::shared_ptr<YBSession>> Tablet::GetSessionForVerifyOrBackfill( |
2388 | 2.50k | const CoarseTimePoint deadline) { |
2389 | 2.50k | if (!client_future_.valid()) { |
2390 | 0 | return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id()); |
2391 | 0 | } |
2392 | | |
2393 | 2.50k | auto client = client_future_.get(); |
2394 | 2.50k | auto session = std::make_shared<YBSession>(client); |
2395 | 2.50k | session->SetDeadline(deadline); |
2396 | 2.50k | return session; |
2397 | 2.50k | } |
2398 | | |
2399 | | Status Tablet::FlushWriteIndexBatchIfRequired( |
2400 | | const HybridTime write_time, |
2401 | | const CoarseTimePoint deadline, |
2402 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2403 | 2.53k | std::unordered_set<TableId>* failed_indexes) { |
2404 | 2.53k | if (index_requests->size() < FLAGS_backfill_index_write_batch_size) { |
2405 | 2.45k | return Status::OK(); |
2406 | 2.45k | } |
2407 | 73 | return FlushWriteIndexBatch(write_time, deadline, index_requests, failed_indexes); |
2408 | 2.53k | } |
2409 | | |
2410 | | Status Tablet::FlushWriteIndexBatch( |
2411 | | const HybridTime write_time, |
2412 | | const CoarseTimePoint deadline, |
2413 | | std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests, |
2414 | 2.50k | std::unordered_set<TableId>* failed_indexes) { |
2415 | 2.50k | if (!client_future_.valid()) { |
2416 | 0 | return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id()); |
2417 | 2.50k | } else if (!YBMetaDataCache()) { |
2418 | 0 | return STATUS(IllegalState, "Table metadata cache is not present for index update"); |
2419 | 0 | } |
2420 | 2.50k | std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline)); |
2421 | | |
2422 | 0 | std::unordered_set< |
2423 | 2.50k | client::YBqlWriteOpPtr, client::YBqlWritePrimaryKeyComparator, |
2424 | 2.50k | client::YBqlWritePrimaryKeyComparator> |
2425 | 2.50k | ops_by_primary_key; |
2426 | 2.50k | std::vector<shared_ptr<client::YBqlWriteOp>> write_ops; |
2427 | | |
2428 | 2.50k | constexpr int kMaxNumRetries = 10; |
2429 | 2.50k | auto metadata_cache = YBMetaDataCache(); |
2430 | | |
2431 | 4.91k | for (auto& pair : *index_requests) { |
2432 | 4.91k | client::YBTablePtr index_table = |
2433 | 4.91k | VERIFY_RESULT(GetTable(pair.first->table_id(), metadata_cache)); |
2434 | | |
2435 | 0 | shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite()); |
2436 | 4.91k | index_op->set_write_time_for_backfill(write_time); |
2437 | 4.91k | index_op->mutable_request()->Swap(&pair.second); |
2438 | 4.91k | if (index_table->IsUniqueIndex()) { |
2439 | 547 | if (ops_by_primary_key.count(index_op) > 0) { |
2440 | 4 | VLOG(2) << "Splitting the batch of writes because " << index_op->ToString() |
2441 | 0 | << " collides with an existing update in this batch."; |
2442 | 4 | VLOG(1) << "Flushing " << ops_by_primary_key.size() << " ops to the index"0 ; |
2443 | 4 | RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes)); |
2444 | 3 | VLOG(3) << "Done flushing ops to the index"0 ; |
2445 | 3 | ops_by_primary_key.clear(); |
2446 | 3 | } |
2447 | 546 | ops_by_primary_key.insert(index_op); |
2448 | 546 | } |
2449 | 4.91k | session->Apply(index_op); |
2450 | 4.91k | write_ops.push_back(index_op); |
2451 | 4.91k | } |
2452 | | |
2453 | 2.50k | VLOG(1) << Format("Flushing $0 ops to the index", |
2454 | 4 | (!ops_by_primary_key.empty() ? ops_by_primary_key.size()0 |
2455 | 4 | : write_ops.size())); |
2456 | 2.50k | RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes)); |
2457 | 2.50k | index_requests->clear(); |
2458 | | |
2459 | 2.50k | return Status::OK(); |
2460 | 2.50k | } |
2461 | | |
2462 | | template <typename SomeYBqlOp> |
2463 | | Status Tablet::FlushWithRetries( |
2464 | | shared_ptr<YBSession> session, |
2465 | | const std::vector<shared_ptr<SomeYBqlOp>>& index_ops, |
2466 | | int num_retries, |
2467 | 2.50k | std::unordered_set<TableId>* failed_indexes) { |
2468 | 2.50k | auto retries_left = num_retries; |
2469 | 2.50k | std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops; |
2470 | 2.50k | std::unordered_map<string, int32_t> error_msg_cnts; |
2471 | 2.50k | do { |
2472 | 2.50k | std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops; |
2473 | 2.50k | RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed."); |
2474 | 18.4E | VLOG(3) << "Done flushing ops to the index"; |
2475 | 4.92k | for (auto index_op : pending_ops) { |
2476 | 4.92k | if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) { |
2477 | 4.91k | continue; |
2478 | 4.91k | } |
2479 | | |
2480 | 5 | VLOG(2) << "Got response " << AsString(index_op->response()) << " for " |
2481 | 0 | << AsString(index_op->request()); |
2482 | 5 | if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { |
2483 | 5 | failed_indexes->insert(index_op->table()->id()); |
2484 | 5 | const string& error_message = index_op->response().error_message(); |
2485 | 5 | error_msg_cnts[error_message]++; |
2486 | 5 | VLOG_WITH_PREFIX0 (3) << "Failing index " << index_op->table()->id() |
2487 | 0 | << " due to non-retryable errors " << error_message; |
2488 | 5 | continue; |
2489 | 5 | } |
2490 | | |
2491 | 0 | failed_ops.push_back(index_op); |
2492 | 0 | session->Apply(index_op); |
2493 | 0 | } |
2494 | | |
2495 | 2.50k | if (!failed_ops.empty()) { |
2496 | 0 | VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size()); |
2497 | 0 | } |
2498 | 2.50k | pending_ops = std::move(failed_ops); |
2499 | 2.50k | } while (!pending_ops.empty() && --retries_left > 00 ); |
2500 | | |
2501 | 2.50k | if (!failed_indexes->empty()) { |
2502 | 5 | VLOG_WITH_PREFIX0 (1) << "Failed due to non-retryable errors " << AsString(*failed_indexes)0 ; |
2503 | 5 | } |
2504 | 2.50k | if (!pending_ops.empty()) { |
2505 | 0 | for (auto index_op : pending_ops) { |
2506 | 0 | failed_indexes->insert(index_op->table()->id()); |
2507 | 0 | const string& error_message = index_op->response().error_message(); |
2508 | 0 | error_msg_cnts[error_message]++; |
2509 | 0 | } |
2510 | 0 | VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are " |
2511 | 0 | << AsString(*failed_indexes); |
2512 | 0 | } |
2513 | 2.50k | return ( |
2514 | 2.50k | failed_indexes->empty() |
2515 | 2.50k | ? Status::OK()2.50k |
2516 | 2.50k | : STATUS_SUBSTITUTE2 ( |
2517 | 2.50k | IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2", |
2518 | 2.50k | pending_ops.size(), num_retries, AsString(error_msg_cnts))); |
2519 | 2.50k | } yb::Status yb::tablet::Tablet::FlushWithRetries<yb::client::YBqlWriteOp>(std::__1::shared_ptr<yb::client::YBSession>, std::__1::vector<std::__1::shared_ptr<yb::client::YBqlWriteOp>, std::__1::allocator<std::__1::shared_ptr<yb::client::YBqlWriteOp> > > const&, int, std::__1::unordered_set<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >*) Line | Count | Source | 2467 | 2.50k | std::unordered_set<TableId>* failed_indexes) { | 2468 | 2.50k | auto retries_left = num_retries; | 2469 | 2.50k | std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops; | 2470 | 2.50k | std::unordered_map<string, int32_t> error_msg_cnts; | 2471 | 2.50k | do { | 2472 | 2.50k | std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops; | 2473 | 2.50k | RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed."); | 2474 | 18.4E | VLOG(3) << "Done flushing ops to the index"; | 2475 | 4.92k | for (auto index_op : pending_ops) { | 2476 | 4.92k | if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) { | 2477 | 4.91k | continue; | 2478 | 4.91k | } | 2479 | | | 2480 | 5 | VLOG(2) << "Got response " << AsString(index_op->response()) << " for " | 2481 | 0 | << AsString(index_op->request()); | 2482 | 5 | if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) { | 2483 | 5 | failed_indexes->insert(index_op->table()->id()); | 2484 | 5 | const string& error_message = index_op->response().error_message(); | 2485 | 5 | error_msg_cnts[error_message]++; | 2486 | 5 | VLOG_WITH_PREFIX0 (3) << "Failing index " << index_op->table()->id() | 2487 | 0 | << " due to non-retryable errors " << error_message; | 2488 | 5 | continue; | 2489 | 5 | } | 2490 | | | 2491 | 0 | failed_ops.push_back(index_op); | 2492 | 0 | session->Apply(index_op); | 2493 | 0 | } | 2494 | | | 2495 | 2.50k | if (!failed_ops.empty()) { | 2496 | 0 | VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size()); | 2497 | 0 | } | 2498 | 2.50k | pending_ops = std::move(failed_ops); | 2499 | 2.50k | } while (!pending_ops.empty() && --retries_left > 00 ); | 2500 | | | 2501 | 2.50k | if (!failed_indexes->empty()) { | 2502 | 5 | VLOG_WITH_PREFIX0 (1) << "Failed due to non-retryable errors " << AsString(*failed_indexes)0 ; | 2503 | 5 | } | 2504 | 2.50k | if (!pending_ops.empty()) { | 2505 | 0 | for (auto index_op : pending_ops) { | 2506 | 0 | failed_indexes->insert(index_op->table()->id()); | 2507 | 0 | const string& error_message = index_op->response().error_message(); | 2508 | 0 | error_msg_cnts[error_message]++; | 2509 | 0 | } | 2510 | 0 | VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are " | 2511 | 0 | << AsString(*failed_indexes); | 2512 | 0 | } | 2513 | 2.50k | return ( | 2514 | 2.50k | failed_indexes->empty() | 2515 | 2.50k | ? Status::OK()2.50k | 2516 | 2.50k | : STATUS_SUBSTITUTE2 ( | 2517 | 2.50k | IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2", | 2518 | 2.50k | pending_ops.size(), num_retries, AsString(error_msg_cnts))); | 2519 | 2.50k | } |
Unexecuted instantiation: yb::Status yb::tablet::Tablet::FlushWithRetries<yb::client::YBqlReadOp>(std::__1::shared_ptr<yb::client::YBSession>, std::__1::vector<std::__1::shared_ptr<yb::client::YBqlReadOp>, std::__1::allocator<std::__1::shared_ptr<yb::client::YBqlReadOp> > > const&, int, std::__1::unordered_set<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::equal_to<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > >*) |
2520 | | |
2521 | | Status Tablet::VerifyIndexTableConsistencyForCQL( |
2522 | | const std::vector<IndexInfo>& indexes, |
2523 | | const std::string& start_key, |
2524 | | const int num_rows, |
2525 | | const CoarseTimePoint deadline, |
2526 | | const HybridTime read_time, |
2527 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2528 | 0 | std::string* verified_until) { |
2529 | 0 | std::vector<TableId> index_ids = GetIndexIds(indexes); |
2530 | 0 | std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes); |
2531 | 0 | return VerifyTableConsistencyForCQL( |
2532 | 0 | index_ids, columns, start_key, num_rows, deadline, read_time, false, consistency_stats, |
2533 | 0 | verified_until); |
2534 | 0 | } |
2535 | | |
2536 | | Status Tablet::VerifyMainTableConsistencyForCQL( |
2537 | | const TableId& main_table_id, |
2538 | | const std::string& start_key, |
2539 | | const int num_rows, |
2540 | | const CoarseTimePoint deadline, |
2541 | | const HybridTime read_time, |
2542 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2543 | 0 | std::string* verified_until) { |
2544 | 0 | const std::vector<yb::ColumnSchema>& columns = schema()->columns(); |
2545 | 0 | const std::vector<TableId>& table_ids = {main_table_id}; |
2546 | 0 | return VerifyTableConsistencyForCQL( |
2547 | 0 | table_ids, columns, start_key, num_rows, deadline, read_time, true, consistency_stats, |
2548 | 0 | verified_until); |
2549 | 0 | } |
2550 | | |
2551 | | Status Tablet::VerifyTableConsistencyForCQL( |
2552 | | const std::vector<TableId>& table_ids, |
2553 | | const std::vector<yb::ColumnSchema>& columns, |
2554 | | const std::string& start_key, |
2555 | | const int num_rows, |
2556 | | const CoarseTimePoint deadline, |
2557 | | const HybridTime read_time, |
2558 | | const bool is_main_table, |
2559 | | std::unordered_map<TableId, uint64>* consistency_stats, |
2560 | 0 | std::string* verified_until) { |
2561 | 0 | Schema projection(columns, {}, schema()->num_key_columns()); |
2562 | 0 | auto iter = VERIFY_RESULT(NewRowIterator( |
2563 | 0 | projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline)); |
2564 | | |
2565 | 0 | if (!start_key.empty()) { |
2566 | 0 | VLOG(2) << "Starting verify index from " << b2a_hex(start_key); |
2567 | 0 | RETURN_NOT_OK(iter->SeekTuple(Slice(start_key))); |
2568 | 0 | } |
2569 | | |
2570 | 0 | constexpr int kProgressInterval = 1000; |
2571 | 0 | CoarseTimePoint last_flushed_at; |
2572 | |
|
2573 | 0 | QLTableRow row; |
2574 | 0 | std::vector<std::pair<const TableId, QLReadRequestPB>> requests; |
2575 | 0 | std::unordered_set<TableId> failed_indexes; |
2576 | 0 | std::string resume_verified_from; |
2577 | |
|
2578 | 0 | int rows_verified = 0; |
2579 | 0 | while (VERIFY_RESULT(iter->HasNext()) && rows_verified < num_rows && |
2580 | 0 | CoarseMonoClock::Now() < deadline) { |
2581 | 0 | resume_verified_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer(); |
2582 | 0 | RETURN_NOT_OK(iter->NextRow(&row)); |
2583 | 0 | VLOG(1) << "Verifying index for main table row: " << row.ToString(); |
2584 | |
|
2585 | 0 | RETURN_NOT_OK(VerifyTableInBatches( |
2586 | 0 | row, table_ids, read_time, deadline, is_main_table, &requests, &last_flushed_at, |
2587 | 0 | &failed_indexes, consistency_stats)); |
2588 | 0 | if (++rows_verified % kProgressInterval == 0) { |
2589 | 0 | VLOG(1) << "Verified " << rows_verified << " rows"; |
2590 | 0 | } |
2591 | 0 | *verified_until = resume_verified_from; |
2592 | 0 | } |
2593 | 0 | return FlushVerifyBatch( |
2594 | 0 | read_time, deadline, &requests, &last_flushed_at, &failed_indexes, consistency_stats); |
2595 | 0 | } |
2596 | | |
2597 | | namespace { |
2598 | | |
2599 | 0 | QLConditionPB* InitWhereOp(QLReadRequestPB* req) { |
2600 | | // Add the hash column values |
2601 | 0 | DCHECK(req->hashed_column_values().empty()); |
2602 | | |
2603 | | // Add the range column values to the where clause |
2604 | 0 | QLConditionPB* where_pb = req->mutable_where_expr()->mutable_condition(); |
2605 | 0 | if (!where_pb->has_op()) { |
2606 | 0 | where_pb->set_op(QL_OP_AND); |
2607 | 0 | } |
2608 | 0 | DCHECK_EQ(where_pb->op(), QL_OP_AND); |
2609 | 0 | return where_pb; |
2610 | 0 | } |
2611 | | |
2612 | 0 | void SetSelectedExprToTrue(QLReadRequestPB* req) { |
2613 | | // Set TRUE as selected exprs helps reduce |
2614 | | // the need for row retrieval in the index read request |
2615 | 0 | req->add_selected_exprs()->mutable_value()->set_bool_value(true); |
2616 | 0 | QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc(); |
2617 | 0 | QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs(); |
2618 | 0 | rscol_desc->set_name("1"); |
2619 | 0 | rscol_desc->mutable_ql_type()->set_main(yb::DataType::BOOL); |
2620 | 0 | } |
2621 | | |
2622 | | Status WhereMainTableToPB( |
2623 | | const QLTableRow& key, |
2624 | | const IndexInfo& index_info, |
2625 | | const Schema& main_table_schema, |
2626 | 0 | QLReadRequestPB* req) { |
2627 | 0 | std::unordered_map<ColumnId, ColumnId> column_id_map; |
2628 | 0 | for (const auto& col : index_info.columns()) { |
2629 | 0 | column_id_map.insert({col.indexed_column_id, col.column_id}); |
2630 | 0 | } |
2631 | |
|
2632 | 0 | auto column_refs = req->mutable_column_refs(); |
2633 | 0 | QLConditionPB* where_pb = InitWhereOp(req); |
2634 | |
|
2635 | 0 | for (const auto& col_id : main_table_schema.column_ids()) { |
2636 | 0 | if (main_table_schema.is_hash_key_column(col_id)) { |
2637 | 0 | *req->add_hashed_column_values()->mutable_value() = *key.GetValue(column_id_map[col_id]); |
2638 | 0 | column_refs->add_ids(col_id); |
2639 | 0 | } else { |
2640 | 0 | auto it = column_id_map.find(col_id); |
2641 | 0 | if (it != column_id_map.end()) { |
2642 | 0 | QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition(); |
2643 | 0 | col_cond_pb->set_op(QL_OP_EQUAL); |
2644 | 0 | col_cond_pb->add_operands()->set_column_id(col_id); |
2645 | 0 | *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(it->second); |
2646 | 0 | column_refs->add_ids(col_id); |
2647 | 0 | } |
2648 | 0 | } |
2649 | 0 | } |
2650 | |
|
2651 | 0 | SetSelectedExprToTrue(req); |
2652 | 0 | return Status::OK(); |
2653 | 0 | } |
2654 | | |
2655 | | // Schema is index schema while key is row from main table |
2656 | | Status WhereIndexToPB( |
2657 | | const QLTableRow& key, |
2658 | | const IndexInfo& index_info, |
2659 | | const Schema& schema, |
2660 | 0 | QLReadRequestPB* req) { |
2661 | 0 | QLConditionPB* where_pb = InitWhereOp(req); |
2662 | 0 | auto column_refs = req->mutable_column_refs(); |
2663 | |
|
2664 | 0 | for (size_t idx = 0; idx < index_info.columns().size(); idx++) { |
2665 | 0 | const ColumnId& column_id = index_info.column(idx).column_id; |
2666 | 0 | const ColumnId& indexed_column_id = index_info.column(idx).indexed_column_id; |
2667 | 0 | if (schema.is_hash_key_column(column_id)) { |
2668 | 0 | *req->add_hashed_column_values()->mutable_value() = *key.GetValue(indexed_column_id); |
2669 | 0 | } else { |
2670 | 0 | QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition(); |
2671 | 0 | col_cond_pb->set_op(QL_OP_EQUAL); |
2672 | 0 | col_cond_pb->add_operands()->set_column_id(column_id); |
2673 | 0 | *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(indexed_column_id); |
2674 | 0 | } |
2675 | 0 | column_refs->add_ids(column_id); |
2676 | 0 | } |
2677 | |
|
2678 | 0 | SetSelectedExprToTrue(req); |
2679 | 0 | return Status::OK(); |
2680 | 0 | } |
2681 | | |
2682 | | } // namespace |
2683 | | |
2684 | | Status Tablet::VerifyTableInBatches( |
2685 | | const QLTableRow& row, |
2686 | | const std::vector<TableId>& table_ids, |
2687 | | const HybridTime read_time, |
2688 | | const CoarseTimePoint deadline, |
2689 | | const bool is_main_table, |
2690 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2691 | | CoarseTimePoint* last_flushed_at, |
2692 | | std::unordered_set<TableId>* failed_indexes, |
2693 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2694 | 0 | auto client = client_future_.get(); |
2695 | 0 | auto local_index_info = metadata_->primary_table_info()->index_info.get(); |
2696 | 0 | for (const TableId& table_id : table_ids) { |
2697 | 0 | std::shared_ptr<client::YBTable> table; |
2698 | 0 | RETURN_NOT_OK(client->OpenTable(table_id, &table)); |
2699 | 0 | std::shared_ptr<client::YBqlReadOp> read_op(table->NewQLSelect()); |
2700 | |
|
2701 | 0 | QLReadRequestPB* req = read_op->mutable_request(); |
2702 | 0 | if (is_main_table) { |
2703 | 0 | RETURN_NOT_OK(WhereMainTableToPB(row, *local_index_info, table->InternalSchema(), req)); |
2704 | 0 | } else { |
2705 | 0 | RETURN_NOT_OK(WhereIndexToPB(row, table->index_info(), table->InternalSchema(), req)); |
2706 | 0 | } |
2707 | | |
2708 | 0 | requests->emplace_back(table_id, *req); |
2709 | 0 | } |
2710 | | |
2711 | 0 | return FlushVerifyBatchIfRequired( |
2712 | 0 | read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats); |
2713 | 0 | } |
2714 | | |
2715 | | Status Tablet::FlushVerifyBatchIfRequired( |
2716 | | const HybridTime read_time, |
2717 | | const CoarseTimePoint deadline, |
2718 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2719 | | CoarseTimePoint* last_flushed_at, |
2720 | | std::unordered_set<TableId>* failed_indexes, |
2721 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2722 | 0 | if (requests->size() < FLAGS_verify_index_read_batch_size) { |
2723 | 0 | return Status::OK(); |
2724 | 0 | } |
2725 | 0 | return FlushVerifyBatch( |
2726 | 0 | read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats); |
2727 | 0 | } |
2728 | | |
2729 | | Status Tablet::FlushVerifyBatch( |
2730 | | const HybridTime read_time, |
2731 | | const CoarseTimePoint deadline, |
2732 | | std::vector<std::pair<const TableId, QLReadRequestPB>>* requests, |
2733 | | CoarseTimePoint* last_flushed_at, |
2734 | | std::unordered_set<TableId>* failed_indexes, |
2735 | 0 | std::unordered_map<TableId, uint64>* consistency_stats) { |
2736 | 0 | std::vector<client::YBqlReadOpPtr> read_ops; |
2737 | 0 | std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline)); |
2738 | | |
2739 | 0 | auto client = client_future_.get(); |
2740 | 0 | for (auto& pair : *requests) { |
2741 | 0 | client::YBTablePtr table; |
2742 | 0 | RETURN_NOT_OK(client->OpenTable(pair.first, &table)); |
2743 | | |
2744 | 0 | client::YBqlReadOpPtr read_op(table->NewQLRead()); |
2745 | 0 | read_op->mutable_request()->Swap(&pair.second); |
2746 | 0 | read_op->SetReadTime(ReadHybridTime::SingleTime(read_time)); |
2747 | |
|
2748 | 0 | session->Apply(read_op); |
2749 | | |
2750 | | // Note: always emplace at tail because row keys must |
2751 | | // correspond sequentially with the read_ops in the vector |
2752 | 0 | read_ops.push_back(read_op); |
2753 | 0 | } |
2754 | | |
2755 | 0 | RETURN_NOT_OK(FlushWithRetries(session, read_ops, 0, failed_indexes)); |
2756 | | |
2757 | 0 | for (size_t idx = 0; idx < requests->size(); idx++) { |
2758 | 0 | const client::YBqlReadOpPtr& read_op = read_ops[idx]; |
2759 | 0 | auto row_block = read_op->MakeRowBlock(); |
2760 | 0 | if (row_block && row_block->row_count() == 1) continue; |
2761 | 0 | (*consistency_stats)[read_op->table()->id()]++; |
2762 | 0 | } |
2763 | |
|
2764 | 0 | SleepToThrottleRate(requests, FLAGS_verify_index_rate_rows_per_sec, last_flushed_at); |
2765 | 0 | *last_flushed_at = CoarseMonoClock::Now(); |
2766 | 0 | requests->clear(); |
2767 | |
|
2768 | 0 | return Status::OK(); |
2769 | 0 | } |
2770 | | |
2771 | | ScopedRWOperationPause Tablet::PauseReadWriteOperations( |
2772 | 566k | const Abortable abortable, const Stop stop) { |
2773 | 566k | VTRACE(1, LogPrefix()); |
2774 | 566k | LOG_SLOW_EXECUTION(WARNING, 1000, |
2775 | 567k | Substitute("$0Waiting for pending ops to complete", LogPrefix())) { |
2776 | 567k | return ScopedRWOperationPause( |
2777 | 567k | abortable ? &pending_abortable_op_counter_249k : &pending_non_abortable_op_counter_317k , |
2778 | 567k | CoarseMonoClock::Now() + |
2779 | 567k | MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms), |
2780 | 567k | stop); |
2781 | 567k | } |
2782 | 18.4E | FATAL_ERROR("Unreachable code -- the previous block must always return"); |
2783 | 18.4E | } |
2784 | | |
2785 | 139 | ScopedRWOperation Tablet::CreateAbortableScopedRWOperation(const CoarseTimePoint deadline) const { |
2786 | 139 | return ScopedRWOperation(&pending_abortable_op_counter_, deadline); |
2787 | 139 | } |
2788 | | |
2789 | | ScopedRWOperation Tablet::CreateNonAbortableScopedRWOperation( |
2790 | 159M | const CoarseTimePoint deadline) const { |
2791 | 159M | return ScopedRWOperation(&pending_non_abortable_op_counter_, deadline); |
2792 | 159M | } |
2793 | | |
2794 | | Status Tablet::ModifyFlushedFrontier( |
2795 | | const docdb::ConsensusFrontier& frontier, |
2796 | | rocksdb::FrontierModificationMode mode, |
2797 | 173k | FlushFlags flags) { |
2798 | 173k | const Status s = regular_db_->ModifyFlushedFrontier(frontier.Clone(), mode); |
2799 | 173k | if (PREDICT_FALSE(!s.ok())) { |
2800 | 1 | auto status = STATUS(IllegalState, "Failed to set flushed frontier", s.ToString()); |
2801 | 1 | LOG_WITH_PREFIX(WARNING) << status; |
2802 | 1 | return status; |
2803 | 1 | } |
2804 | 173k | { |
2805 | 173k | auto flushed_frontier = regular_db_->GetFlushedFrontier(); |
2806 | 173k | const auto& consensus_flushed_frontier = *down_cast<docdb::ConsensusFrontier*>( |
2807 | 173k | flushed_frontier.get()); |
2808 | 173k | DCHECK_EQ(frontier.op_id(), consensus_flushed_frontier.op_id()); |
2809 | 173k | DCHECK_EQ(frontier.hybrid_time(), consensus_flushed_frontier.hybrid_time()); |
2810 | 173k | } |
2811 | | |
2812 | 173k | if (FLAGS_TEST_tablet_verify_flushed_frontier_after_modifying && |
2813 | 173k | mode == rocksdb::FrontierModificationMode::kForce0 ) { |
2814 | 0 | LOG(INFO) << "Verifying that flushed frontier was force-set successfully"; |
2815 | 0 | string test_data_dir = VERIFY_RESULT(Env::Default()->GetTestDirectory()); |
2816 | 0 | const string checkpoint_dir_for_test = Format( |
2817 | 0 | "$0/test_checkpoint_$1_$2", test_data_dir, tablet_id(), MonoTime::Now().ToUint64()); |
2818 | 0 | RETURN_NOT_OK( |
2819 | 0 | rocksdb::checkpoint::CreateCheckpoint(regular_db_.get(), checkpoint_dir_for_test)); |
2820 | 0 | auto se = ScopeExit([checkpoint_dir_for_test] { |
2821 | 0 | CHECK_OK(Env::Default()->DeleteRecursively(checkpoint_dir_for_test)); |
2822 | 0 | }); |
2823 | 0 | rocksdb::Options rocksdb_options; |
2824 | 0 | docdb::InitRocksDBOptions( |
2825 | 0 | &rocksdb_options, LogPrefix(), /* statistics */ nullptr, tablet_options_); |
2826 | 0 | rocksdb_options.create_if_missing = false; |
2827 | 0 | LOG_WITH_PREFIX(INFO) << "Opening the test RocksDB at " << checkpoint_dir_for_test |
2828 | 0 | << ", expecting to see flushed frontier of " << frontier.ToString(); |
2829 | 0 | std::unique_ptr<rocksdb::DB> test_db = VERIFY_RESULT( |
2830 | 0 | rocksdb::DB::Open(rocksdb_options, checkpoint_dir_for_test)); |
2831 | 0 | LOG_WITH_PREFIX(INFO) << "Getting flushed frontier from test RocksDB at " |
2832 | 0 | << checkpoint_dir_for_test; |
2833 | 0 | auto restored_flushed_frontier = test_db->GetFlushedFrontier(); |
2834 | 0 | if (!restored_flushed_frontier) { |
2835 | 0 | LOG_WITH_PREFIX(FATAL) << LogPrefix() << "Restored flushed frontier not present"; |
2836 | 0 | } |
2837 | 0 | CHECK_EQ( |
2838 | 0 | frontier, |
2839 | 0 | down_cast<docdb::ConsensusFrontier&>(*restored_flushed_frontier)); |
2840 | 0 | LOG_WITH_PREFIX(INFO) << "Successfully verified persistently stored flushed frontier: " |
2841 | 0 | << frontier.ToString(); |
2842 | 0 | } |
2843 | | |
2844 | 173k | if (intents_db_) { |
2845 | | // It is OK to flush intents even if the regular DB is not yet flushed, |
2846 | | // because it would wait for flush of regular DB if we have unflushed intents. |
2847 | | // Otherwise it does not matter which flushed op id is stored. |
2848 | 88.6k | RETURN_NOT_OK(intents_db_->ModifyFlushedFrontier(frontier.Clone(), mode)); |
2849 | 88.6k | } |
2850 | | |
2851 | 173k | return Flush(FlushMode::kAsync, flags); |
2852 | 173k | } |
2853 | | |
2854 | 171k | Status Tablet::Truncate(TruncateOperation* operation) { |
2855 | 171k | if (metadata_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
2856 | | // We use only Raft log for transaction status table. |
2857 | 0 | return Status::OK(); |
2858 | 0 | } |
2859 | | |
2860 | 171k | auto op_pauses = VERIFY_RESULT(StartShutdownRocksDBs(DisableFlushOnShutdown::kTrue)); |
2861 | | |
2862 | | // Check if tablet is in shutdown mode. |
2863 | 171k | if (IsShutdownRequested()) { |
2864 | 0 | return STATUS(IllegalState, "Tablet was shut down"); |
2865 | 0 | } |
2866 | | |
2867 | 171k | const rocksdb::SequenceNumber sequence_number = regular_db_->GetLatestSequenceNumber(); |
2868 | 171k | const string db_dir = regular_db_->GetName(); |
2869 | | |
2870 | 171k | auto s = CompleteShutdownRocksDBs(Destroy::kTrue, &op_pauses); |
2871 | 171k | if (PREDICT_FALSE(!s.ok())) { |
2872 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to clean up db dir " << db_dir << ": " << s; |
2873 | 0 | return STATUS(IllegalState, "Failed to clean up db dir", s.ToString()); |
2874 | 0 | } |
2875 | | |
2876 | | // Create a new database. |
2877 | | // Note: db_dir == metadata()->rocksdb_dir() is still valid db dir. |
2878 | 171k | s = OpenKeyValueTablet(); |
2879 | 171k | if (PREDICT_FALSE(!s.ok())) { |
2880 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to create a new db: " << s; |
2881 | 0 | return s; |
2882 | 0 | } |
2883 | | |
2884 | 171k | docdb::ConsensusFrontier frontier; |
2885 | 171k | frontier.set_op_id(operation->op_id()); |
2886 | 171k | frontier.set_hybrid_time(operation->hybrid_time()); |
2887 | | // We use the kUpdate mode here, because unlike the case of restoring a snapshot to a completely |
2888 | | // different tablet in an arbitrary Raft group, here there is no possibility of the flushed |
2889 | | // frontier needing to go backwards. |
2890 | 171k | RETURN_NOT_OK(ModifyFlushedFrontier(frontier, rocksdb::FrontierModificationMode::kUpdate, |
2891 | 171k | FlushFlags::kAllDbs | FlushFlags::kNoScopedOperation)); |
2892 | | |
2893 | 171k | LOG_WITH_PREFIX(INFO) << "Created new db for truncated tablet"; |
2894 | 171k | LOG_WITH_PREFIX(INFO) << "Sequence numbers: old=" << sequence_number |
2895 | 171k | << ", new=" << regular_db_->GetLatestSequenceNumber(); |
2896 | | // Ensure that op_pauses stays in scope throughout this function. |
2897 | 343k | for (auto* op_pause : op_pauses.AsArray()) { |
2898 | 343k | DFATAL_OR_RETURN_NOT_OK(op_pause->status()); |
2899 | 343k | } |
2900 | 171k | return DoEnableCompactions(); |
2901 | 171k | } |
2902 | | |
2903 | 10.6M | void Tablet::UpdateMonotonicCounter(int64_t value) { |
2904 | 10.6M | int64_t counter = monotonic_counter_; |
2905 | 10.6M | while (true10.6M ) { |
2906 | 10.6M | if (counter >= value10.6M ) { |
2907 | 10.6M | break; |
2908 | 10.6M | } |
2909 | 18.4E | if (monotonic_counter_.compare_exchange_weak(counter, value)) { |
2910 | 153 | break; |
2911 | 153 | } |
2912 | 18.4E | } |
2913 | 10.6M | } |
2914 | | |
2915 | | //////////////////////////////////////////////////////////// |
2916 | | // Tablet |
2917 | | //////////////////////////////////////////////////////////// |
2918 | | |
2919 | 150k | Result<bool> Tablet::HasSSTables() const { |
2920 | 150k | if (!regular_db_) { |
2921 | 48.0k | return false; |
2922 | 48.0k | } |
2923 | | |
2924 | 102k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2925 | 102k | RETURN_NOT_OK(scoped_read_operation); |
2926 | | |
2927 | 102k | std::vector<rocksdb::LiveFileMetaData> live_files_metadata; |
2928 | 102k | regular_db_->GetLiveFilesMetaData(&live_files_metadata); |
2929 | 102k | return !live_files_metadata.empty(); |
2930 | 102k | } |
2931 | | |
2932 | 79.4M | yb::OpId MaxPersistentOpIdForDb(rocksdb::DB* db, bool invalid_if_no_new_data) { |
2933 | | // A possible race condition could happen, when data is written between this query and |
2934 | | // actual log gc. But it is not a problem as long as we are reading committed op id |
2935 | | // before MaxPersistentOpId, since we always keep last committed entry in the log during garbage |
2936 | | // collection. |
2937 | | // See TabletPeer::GetEarliestNeededLogIndex |
2938 | 79.4M | if (db == nullptr || |
2939 | 79.4M | (58.7M invalid_if_no_new_data58.7M && |
2940 | 58.7M | db->GetFlushAbility() == rocksdb::FlushAbility::kNoNewData58.7M )) { |
2941 | 58.7M | return yb::OpId::Invalid(); |
2942 | 58.7M | } |
2943 | | |
2944 | 20.6M | rocksdb::UserFrontierPtr frontier = db->GetFlushedFrontier(); |
2945 | 20.6M | if (!frontier) { |
2946 | 17.6M | return yb::OpId(); |
2947 | 17.6M | } |
2948 | | |
2949 | 3.01M | return down_cast<docdb::ConsensusFrontier*>(frontier.get())->op_id(); |
2950 | 20.6M | } |
2951 | | |
2952 | 39.7M | Result<DocDbOpIds> Tablet::MaxPersistentOpId(bool invalid_if_no_new_data) const { |
2953 | 39.7M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2954 | 39.7M | RETURN_NOT_OK(scoped_read_operation); |
2955 | | |
2956 | 39.7M | return DocDbOpIds{ |
2957 | 39.7M | MaxPersistentOpIdForDb(regular_db_.get(), invalid_if_no_new_data), |
2958 | 39.7M | MaxPersistentOpIdForDb(intents_db_.get(), invalid_if_no_new_data) |
2959 | 39.7M | }; |
2960 | 39.7M | } |
2961 | | |
2962 | 39.7M | void Tablet::FlushIntentsDbIfNecessary(const yb::OpId& lastest_log_entry_op_id) { |
2963 | 39.7M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2964 | 39.7M | if (!scoped_read_operation.ok()) { |
2965 | 0 | return; |
2966 | 0 | } |
2967 | | |
2968 | 39.7M | auto intents_frontier = intents_db_ |
2969 | 39.7M | ? MemTableFrontierFromDb(intents_db_.get(), rocksdb::UpdateUserValueType::kLargest)19.0M : nullptr20.6M ; |
2970 | 39.7M | if (intents_frontier) { |
2971 | 2.45M | auto index_delta = |
2972 | 2.45M | lastest_log_entry_op_id.index - |
2973 | 2.45M | down_cast<docdb::ConsensusFrontier*>(intents_frontier.get())->op_id().index; |
2974 | 2.45M | if (index_delta > FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush) { |
2975 | 4 | auto intents_flush_ability = intents_db_->GetFlushAbility(); |
2976 | 4 | if (intents_flush_ability == rocksdb::FlushAbility::kHasNewData) { |
2977 | 4 | LOG_WITH_PREFIX(INFO) |
2978 | 4 | << "Force flushing intents DB since it was not flushed for " << index_delta |
2979 | 4 | << " operations, while only " |
2980 | 4 | << FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush << " is allowed"; |
2981 | 4 | rocksdb::FlushOptions options; |
2982 | 4 | options.wait = false; |
2983 | 4 | WARN_NOT_OK(intents_db_->Flush(options), "Flush intents db failed"); |
2984 | 4 | } |
2985 | 4 | } |
2986 | 2.45M | } |
2987 | 39.7M | } |
2988 | | |
2989 | 16.8M | bool Tablet::IsTransactionalRequest(bool is_ysql_request) const { |
2990 | | // We consider all YSQL tables within the sys catalog transactional. |
2991 | 16.8M | return txns_enabled_ && ( |
2992 | 16.8M | schema()->table_properties().is_transactional() || |
2993 | 16.8M | (15.9M is_sys_catalog_15.9M && is_ysql_request1.29M )); |
2994 | 16.8M | } |
2995 | | |
2996 | 2.79k | Result<HybridTime> Tablet::MaxPersistentHybridTime() const { |
2997 | 2.79k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
2998 | 2.79k | RETURN_NOT_OK(scoped_read_operation); |
2999 | | |
3000 | 2.79k | if (!regular_db_) { |
3001 | 559 | return HybridTime::kMin; |
3002 | 559 | } |
3003 | | |
3004 | 2.23k | HybridTime result = HybridTime::kMin; |
3005 | 2.23k | auto temp = regular_db_->GetFlushedFrontier(); |
3006 | 2.23k | if (temp) { |
3007 | 1.25k | result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time()); |
3008 | 1.25k | } |
3009 | 2.23k | if (intents_db_) { |
3010 | 237 | temp = intents_db_->GetFlushedFrontier(); |
3011 | 237 | if (temp) { |
3012 | 103 | result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time()); |
3013 | 103 | } |
3014 | 237 | } |
3015 | 2.23k | return result; |
3016 | 2.79k | } |
3017 | | |
3018 | 17.7k | Result<HybridTime> Tablet::OldestMutableMemtableWriteHybridTime() const { |
3019 | 17.7k | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3020 | 17.7k | RETURN_NOT_OK(scoped_read_operation); |
3021 | | |
3022 | 17.7k | HybridTime result = HybridTime::kMax; |
3023 | 35.4k | for (auto* db : { regular_db_.get(), intents_db_.get() }) { |
3024 | 35.4k | if (db) { |
3025 | 16.7k | auto mem_frontier = MemTableFrontierFromDb(db, rocksdb::UpdateUserValueType::kSmallest); |
3026 | 16.7k | if (mem_frontier) { |
3027 | 7.15k | const auto hybrid_time = |
3028 | 7.15k | static_cast<const docdb::ConsensusFrontier&>(*mem_frontier).hybrid_time(); |
3029 | 7.15k | result = std::min(result, hybrid_time); |
3030 | 7.15k | } |
3031 | 16.7k | } |
3032 | 35.4k | } |
3033 | 17.7k | return result; |
3034 | 17.7k | } |
3035 | | |
3036 | 17.3M | const yb::SchemaPtr Tablet::schema() const { |
3037 | 17.3M | return metadata_->schema(); |
3038 | 17.3M | } |
3039 | | |
3040 | 0 | Status Tablet::DebugDump(vector<string> *lines) { |
3041 | 0 | switch (table_type_) { |
3042 | 0 | case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
3043 | 0 | case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED; |
3044 | 0 | case TableType::REDIS_TABLE_TYPE: |
3045 | 0 | DocDBDebugDump(lines); |
3046 | 0 | return Status::OK(); |
3047 | 0 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
3048 | 0 | return Status::OK(); |
3049 | 0 | } |
3050 | 0 | FATAL_INVALID_ENUM_VALUE(TableType, table_type_); |
3051 | 0 | } |
3052 | | |
3053 | 0 | void Tablet::DocDBDebugDump(vector<string> *lines) { |
3054 | 0 | LOG_STRING(INFO, lines) << "Dumping tablet:"; |
3055 | 0 | LOG_STRING(INFO, lines) << "---------------------------"; |
3056 | 0 | docdb::DocDBDebugDump(regular_db_.get(), LOG_STRING(INFO, lines), docdb::StorageDbType::kRegular); |
3057 | 0 | } |
3058 | | |
3059 | 0 | Status Tablet::TEST_SwitchMemtable() { |
3060 | 0 | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3061 | 0 | RETURN_NOT_OK(scoped_operation); |
3062 | | |
3063 | 0 | if (regular_db_) { |
3064 | 0 | regular_db_->TEST_SwitchMemtable(); |
3065 | 0 | } else { |
3066 | 0 | LOG_WITH_PREFIX(INFO) << "Ignoring TEST_SwitchMemtable: no regular RocksDB"; |
3067 | 0 | } |
3068 | 0 | return Status::OK(); |
3069 | 0 | } |
3070 | | |
3071 | | Result<HybridTime> Tablet::DoGetSafeTime( |
3072 | 10.2M | RequireLease require_lease, HybridTime min_allowed, CoarseTimePoint deadline) const { |
3073 | 10.2M | if (require_lease == RequireLease::kFalse) { |
3074 | 404k | return mvcc_.SafeTimeForFollower(min_allowed, deadline); |
3075 | 404k | } |
3076 | 9.87M | FixedHybridTimeLease ht_lease; |
3077 | 9.87M | if (ht_lease_provider_) { |
3078 | | // This will block until a leader lease reaches the given value or a timeout occurs. |
3079 | 9.85M | auto ht_lease_result = ht_lease_provider_(min_allowed, deadline); |
3080 | 9.85M | if (!ht_lease_result.ok()) { |
3081 | 3 | if (require_lease == RequireLease::kFallbackToFollower && |
3082 | 3 | ht_lease_result.status().IsIllegalState()0 ) { |
3083 | 0 | return mvcc_.SafeTimeForFollower(min_allowed, deadline); |
3084 | 0 | } |
3085 | 3 | return ht_lease_result.status(); |
3086 | 3 | } |
3087 | 9.85M | ht_lease = *ht_lease_result; |
3088 | 9.85M | if (min_allowed > ht_lease.time) { |
3089 | 0 | return STATUS_FORMAT( |
3090 | 0 | InternalError, "Read request hybrid time after current time: $0, lease: $1", |
3091 | 0 | min_allowed, ht_lease); |
3092 | 0 | } |
3093 | 9.85M | } else if (27.3k min_allowed27.3k ) { |
3094 | 918 | RETURN_NOT_OK(WaitUntil(clock_.get(), min_allowed, deadline)); |
3095 | 918 | } |
3096 | 9.87M | if (min_allowed > ht_lease.lease) { |
3097 | 0 | return STATUS_FORMAT( |
3098 | 0 | InternalError, "Read request hybrid time after leader lease: $0, lease: $1", |
3099 | 0 | min_allowed, ht_lease); |
3100 | 0 | } |
3101 | 9.87M | return mvcc_.SafeTime(min_allowed, deadline, ht_lease); |
3102 | 9.87M | } |
3103 | | |
3104 | 27.5k | ScopedRWOperationPause Tablet::PauseWritePermits(CoarseTimePoint deadline) { |
3105 | 27.5k | TRACE("Blocking write permit(s)"); |
3106 | 27.5k | auto se = ScopeExit([] { TRACE("Blocking write permit(s) done"); }); |
3107 | | // Prevent new write ops from being submitted. |
3108 | 27.5k | return ScopedRWOperationPause(&write_ops_being_submitted_counter_, deadline, Stop::kFalse); |
3109 | 27.5k | } |
3110 | | |
3111 | 3.25M | ScopedRWOperation Tablet::GetPermitToWrite(CoarseTimePoint deadline) { |
3112 | 3.25M | TRACE("Acquiring write permit"); |
3113 | 3.26M | auto se = ScopeExit([] { TRACE("Acquiring write permit done"); }); |
3114 | 3.25M | return ScopedRWOperation(&write_ops_being_submitted_counter_); |
3115 | 3.25M | } |
3116 | | |
3117 | 3.09M | Result<bool> Tablet::StillHasOrphanedPostSplitData() { |
3118 | 3.09M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3119 | 3.09M | RETURN_NOT_OK(scoped_operation); |
3120 | 3.09M | return doc_db().key_bounds->IsInitialized() && !metadata()->has_been_fully_compacted()139k ; |
3121 | 3.09M | } |
3122 | | |
3123 | 2.35M | bool Tablet::MayHaveOrphanedPostSplitData() { |
3124 | 2.35M | auto res = StillHasOrphanedPostSplitData(); |
3125 | 2.35M | if (!res.ok()) { |
3126 | 214 | LOG(WARNING) << "Failed to call StillHasOrphanedPostSplitData: " << res.ToString(); |
3127 | 214 | return true; |
3128 | 214 | } |
3129 | 2.35M | return res.get(); |
3130 | 2.35M | } |
3131 | | |
3132 | 597k | bool Tablet::ShouldDisableLbMove() { |
3133 | 597k | auto still_has_parent_data_result = StillHasOrphanedPostSplitData(); |
3134 | 597k | if (still_has_parent_data_result.ok()597k ) { |
3135 | 597k | return still_has_parent_data_result.get(); |
3136 | 597k | } |
3137 | | // If this call failed, one of three things may be true: |
3138 | | // 1. We are in the middle of a tablet shutdown. |
3139 | | // |
3140 | | // In this case, what we report is not of much consequence, as the load balancer shouldn't try to |
3141 | | // move us anyways. We choose to return false. |
3142 | | // |
3143 | | // 2. We are in the middle of a TRUNCATE. |
3144 | | // |
3145 | | // In this case, any concurrent attempted LB move should fail before trying to move data, |
3146 | | // since the RocksDB instances are destroyed. On top of that, we do want to allow the LB to move |
3147 | | // this tablet after the TRUNCATE completes, so we should return false. |
3148 | | // |
3149 | | // 3. We are in the middle of an AlterSchema operation. This is only true for tablets belonging to |
3150 | | // colocated tables. |
3151 | | // |
3152 | | // In this case, we want to disable tablet moves. We conservatively return true for any failure |
3153 | | // if the tablet is part of a colocated table. |
3154 | 18.4E | return metadata_->schema()->has_colocation_id(); |
3155 | 597k | } |
3156 | | |
3157 | 6 | void Tablet::ForceRocksDBCompactInTest() { |
3158 | 6 | CHECK_OK(ForceFullRocksDBCompact()); |
3159 | 6 | } |
3160 | | |
3161 | 139 | Status Tablet::ForceFullRocksDBCompact() { |
3162 | 139 | auto scoped_operation = CreateAbortableScopedRWOperation(); |
3163 | 139 | RETURN_NOT_OK(scoped_operation); |
3164 | | |
3165 | 139 | if (regular_db_) { |
3166 | 139 | RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get())); |
3167 | 139 | } |
3168 | 139 | if (intents_db_) { |
3169 | 94 | RETURN_NOT_OK_PREPEND( |
3170 | 94 | intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed"); |
3171 | 94 | RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get())); |
3172 | 94 | } |
3173 | 139 | return Status::OK(); |
3174 | 139 | } |
3175 | | |
3176 | 7 | std::string Tablet::TEST_DocDBDumpStr(IncludeIntents include_intents) { |
3177 | 7 | if (!regular_db_) return ""0 ; |
3178 | | |
3179 | 7 | if (!include_intents) { |
3180 | 0 | return docdb::DocDBDebugDumpToStr(doc_db().WithoutIntents()); |
3181 | 0 | } |
3182 | | |
3183 | 7 | return docdb::DocDBDebugDumpToStr(doc_db()); |
3184 | 7 | } |
3185 | | |
3186 | | void Tablet::TEST_DocDBDumpToContainer( |
3187 | 7 | IncludeIntents include_intents, std::unordered_set<std::string>* out) { |
3188 | 7 | if (!regular_db_) return0 ; |
3189 | | |
3190 | 7 | if (!include_intents) { |
3191 | 0 | return docdb::DocDBDebugDumpToContainer(doc_db().WithoutIntents(), out); |
3192 | 0 | } |
3193 | | |
3194 | 7 | return docdb::DocDBDebugDumpToContainer(doc_db(), out); |
3195 | 7 | } |
3196 | | |
3197 | 0 | void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) { |
3198 | 0 | if (!regular_db_) { |
3199 | 0 | LOG_WITH_PREFIX(INFO) << "No RocksDB to dump"; |
3200 | 0 | return; |
3201 | 0 | } |
3202 | | |
3203 | 0 | docdb::DumpRocksDBToLog(regular_db_.get(), StorageDbType::kRegular, LogPrefix()); |
3204 | |
|
3205 | 0 | if (include_intents && intents_db_) { |
3206 | 0 | docdb::DumpRocksDBToLog(intents_db_.get(), StorageDbType::kIntents, LogPrefix()); |
3207 | 0 | } |
3208 | 0 | } |
3209 | | |
3210 | 0 | size_t Tablet::TEST_CountRegularDBRecords() { |
3211 | 0 | if (!regular_db_) return 0; |
3212 | 0 | rocksdb::ReadOptions read_opts; |
3213 | 0 | read_opts.query_id = rocksdb::kDefaultQueryId; |
3214 | 0 | docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_); |
3215 | |
|
3216 | 0 | size_t result = 0; |
3217 | 0 | for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { |
3218 | 0 | ++result; |
3219 | 0 | } |
3220 | 0 | return result; |
3221 | 0 | } |
3222 | | |
3223 | | template <class F> |
3224 | 54.5M | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { |
3225 | 54.5M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3226 | 54.5M | std::lock_guard<rw_spinlock> lock(component_lock_); |
3227 | | |
3228 | | // In order to get actual stats we would have to wait. |
3229 | | // This would give us correct stats but would make this request slower. |
3230 | 54.5M | if (!scoped_operation.ok()54.5M || !regular_db_) { |
3231 | 12.3M | return default_value; |
3232 | 12.3M | } |
3233 | 42.1M | return func(); |
3234 | 54.5M | } tablet.cc:auto yb::tablet::Tablet::GetRegularDbStat<yb::tablet::Tablet::GetCurrentVersionSstFilesSize() const::$_3>(yb::tablet::Tablet::GetCurrentVersionSstFilesSize() const::$_3 const&, decltype(fp()) const&) const Line | Count | Source | 3224 | 8.07k | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3225 | 8.07k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3226 | 8.07k | std::lock_guard<rw_spinlock> lock(component_lock_); | 3227 | | | 3228 | | // In order to get actual stats we would have to wait. | 3229 | | // This would give us correct stats but would make this request slower. | 3230 | 8.07k | if (!scoped_operation.ok() || !regular_db_) { | 3231 | 2.94k | return default_value; | 3232 | 2.94k | } | 3233 | 5.12k | return func(); | 3234 | 8.07k | } |
tablet.cc:auto yb::tablet::Tablet::GetRegularDbStat<yb::tablet::Tablet::GetCurrentVersionSstFilesUncompressedSize() const::$_4>(yb::tablet::Tablet::GetCurrentVersionSstFilesUncompressedSize() const::$_4 const&, decltype(fp()) const&) const Line | Count | Source | 3224 | 8.07k | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3225 | 8.07k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3226 | 8.07k | std::lock_guard<rw_spinlock> lock(component_lock_); | 3227 | | | 3228 | | // In order to get actual stats we would have to wait. | 3229 | | // This would give us correct stats but would make this request slower. | 3230 | 8.07k | if (!scoped_operation.ok() || !regular_db_) { | 3231 | 2.94k | return default_value; | 3232 | 2.94k | } | 3233 | 5.12k | return func(); | 3234 | 8.07k | } |
tablet.cc:auto yb::tablet::Tablet::GetRegularDbStat<yb::tablet::Tablet::GetCurrentVersionSstFilesAllSizes() const::$_5>(yb::tablet::Tablet::GetCurrentVersionSstFilesAllSizes() const::$_5 const&, decltype(fp()) const&) const Line | Count | Source | 3224 | 2.35M | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3225 | 2.35M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3226 | 2.35M | std::lock_guard<rw_spinlock> lock(component_lock_); | 3227 | | | 3228 | | // In order to get actual stats we would have to wait. | 3229 | | // This would give us correct stats but would make this request slower. | 3230 | 2.35M | if (!scoped_operation.ok() || !regular_db_2.35M ) { | 3231 | 709k | return default_value; | 3232 | 709k | } | 3233 | 1.64M | return func(); | 3234 | 2.35M | } |
tablet.cc:auto yb::tablet::Tablet::GetRegularDbStat<yb::tablet::Tablet::GetCurrentVersionNumSSTFiles() const::$_6>(yb::tablet::Tablet::GetCurrentVersionNumSSTFiles() const::$_6 const&, decltype(fp()) const&) const Line | Count | Source | 3224 | 52.1M | auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const { | 3225 | 52.1M | auto scoped_operation = CreateNonAbortableScopedRWOperation(); | 3226 | 52.1M | std::lock_guard<rw_spinlock> lock(component_lock_); | 3227 | | | 3228 | | // In order to get actual stats we would have to wait. | 3229 | | // This would give us correct stats but would make this request slower. | 3230 | 52.2M | if (!scoped_operation.ok()52.1M || !regular_db_) { | 3231 | 11.6M | return default_value; | 3232 | 11.6M | } | 3233 | 40.5M | return func(); | 3234 | 52.1M | } |
|
3235 | | |
3236 | 8.07k | uint64_t Tablet::GetCurrentVersionSstFilesSize() const { |
3237 | 8.07k | return GetRegularDbStat([this] { |
3238 | 5.12k | return regular_db_->GetCurrentVersionSstFilesSize(); |
3239 | 5.12k | }, 0); |
3240 | 8.07k | } |
3241 | | |
3242 | 8.07k | uint64_t Tablet::GetCurrentVersionSstFilesUncompressedSize() const { |
3243 | 8.07k | return GetRegularDbStat([this] { |
3244 | 5.12k | return regular_db_->GetCurrentVersionSstFilesUncompressedSize(); |
3245 | 5.12k | }, 0); |
3246 | 8.07k | } |
3247 | | |
3248 | 2.35M | std::pair<uint64_t, uint64_t> Tablet::GetCurrentVersionSstFilesAllSizes() const { |
3249 | 2.35M | return GetRegularDbStat([this] { |
3250 | 1.64M | return regular_db_->GetCurrentVersionSstFilesAllSizes(); |
3251 | 1.64M | }, std::pair<uint64_t, uint64_t>(0, 0)); |
3252 | 2.35M | } |
3253 | | |
3254 | 52.1M | uint64_t Tablet::GetCurrentVersionNumSSTFiles() const { |
3255 | 52.1M | return GetRegularDbStat([this] { |
3256 | 40.5M | return regular_db_->GetCurrentVersionNumSSTFiles(); |
3257 | 40.5M | }, 0); |
3258 | 52.1M | } |
3259 | | |
3260 | 4.39k | std::pair<int, int> Tablet::GetNumMemtables() const { |
3261 | 4.39k | int intents_num_memtables = 0; |
3262 | 4.39k | int regular_num_memtables = 0; |
3263 | | |
3264 | 4.39k | { |
3265 | 4.39k | auto scoped_operation = CreateNonAbortableScopedRWOperation(); |
3266 | 4.39k | if (!scoped_operation.ok()) { |
3267 | 0 | return std::make_pair(0, 0); |
3268 | 0 | } |
3269 | 4.39k | std::lock_guard<rw_spinlock> lock(component_lock_); |
3270 | 4.39k | if (intents_db_) { |
3271 | | // NOTE: 1 is added on behalf of cfd->mem(). |
3272 | 1.47k | intents_num_memtables = 1 + intents_db_->GetCfdImmNumNotFlushed(); |
3273 | 1.47k | } |
3274 | 4.39k | if (regular_db_) { |
3275 | | // NOTE: 1 is added on behalf of cfd->mem(). |
3276 | 1.88k | regular_num_memtables = 1 + regular_db_->GetCfdImmNumNotFlushed(); |
3277 | 1.88k | } |
3278 | 4.39k | } |
3279 | | |
3280 | 0 | return std::make_pair(intents_num_memtables, regular_num_memtables); |
3281 | 4.39k | } |
3282 | | |
3283 | | // ------------------------------------------------------------------------------------------------ |
3284 | | |
3285 | | Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext( |
3286 | | const TransactionMetadataPB& transaction_metadata, |
3287 | | bool is_ysql_catalog_table, |
3288 | 13.8M | const SubTransactionMetadataPB* subtransaction_metadata) const { |
3289 | 13.8M | if (!txns_enabled_) |
3290 | 177k | return TransactionOperationContext(); |
3291 | | |
3292 | 13.7M | if (transaction_metadata.has_transaction_id()) { |
3293 | 3.44M | Result<TransactionId> txn_id = FullyDecodeTransactionId( |
3294 | 3.44M | transaction_metadata.transaction_id()); |
3295 | 3.44M | RETURN_NOT_OK(txn_id); |
3296 | 3.44M | return CreateTransactionOperationContext( |
3297 | 3.44M | boost::make_optional(*txn_id), is_ysql_catalog_table, subtransaction_metadata); |
3298 | 10.2M | } else { |
3299 | 10.2M | return CreateTransactionOperationContext( |
3300 | 10.2M | /* transaction_id */ boost::none, is_ysql_catalog_table, subtransaction_metadata); |
3301 | 10.2M | } |
3302 | 13.7M | } |
3303 | | |
3304 | | Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext( |
3305 | | const boost::optional<TransactionId>& transaction_id, |
3306 | | bool is_ysql_catalog_table, |
3307 | 14.1M | const SubTransactionMetadataPB* subtransaction_metadata) const { |
3308 | 14.1M | if (!txns_enabled_) { |
3309 | 1.24k | return TransactionOperationContext(); |
3310 | 1.24k | } |
3311 | | |
3312 | 14.1M | const TransactionId* txn_id = nullptr; |
3313 | | |
3314 | 14.1M | if (transaction_id.is_initialized()) { |
3315 | 3.44M | txn_id = transaction_id.get_ptr(); |
3316 | 10.6M | } else if (metadata_->schema()->table_properties().is_transactional() || is_ysql_catalog_table10.2M ) { |
3317 | | // deadbeef-dead-beef-dead-beef00000075 |
3318 | 1.50M | static const TransactionId kArbitraryTxnIdForNonTxnReads( |
3319 | 1.50M | 17275436393656397278ULL, 8430738506459819486ULL); |
3320 | | // We still need context with transaction participant in order to resolve intents during |
3321 | | // possible reads. |
3322 | 1.50M | txn_id = &kArbitraryTxnIdForNonTxnReads; |
3323 | 9.17M | } else { |
3324 | 9.17M | return TransactionOperationContext(); |
3325 | 9.17M | } |
3326 | | |
3327 | 4.95M | if (!transaction_participant_) { |
3328 | 0 | return STATUS(IllegalState, "Transactional operation for non transactional tablet"); |
3329 | 0 | } |
3330 | | |
3331 | 4.95M | if (!subtransaction_metadata) { |
3332 | 400k | return TransactionOperationContext(*txn_id, transaction_participant()); |
3333 | 400k | } |
3334 | | |
3335 | 4.55M | auto subtxn = VERIFY_RESULT(SubTransactionMetadata::FromPB(*subtransaction_metadata)); |
3336 | 0 | return TransactionOperationContext(*txn_id, std::move(subtxn), transaction_participant()); |
3337 | 4.55M | } |
3338 | | |
3339 | | Status Tablet::CreateReadIntents( |
3340 | | const TransactionMetadataPB& transaction_metadata, |
3341 | | const SubTransactionMetadataPB& subtransaction_metadata, |
3342 | | const google::protobuf::RepeatedPtrField<QLReadRequestPB>& ql_batch, |
3343 | | const google::protobuf::RepeatedPtrField<PgsqlReadRequestPB>& pgsql_batch, |
3344 | 314k | docdb::KeyValueWriteBatchPB* write_batch) { |
3345 | 314k | auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext( |
3346 | 314k | transaction_metadata, |
3347 | 314k | /* is_ysql_catalog_table */ pgsql_batch.size() > 0 && is_sys_catalog_, |
3348 | 314k | &subtransaction_metadata)); |
3349 | | |
3350 | 0 | for (const auto& ql_read : ql_batch) { |
3351 | 0 | docdb::QLReadOperation doc_op(ql_read, txn_op_ctx); |
3352 | 0 | RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(), write_batch)); |
3353 | 0 | } |
3354 | | |
3355 | 2.76M | for (const auto& pgsql_read : pgsql_batch)314k { |
3356 | 2.76M | docdb::PgsqlReadOperation doc_op(pgsql_read, txn_op_ctx); |
3357 | 2.76M | RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(pgsql_read.table_id()), write_batch)); |
3358 | 2.76M | } |
3359 | | |
3360 | 314k | return Status::OK(); |
3361 | 314k | } |
3362 | | |
3363 | 5.53M | bool Tablet::ShouldApplyWrite() { |
3364 | 5.53M | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3365 | 5.53M | if (!scoped_read_operation.ok()) { |
3366 | 0 | return false; |
3367 | 0 | } |
3368 | | |
3369 | 5.53M | return !regular_db_->NeedsDelay(); |
3370 | 5.53M | } |
3371 | | |
3372 | 1.35M | Result<IsolationLevel> Tablet::GetIsolationLevel(const TransactionMetadataPB& transaction) { |
3373 | 1.35M | if (transaction.has_isolation()) { |
3374 | 738k | return transaction.isolation(); |
3375 | 738k | } |
3376 | 619k | return VERIFY_RESULT618k (transaction_participant_->PrepareMetadata(transaction)).isolation618k ; |
3377 | 619k | } |
3378 | | |
3379 | | Result<RaftGroupMetadataPtr> Tablet::CreateSubtablet( |
3380 | | const TabletId& tablet_id, const Partition& partition, const docdb::KeyBounds& key_bounds, |
3381 | 141 | const yb::OpId& split_op_id, const HybridTime& split_op_hybrid_time) { |
3382 | 141 | auto scoped_read_operation = CreateNonAbortableScopedRWOperation(); |
3383 | 141 | RETURN_NOT_OK(scoped_read_operation); |
3384 | | |
3385 | 141 | RETURN_NOT_OK(Flush(FlushMode::kSync)); |
3386 | | |
3387 | 141 | auto metadata = VERIFY_RESULT(metadata_->CreateSubtabletMetadata( |
3388 | 141 | tablet_id, partition, key_bounds.lower.ToStringBuffer(), key_bounds.upper.ToStringBuffer())); |
3389 | | |
3390 | 141 | RETURN_NOT_OK(snapshots_->CreateCheckpoint( |
3391 | 141 | metadata->rocksdb_dir(), CreateIntentsCheckpointIn::kSubDir)); |
3392 | | |
3393 | | // We want flushed frontier to cover split_op_id, so during bootstrap of after-split tablets |
3394 | | // we don't replay split operation. |
3395 | 141 | docdb::ConsensusFrontier frontier; |
3396 | 141 | frontier.set_op_id(split_op_id); |
3397 | 141 | frontier.set_hybrid_time(split_op_hybrid_time); |
3398 | | |
3399 | 141 | struct RocksDbDirWithType { |
3400 | 141 | std::string db_dir; |
3401 | 141 | docdb::StorageDbType db_type; |
3402 | 141 | }; |
3403 | 141 | boost::container::static_vector<RocksDbDirWithType, 2> subtablet_rocksdbs( |
3404 | 141 | {{ metadata->rocksdb_dir(), docdb::StorageDbType::kRegular }}); |
3405 | 141 | if (intents_db_) { |
3406 | 95 | subtablet_rocksdbs.push_back( |
3407 | 95 | { metadata->intents_rocksdb_dir(), docdb::StorageDbType::kIntents }); |
3408 | 95 | } |
3409 | 236 | for (auto rocksdb : subtablet_rocksdbs) { |
3410 | 236 | rocksdb::Options rocksdb_options; |
3411 | 236 | docdb::InitRocksDBOptions( |
3412 | 236 | &rocksdb_options, MakeTabletLogPrefix(tablet_id, log_prefix_suffix_, rocksdb.db_type), |
3413 | 236 | /* statistics */ nullptr, tablet_options_); |
3414 | 236 | rocksdb_options.create_if_missing = false; |
3415 | | // Disable background compactions, we only need to update flushed frontier. |
3416 | 236 | rocksdb_options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; |
3417 | 236 | std::unique_ptr<rocksdb::DB> db = |
3418 | 236 | VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, rocksdb.db_dir)); |
3419 | 236 | RETURN_NOT_OK( |
3420 | 236 | db->ModifyFlushedFrontier(frontier.Clone(), rocksdb::FrontierModificationMode::kUpdate)); |
3421 | 236 | } |
3422 | 141 | return metadata; |
3423 | 141 | } |
3424 | | |
3425 | 0 | Result<int64_t> Tablet::CountIntents() { |
3426 | 0 | auto pending_op = CreateNonAbortableScopedRWOperation(); |
3427 | 0 | RETURN_NOT_OK(pending_op); |
3428 | | |
3429 | 0 | if (!intents_db_) { |
3430 | 0 | return 0; |
3431 | 0 | } |
3432 | 0 | rocksdb::ReadOptions read_options; |
3433 | 0 | auto intent_iter = std::unique_ptr<rocksdb::Iterator>( |
3434 | 0 | intents_db_->NewIterator(read_options)); |
3435 | 0 | int64_t num_intents = 0; |
3436 | 0 | intent_iter->SeekToFirst(); |
3437 | 0 | while (intent_iter->Valid()) { |
3438 | 0 | num_intents++; |
3439 | 0 | intent_iter->Next(); |
3440 | 0 | } |
3441 | 0 | return num_intents; |
3442 | 0 | } |
3443 | | |
3444 | 225k | void Tablet::ListenNumSSTFilesChanged(std::function<void()> listener) { |
3445 | 225k | std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_); |
3446 | 225k | bool has_new_listener = listener != nullptr; |
3447 | 225k | bool has_old_listener = num_sst_files_changed_listener_ != nullptr; |
3448 | 225k | LOG_IF_WITH_PREFIX63 (DFATAL, has_new_listener == has_old_listener) |
3449 | 63 | << __func__ << " in wrong state, has_old_listener: " << has_old_listener; |
3450 | 225k | num_sst_files_changed_listener_ = std::move(listener); |
3451 | 225k | } |
3452 | | |
3453 | | void Tablet::InitRocksDBOptions( |
3454 | | rocksdb::Options* options, const std::string& log_prefix, |
3455 | 451k | rocksdb::BlockBasedTableOptions table_options) { |
3456 | 451k | docdb::InitRocksDBOptions( |
3457 | 451k | options, log_prefix, regulardb_statistics_, tablet_options_, std::move(table_options)); |
3458 | 451k | } |
3459 | | |
3460 | 3.59k | rocksdb::Env& Tablet::rocksdb_env() const { |
3461 | 3.59k | return *tablet_options_.rocksdb_env; |
3462 | 3.59k | } |
3463 | | |
3464 | 3.93M | const std::string& Tablet::tablet_id() const { |
3465 | 3.93M | return metadata_->raft_group_id(); |
3466 | 3.93M | } |
3467 | | |
3468 | 144 | Result<std::string> Tablet::GetEncodedMiddleSplitKey() const { |
3469 | 144 | auto error_prefix = [this]() { |
3470 | 0 | return Format( |
3471 | 0 | "Failed to detect middle key for tablet $0 (key_bounds: $1 - $2)", |
3472 | 0 | tablet_id(), |
3473 | 0 | Slice(key_bounds_.lower).ToDebugHexString(), |
3474 | 0 | Slice(key_bounds_.upper).ToDebugHexString()); |
3475 | 0 | }; |
3476 | | |
3477 | | // TODO(tsplit): should take key_bounds_ into account. |
3478 | 144 | auto middle_key = VERIFY_RESULT143 (regular_db_->GetMiddleKey());143 |
3479 | | |
3480 | | // In some rare cases middle key can point to a special internal record which is not visible |
3481 | | // for a user, but tablet splitting routines expect the specific structure for partition keys |
3482 | | // that does not match the struct of the internally used records. Moreover, it is expected |
3483 | | // to have two child tablets with alive user records after the splitting, but the split |
3484 | | // by the internal record will lead to a case when one tablet will consist of internal records |
3485 | | // only and these records will be compacted out at some point making an empty tablet. |
3486 | 143 | if (PREDICT_FALSE(docdb::IsInternalRecordKeyType(docdb::DecodeValueType(middle_key[0])))) { |
3487 | 0 | return STATUS_FORMAT( |
3488 | 0 | IllegalState, "$0: got internal record \"$1\"", |
3489 | 0 | error_prefix(), Slice(middle_key).ToDebugHexString()); |
3490 | 0 | } |
3491 | | |
3492 | 143 | const auto key_part = metadata()->partition_schema()->IsHashPartitioning() |
3493 | 143 | ? docdb::DocKeyPart::kUpToHashCode |
3494 | 143 | : docdb::DocKeyPart::kWholeDocKey0 ; |
3495 | 143 | const auto split_key_size = VERIFY_RESULT(DocKey::EncodedSize(middle_key, key_part)); |
3496 | 143 | if (PREDICT_FALSE(split_key_size == 0)) { |
3497 | | // Using this verification just to have a more sensible message. The below verification will |
3498 | | // not pass with split_key_size == 0 also, but its message is not accurate enough. This failure |
3499 | | // may happen when a key cannot be decoded with key_part inside DocKey::EncodedSize and the key |
3500 | | // still valid for any reason (e.g. gettining non-hash key for hash partitioning). |
3501 | 0 | return STATUS_FORMAT( |
3502 | 0 | IllegalState, "$0: got unexpected key \"$1\"", |
3503 | 0 | error_prefix(), Slice(middle_key).ToDebugHexString()); |
3504 | 0 | } |
3505 | | |
3506 | 143 | middle_key.resize(split_key_size); |
3507 | 143 | const Slice middle_key_slice(middle_key); |
3508 | 143 | if (middle_key_slice.compare(key_bounds_.lower) <= 0 || |
3509 | 143 | (!key_bounds_.upper.empty() && middle_key_slice.compare(key_bounds_.upper) >= 075 )) { |
3510 | 0 | return STATUS_FORMAT( |
3511 | 0 | IllegalState, |
3512 | 0 | "$0: got \"$1\". This can happen if post-split tablet wasn't fully compacted after split", |
3513 | 0 | error_prefix(), middle_key_slice.ToDebugHexString()); |
3514 | 0 | } |
3515 | 143 | return middle_key; |
3516 | 143 | } |
3517 | | |
3518 | | Status Tablet::TriggerPostSplitCompactionIfNeeded( |
3519 | 142k | std::function<std::unique_ptr<ThreadPoolToken>()> get_token_for_compaction) { |
3520 | 142k | if (post_split_compaction_task_pool_token_) { |
3521 | 0 | return STATUS( |
3522 | 0 | IllegalState, "Already triggered post split compaction for this tablet instance."); |
3523 | 0 | } |
3524 | 142k | if (VERIFY_RESULT(StillHasOrphanedPostSplitData())) { |
3525 | 132 | post_split_compaction_task_pool_token_ = get_token_for_compaction(); |
3526 | 132 | return post_split_compaction_task_pool_token_->SubmitFunc( |
3527 | 132 | std::bind(&Tablet::TriggerPostSplitCompactionSync, this)); |
3528 | 132 | } |
3529 | 142k | return Status::OK(); |
3530 | 142k | } |
3531 | | |
3532 | 133 | void Tablet::TriggerPostSplitCompactionSync() { |
3533 | 133 | TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compaction); |
3534 | 133 | WARN_WITH_PREFIX_NOT_OK( |
3535 | 133 | ForceFullRocksDBCompact(), LogPrefix() + "Failed to compact post-split tablet."); |
3536 | 133 | } |
3537 | | |
3538 | 3 | Status Tablet::VerifyDataIntegrity() { |
3539 | 3 | LOG_WITH_PREFIX(INFO) << "Beginning data integrity checks on this tablet"; |
3540 | | |
3541 | | // Verify regular db. |
3542 | 3 | if (regular_db_) { |
3543 | 3 | const auto& db_dir = metadata()->rocksdb_dir(); |
3544 | 3 | RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir)); |
3545 | 3 | } |
3546 | | |
3547 | | // Verify intents db. |
3548 | 1 | if (intents_db_) { |
3549 | 0 | const auto& db_dir = metadata()->intents_rocksdb_dir(); |
3550 | 0 | RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir)); |
3551 | 0 | } |
3552 | | |
3553 | 1 | return Status::OK(); |
3554 | 1 | } |
3555 | | |
3556 | 3 | Status Tablet::OpenDbAndCheckIntegrity(const std::string& db_dir) { |
3557 | | // Similar to ldb's CheckConsistency, we open db as read-only with paranoid checks on. |
3558 | | // If any corruption is detected then the open will fail with a Corruption status. |
3559 | 3 | rocksdb::Options db_opts; |
3560 | 3 | InitRocksDBOptions(&db_opts, LogPrefix()); |
3561 | 3 | db_opts.paranoid_checks = true; |
3562 | | |
3563 | 3 | std::unique_ptr<rocksdb::DB> db; |
3564 | 3 | rocksdb::DB* db_raw = nullptr; |
3565 | 3 | rocksdb::Status st = rocksdb::DB::OpenForReadOnly(db_opts, db_dir, &db_raw); |
3566 | 3 | if (db_raw != nullptr) { |
3567 | 1 | db.reset(db_raw); |
3568 | 1 | } |
3569 | 3 | if (!st.ok()) { |
3570 | 2 | if (st.IsCorruption()) { |
3571 | 2 | LOG_WITH_PREFIX(WARNING) << "Detected rocksdb data corruption: " << st; |
3572 | | // TODO: should we bump metric here or in top-level validation or both? |
3573 | 2 | metrics()->tablet_data_corruptions->Increment(); |
3574 | 2 | return st; |
3575 | 2 | } |
3576 | | |
3577 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to open read-only RocksDB in directory " << db_dir |
3578 | 0 | << ": " << st; |
3579 | 0 | return Status::OK(); |
3580 | 2 | } |
3581 | | |
3582 | | // TODO: we can add more checks here to verify block contents/checksums |
3583 | | |
3584 | 1 | return Status::OK(); |
3585 | 3 | } |
3586 | | |
3587 | 68 | void Tablet::SplitDone() { |
3588 | 68 | { |
3589 | 68 | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3590 | 68 | if (completed_split_operation_filter_) { |
3591 | 0 | LOG_WITH_PREFIX(DFATAL) << "Already have split operation filter"; |
3592 | 0 | return; |
3593 | 0 | } |
3594 | | |
3595 | 68 | completed_split_operation_filter_ = MakeFunctorOperationFilter( |
3596 | 68 | [this](const OpId& op_id, consensus::OperationType op_type) -> Status { |
3597 | 25 | if (SplitOperation::ShouldAllowOpAfterSplitTablet(op_type)) { |
3598 | 4 | return Status::OK(); |
3599 | 4 | } |
3600 | | |
3601 | 21 | auto children = metadata_->split_child_tablet_ids(); |
3602 | 21 | return SplitOperation::RejectionStatus(OpId(), op_id, op_type, children[0], children[1]); |
3603 | 25 | }); |
3604 | 68 | operation_filters_.push_back(*completed_split_operation_filter_); |
3605 | | |
3606 | 68 | completed_split_log_anchor_ = std::make_unique<log::LogAnchor>(); |
3607 | | |
3608 | 68 | log_anchor_registry_->Register( |
3609 | 68 | metadata_->split_op_id().index, "Splitted tablet", completed_split_log_anchor_.get()); |
3610 | 68 | } |
3611 | 68 | } |
3612 | | |
3613 | 150k | void Tablet::SyncRestoringOperationFilter(ResetSplit reset_split) { |
3614 | 150k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3615 | | |
3616 | 150k | if (reset_split) { |
3617 | 30 | if (completed_split_log_anchor_) { |
3618 | 0 | WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()), |
3619 | 0 | "Unregister split anchor"); |
3620 | 0 | completed_split_log_anchor_ = nullptr; |
3621 | 0 | } |
3622 | | |
3623 | 30 | if (completed_split_operation_filter_) { |
3624 | 0 | UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get()); |
3625 | 0 | completed_split_operation_filter_ = nullptr; |
3626 | 0 | } |
3627 | 30 | } |
3628 | | |
3629 | 150k | if (metadata_->has_active_restoration()) { |
3630 | 30 | if (restoring_operation_filter_) { |
3631 | 0 | return; |
3632 | 0 | } |
3633 | 30 | restoring_operation_filter_ = MakeFunctorOperationFilter( |
3634 | 30 | [](const OpId& op_id, consensus::OperationType op_type) -> Status { |
3635 | 15 | if (SnapshotOperation::ShouldAllowOpDuringRestore(op_type)) { |
3636 | 15 | return Status::OK(); |
3637 | 15 | } |
3638 | | |
3639 | 0 | return SnapshotOperation::RejectionStatus(op_id, op_type); |
3640 | 15 | }); |
3641 | 30 | operation_filters_.push_back(*restoring_operation_filter_); |
3642 | 150k | } else { |
3643 | 150k | if (!restoring_operation_filter_150k ) { |
3644 | 150k | return; |
3645 | 150k | } |
3646 | | |
3647 | 18.4E | UnregisterOperationFilterUnlocked(restoring_operation_filter_.get()); |
3648 | 18.4E | restoring_operation_filter_ = nullptr; |
3649 | 18.4E | } |
3650 | 150k | } |
3651 | | |
3652 | 30 | Status Tablet::RestoreStarted(const TxnSnapshotRestorationId& restoration_id) { |
3653 | 30 | metadata_->RegisterRestoration(restoration_id); |
3654 | 30 | RETURN_NOT_OK(metadata_->Flush()); |
3655 | | |
3656 | 30 | SyncRestoringOperationFilter(ResetSplit::kTrue); |
3657 | | |
3658 | 30 | return Status::OK(); |
3659 | 30 | } |
3660 | | |
3661 | | Status Tablet::RestoreFinished( |
3662 | 30 | const TxnSnapshotRestorationId& restoration_id, HybridTime restoration_hybrid_time) { |
3663 | 30 | metadata_->UnregisterRestoration(restoration_id); |
3664 | 30 | if (restoration_hybrid_time) { |
3665 | 30 | metadata_->SetRestorationHybridTime(restoration_hybrid_time); |
3666 | 30 | if (transaction_participant_ && FLAGS_consistent_restore27 ) { |
3667 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
3668 | 0 | } |
3669 | 30 | } |
3670 | 30 | RETURN_NOT_OK(metadata_->Flush()); |
3671 | | |
3672 | 30 | SyncRestoringOperationFilter(ResetSplit::kFalse); |
3673 | | |
3674 | 30 | return Status::OK(); |
3675 | 30 | } |
3676 | | |
3677 | 2.71k | Status Tablet::CheckRestorations(const RestorationCompleteTimeMap& restoration_complete_time) { |
3678 | 2.71k | auto restoration_hybrid_time = metadata_->CheckCompleteRestorations(restoration_complete_time); |
3679 | 2.71k | if (restoration_hybrid_time != HybridTime::kMin |
3680 | 2.71k | && transaction_participant_3 |
3681 | 2.71k | && FLAGS_consistent_restore3 ) { |
3682 | 0 | transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time); |
3683 | 0 | } |
3684 | | |
3685 | | // We cannot do it in a single shot, because should update transaction participant before |
3686 | | // removing active transactions. |
3687 | 2.71k | if (!metadata_->CleanupRestorations(restoration_complete_time)) { |
3688 | 2.71k | return Status::OK(); |
3689 | 2.71k | } |
3690 | | |
3691 | 3 | RETURN_NOT_OK(metadata_->Flush()); |
3692 | 3 | SyncRestoringOperationFilter(ResetSplit::kFalse); |
3693 | | |
3694 | 3 | return Status::OK(); |
3695 | 3 | } |
3696 | | |
3697 | 5.11M | Status Tablet::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) { |
3698 | 5.11M | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3699 | 5.11M | for (const auto& filter : operation_filters_) { |
3700 | 853 | RETURN_NOT_OK(filter.CheckOperationAllowed(op_id, op_type)); |
3701 | 853 | } |
3702 | | |
3703 | 5.11M | return Status::OK(); |
3704 | 5.11M | } |
3705 | | |
3706 | 1.87k | void Tablet::RegisterOperationFilter(OperationFilter* filter) { |
3707 | 1.87k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3708 | 1.87k | operation_filters_.push_back(*filter); |
3709 | 1.87k | } |
3710 | | |
3711 | 1.87k | void Tablet::UnregisterOperationFilter(OperationFilter* filter) { |
3712 | 1.87k | std::lock_guard<simple_spinlock> lock(operation_filters_mutex_); |
3713 | 1.87k | UnregisterOperationFilterUnlocked(filter); |
3714 | 1.87k | } |
3715 | | |
3716 | 1.94k | void Tablet::UnregisterOperationFilterUnlocked(OperationFilter* filter) { |
3717 | 1.94k | operation_filters_.erase(operation_filters_.iterator_to(*filter)); |
3718 | 1.94k | } |
3719 | | |
3720 | 13.9M | SchemaPtr Tablet::GetSchema(const std::string& table_id) const { |
3721 | 13.9M | if (table_id.empty()) { |
3722 | 7.25M | return metadata_->schema(); |
3723 | 7.25M | } |
3724 | 6.70M | auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id)); |
3725 | 6.70M | return SchemaPtr(table_info, table_info->schema.get()); |
3726 | 13.9M | } |
3727 | | |
3728 | 72.9k | Schema Tablet::GetKeySchema(const std::string& table_id) const { |
3729 | 72.9k | if (table_id.empty()) { |
3730 | 2 | return *key_schema_; |
3731 | 2 | } |
3732 | 72.9k | auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id)); |
3733 | 72.9k | return table_info->schema->CreateKeyProjection(); |
3734 | 72.9k | } |
3735 | | |
3736 | | // ------------------------------------------------------------------------------------------------ |
3737 | | |
3738 | | Result<ScopedReadOperation> ScopedReadOperation::Create( |
3739 | | AbstractTablet* tablet, |
3740 | | RequireLease require_lease, |
3741 | 10.0M | ReadHybridTime read_time) { |
3742 | 10.0M | if (!read_time) { |
3743 | 6.32k | read_time = ReadHybridTime::SingleTime(VERIFY_RESULT(tablet->SafeTime(require_lease))); |
3744 | 6.32k | } |
3745 | 10.0M | auto* retention_policy = tablet->RetentionPolicy(); |
3746 | 10.0M | if (retention_policy) { |
3747 | 9.78M | RETURN_NOT_OK(retention_policy->RegisterReaderTimestamp(read_time.read)); |
3748 | 9.78M | } |
3749 | 10.0M | return ScopedReadOperation(tablet, read_time); |
3750 | 10.0M | } |
3751 | | |
3752 | | ScopedReadOperation::ScopedReadOperation( |
3753 | | AbstractTablet* tablet, const ReadHybridTime& read_time) |
3754 | 10.0M | : tablet_(tablet), read_time_(read_time) { |
3755 | 10.0M | } |
3756 | | |
3757 | 42.0M | ScopedReadOperation::~ScopedReadOperation() { |
3758 | 42.0M | Reset(); |
3759 | 42.0M | } |
3760 | | |
3761 | 9.44M | void ScopedReadOperation::operator=(ScopedReadOperation&& rhs) { |
3762 | 9.44M | Reset(); |
3763 | 9.44M | tablet_ = rhs.tablet_; |
3764 | 9.44M | read_time_ = rhs.read_time_; |
3765 | 9.44M | rhs.tablet_ = nullptr; |
3766 | 9.44M | } |
3767 | | |
3768 | 51.5M | void ScopedReadOperation::Reset() { |
3769 | 51.5M | if (tablet_) { |
3770 | 10.0M | auto* retention_policy = tablet_->RetentionPolicy(); |
3771 | 10.0M | if (retention_policy) { |
3772 | 9.72M | retention_policy->UnregisterReaderTimestamp(read_time_.read); |
3773 | 9.72M | } |
3774 | 10.0M | tablet_ = nullptr; |
3775 | 10.0M | } |
3776 | 51.5M | } |
3777 | | |
3778 | | } // namespace tablet |
3779 | | } // namespace yb |