YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/tablet.h"
34
35
#include <boost/container/static_vector.hpp>
36
37
#include "yb/client/client.h"
38
#include "yb/client/error.h"
39
#include "yb/client/meta_data_cache.h"
40
#include "yb/client/session.h"
41
#include "yb/client/table.h"
42
#include "yb/client/transaction.h"
43
#include "yb/client/transaction_manager.h"
44
#include "yb/client/yb_op.h"
45
46
#include "yb/common/index_column.h"
47
#include "yb/common/pgsql_error.h"
48
#include "yb/common/ql_rowblock.h"
49
#include "yb/common/row_mark.h"
50
#include "yb/common/schema.h"
51
#include "yb/common/transaction_error.h"
52
#include "yb/common/wire_protocol.h"
53
54
#include "yb/consensus/consensus.pb.h"
55
#include "yb/consensus/log_anchor_registry.h"
56
#include "yb/consensus/opid_util.h"
57
58
#include "yb/docdb/compaction_file_filter.h"
59
#include "yb/docdb/conflict_resolution.h"
60
#include "yb/docdb/consensus_frontier.h"
61
#include "yb/docdb/cql_operation.h"
62
#include "yb/docdb/doc_rowwise_iterator.h"
63
#include "yb/docdb/doc_write_batch.h"
64
#include "yb/docdb/docdb.h"
65
#include "yb/docdb/docdb_compaction_filter.h"
66
#include "yb/docdb/docdb_compaction_filter_intents.h"
67
#include "yb/docdb/docdb_debug.h"
68
#include "yb/docdb/docdb_rocksdb_util.h"
69
#include "yb/docdb/pgsql_operation.h"
70
#include "yb/docdb/ql_rocksdb_storage.h"
71
#include "yb/docdb/redis_operation.h"
72
#include "yb/docdb/rocksdb_writer.h"
73
74
#include "yb/gutil/casts.h"
75
76
#include "yb/rocksdb/db/memtable.h"
77
#include "yb/rocksdb/utilities/checkpoint.h"
78
79
#include "yb/rocksutil/yb_rocksdb.h"
80
81
#include "yb/server/hybrid_clock.h"
82
83
#include "yb/tablet/operations/change_metadata_operation.h"
84
#include "yb/tablet/operations/operation.h"
85
#include "yb/tablet/operations/snapshot_operation.h"
86
#include "yb/tablet/operations/split_operation.h"
87
#include "yb/tablet/operations/truncate_operation.h"
88
#include "yb/tablet/operations/write_operation.h"
89
#include "yb/tablet/read_result.h"
90
#include "yb/tablet/snapshot_coordinator.h"
91
#include "yb/tablet/tablet_bootstrap_if.h"
92
#include "yb/tablet/tablet_metadata.h"
93
#include "yb/tablet/tablet_metrics.h"
94
#include "yb/tablet/tablet_retention_policy.h"
95
#include "yb/tablet/tablet_snapshots.h"
96
#include "yb/tablet/transaction_coordinator.h"
97
#include "yb/tablet/transaction_participant.h"
98
#include "yb/tablet/write_query.h"
99
100
#include "yb/tserver/tserver.pb.h"
101
102
#include "yb/util/debug-util.h"
103
#include "yb/util/debug/trace_event.h"
104
#include "yb/util/flag_tags.h"
105
#include "yb/util/format.h"
106
#include "yb/util/logging.h"
107
#include "yb/util/mem_tracker.h"
108
#include "yb/util/metrics.h"
109
#include "yb/util/net/net_util.h"
110
#include "yb/util/pg_util.h"
111
#include "yb/util/scope_exit.h"
112
#include "yb/util/status_format.h"
113
#include "yb/util/status_log.h"
114
#include "yb/util/stopwatch.h"
115
#include "yb/util/trace.h"
116
#include "yb/util/yb_pg_errcodes.h"
117
118
#include "yb/yql/pgwrapper/libpq_utils.h"
119
120
DEFINE_bool(tablet_do_dup_key_checks, true,
121
            "Whether to check primary keys for duplicate on insertion. "
122
            "Use at your own risk!");
123
TAG_FLAG(tablet_do_dup_key_checks, unsafe);
124
125
DEFINE_bool(tablet_do_compaction_cleanup_for_intents, true,
126
            "Whether to clean up intents for aborted transactions in compaction.");
127
128
DEFINE_int32(tablet_bloom_block_size, 4096,
129
             "Block size of the bloom filters used for tablet keys.");
130
TAG_FLAG(tablet_bloom_block_size, advanced);
131
132
DEFINE_double(tablet_bloom_target_fp_rate, 0.01f,
133
              "Target false-positive rate (between 0 and 1) to size tablet key bloom filters. "
134
              "A lower false positive rate may reduce the number of disk seeks required "
135
              "in heavy insert workloads, at the expense of more space and RAM "
136
              "required for bloom filters.");
137
TAG_FLAG(tablet_bloom_target_fp_rate, advanced);
138
139
METRIC_DEFINE_entity(table);
140
METRIC_DEFINE_entity(tablet);
141
142
// TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout
143
// is probably OK for shutdown.
144
DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000,
145
             "Max amount of time we can wait for read/write operations on RocksDB to finish "
146
             "so that we can perform exclusive-ownership operations on RocksDB, such as removing "
147
             "all data in the tablet by replacing the RocksDB instance with an empty one.");
148
149
DEFINE_int32(intents_flush_max_delay_ms, 2000,
150
             "Max time to wait for regular db to flush during flush of intents. "
151
             "After this time flush of regular db will be forced.");
152
153
DEFINE_int32(num_raft_ops_to_force_idle_intents_db_to_flush, 1000,
154
             "When writes to intents RocksDB are stopped and the number of Raft operations after "
155
             "the last write to the intents RocksDB "
156
             "is greater than this value, the intents RocksDB would be requested to flush.");
157
158
DEFINE_bool(delete_intents_sst_files, true,
159
            "Delete whole intents .SST files when possible.");
160
161
DEFINE_uint64(backfill_index_write_batch_size, 128, "The batch size for backfilling the index.");
162
TAG_FLAG(backfill_index_write_batch_size, advanced);
163
TAG_FLAG(backfill_index_write_batch_size, runtime);
164
165
DEFINE_int32(backfill_index_rate_rows_per_sec, 0, "Rate of at which the "
166
             "indexed table's entries are populated into the index table during index "
167
             "backfill. This is a per-tablet flag, i.e. a tserver responsible for "
168
             "multiple tablets could be processing more than this.");
169
TAG_FLAG(backfill_index_rate_rows_per_sec, advanced);
170
TAG_FLAG(backfill_index_rate_rows_per_sec, runtime);
171
172
DEFINE_uint64(verify_index_read_batch_size, 128, "The batch size for reading the index.");
173
TAG_FLAG(verify_index_read_batch_size, advanced);
174
TAG_FLAG(verify_index_read_batch_size, runtime);
175
176
DEFINE_int32(verify_index_rate_rows_per_sec, 0,
177
    "Rate of at which the indexed table's entries are read during index consistency checks."
178
    "This is a per-tablet flag, i.e. a tserver responsible for multiple tablets could be "
179
    "processing more than this.");
180
TAG_FLAG(verify_index_rate_rows_per_sec, advanced);
181
TAG_FLAG(verify_index_rate_rows_per_sec, runtime);
182
183
DEFINE_int32(backfill_index_timeout_grace_margin_ms, -1,
184
             "The time we give the backfill process to wrap up the current set "
185
             "of writes and return successfully the RPC with the information about "
186
             "how far we have processed the rows.");
187
TAG_FLAG(backfill_index_timeout_grace_margin_ms, advanced);
188
TAG_FLAG(backfill_index_timeout_grace_margin_ms, runtime);
189
190
DEFINE_bool(yql_allow_compatible_schema_versions, true,
191
            "Allow YCQL requests to be accepted even if they originate from a client who is ahead "
192
            "of the server's schema, but is determined to be compatible with the current version.");
193
TAG_FLAG(yql_allow_compatible_schema_versions, advanced);
194
TAG_FLAG(yql_allow_compatible_schema_versions, runtime);
195
196
DEFINE_bool(disable_alter_vs_write_mutual_exclusion, false,
197
             "A safety switch to disable the changes from D8710 which makes a schema "
198
             "operation take an exclusive lock making all write operations wait for it.");
199
TAG_FLAG(disable_alter_vs_write_mutual_exclusion, advanced);
200
TAG_FLAG(disable_alter_vs_write_mutual_exclusion, runtime);
201
202
DEFINE_bool(cleanup_intents_sst_files, true,
203
            "Cleanup intents files that are no more relevant to any running transaction.");
204
205
DEFINE_int32(ysql_transaction_abort_timeout_ms, 15 * 60 * 1000,  // 15 minutes
206
             "Max amount of time we can wait for active transactions to abort on a tablet "
207
             "after DDL (eg. DROP TABLE) is executed. This deadline is same as "
208
             "unresponsive_ts_rpc_timeout_ms");
209
210
DEFINE_test_flag(int32, backfill_sabotage_frequency, 0,
211
    "If set to value greater than 0, every nth row will be corrupted in the backfill process "
212
    "to create an inconsistency between the index and the indexed tables where n is the "
213
    "input parameter given.");
214
215
DEFINE_test_flag(int32, backfill_drop_frequency, 0,
216
    "If set to value greater than 0, every nth row will be dropped in the backfill process "
217
    "to create an inconsistency between the index and the indexed tables where n is the "
218
    "input parameter given.");
219
220
DEFINE_bool(tablet_enable_ttl_file_filter, false,
221
            "Enables compaction to directly delete files that have expired based on TTL, "
222
            "rather than removing them via the normal compaction process.");
223
224
DEFINE_test_flag(int32, slowdown_backfill_by_ms, 0,
225
                 "If set > 0, slows down the backfill process by this amount.");
226
227
DEFINE_test_flag(uint64, backfill_paging_size, 0,
228
                 "If set > 0, returns early after processing this number of rows.");
229
230
DEFINE_test_flag(bool, tablet_verify_flushed_frontier_after_modifying, false,
231
                 "After modifying the flushed frontier in RocksDB, verify that the restored value "
232
                 "of it is as expected. Used for testing.");
233
234
DEFINE_test_flag(bool, docdb_log_write_batches, false,
235
                 "Dump write batches being written to RocksDB");
236
237
DEFINE_test_flag(bool, export_intentdb_metrics, false,
238
                 "Dump intentsdb statistics to prometheus metrics");
239
240
DEFINE_test_flag(bool, pause_before_post_split_compaction, false,
241
                 "Pause before triggering post split compaction.");
242
243
DEFINE_test_flag(bool, disable_adding_user_frontier_to_sst, false,
244
                 "Prevents adding the UserFrontier to SST file in order to mimic older files.");
245
246
// FLAGS_TEST_disable_getting_user_frontier_from_mem_table is used in conjunction with
247
// FLAGS_TEST_disable_adding_user_frontier_to_sst.  Two flags are needed for the case in which
248
// we're writing a mixture of SST files with and without UserFrontiers, to ensure that we're
249
// not attempting to read the UserFrontier from the MemTable in either case.
250
DEFINE_test_flag(bool, disable_getting_user_frontier_from_mem_table, false,
251
                 "Prevents checking the MemTable for a UserFrontier for test cases where we are "
252
                 "generating SST files without UserFrontiers.");
253
254
DECLARE_int32(client_read_write_timeout_ms);
255
DECLARE_bool(consistent_restore);
256
DECLARE_int32(rocksdb_level0_slowdown_writes_trigger);
257
DECLARE_int32(rocksdb_level0_stop_writes_trigger);
258
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
259
DECLARE_int64(apply_intents_task_injected_delay_ms);
260
DECLARE_string(regular_tablets_data_block_key_value_encoding);
261
262
DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0,
263
                 "Sleep before applying intents to docdb after transaction commit");
264
265
using namespace std::placeholders;
266
267
using std::shared_ptr;
268
using std::make_shared;
269
using std::string;
270
using std::unordered_set;
271
using std::vector;
272
using std::unique_ptr;
273
using namespace std::literals;  // NOLINT
274
275
using rocksdb::WriteBatch;
276
using rocksdb::SequenceNumber;
277
using yb::tserver::WriteRequestPB;
278
using yb::tserver::WriteResponsePB;
279
using yb::docdb::KeyValueWriteBatchPB;
280
using yb::tserver::ReadRequestPB;
281
using yb::docdb::DocOperation;
282
using yb::docdb::RedisWriteOperation;
283
using yb::docdb::QLWriteOperation;
284
using yb::docdb::PgsqlWriteOperation;
285
using yb::docdb::DocDBCompactionFilterFactory;
286
using yb::docdb::InitMarkerBehavior;
287
288
namespace yb {
289
namespace tablet {
290
291
using consensus::MaximumOpId;
292
using log::LogAnchorRegistry;
293
using strings::Substitute;
294
using base::subtle::Barrier_AtomicIncrement;
295
296
using client::ChildTransactionData;
297
using client::TransactionManager;
298
using client::YBSession;
299
using client::YBTransaction;
300
using client::YBTablePtr;
301
302
using docdb::DocKey;
303
using docdb::DocPath;
304
using docdb::DocRowwiseIterator;
305
using docdb::DocWriteBatch;
306
using docdb::SubDocKey;
307
using docdb::PrimitiveValue;
308
using docdb::StorageDbType;
309
310
////////////////////////////////////////////////////////////
311
// Tablet
312
////////////////////////////////////////////////////////////
313
314
namespace {
315
316
std::string MakeTabletLogPrefix(
317
2.25M
    const TabletId& tablet_id, const std::string& log_prefix_suffix) {
318
2.25M
  return Format("T $0$1: ", tablet_id, log_prefix_suffix);
319
2.25M
}
320
321
docdb::ConsensusFrontiers* InitFrontiers(
322
    const OpId op_id,
323
    const HybridTime log_ht,
324
6.46M
    docdb::ConsensusFrontiers* frontiers) {
325
6.46M
  if (FLAGS_TEST_disable_adding_user_frontier_to_sst) {
326
0
    return nullptr;
327
0
  }
328
6.46M
  set_op_id(op_id, frontiers);
329
6.46M
  set_hybrid_time(log_ht, frontiers);
330
6.46M
  return frontiers;
331
6.46M
}
332
333
template <class Data>
334
1.70M
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
1.70M
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
1.70M
}
tablet.cc:_ZN2yb6tablet12_GLOBAL__N_113InitFrontiersINS0_20TransactionApplyDataEEEPN7rocksdb17UserFrontiersBaseINS_5docdb17ConsensusFrontierEEERKT_S9_
Line
Count
Source
334
737k
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
737k
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
737k
}
tablet.cc:_ZN2yb6tablet12_GLOBAL__N_113InitFrontiersINS0_17RemoveIntentsDataEEEPN7rocksdb17UserFrontiersBaseINS_5docdb17ConsensusFrontierEEERKT_S9_
Line
Count
Source
334
968k
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
968k
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
968k
}
337
338
rocksdb::UserFrontierPtr MemTableFrontierFromDb(
339
    rocksdb::DB* db,
340
1.72M
    rocksdb::UpdateUserValueType type) {
341
1.72M
  if (FLAGS_TEST_disable_getting_user_frontier_from_mem_table) {
342
0
    return nullptr;
343
0
  }
344
1.72M
  return db->GetMutableMemTableFrontier(type);
345
1.72M
}
346
347
} // namespace
348
349
// RocksDB event listener bound to a Tablet (presumably attached to its regular DB, per the name).
// After a full compaction completes it marks the tablet metadata as fully compacted and flushes
// the metadata.
class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
 public:
  RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix)
      : tablet_(*CHECK_NOTNULL(tablet)),
        log_prefix_(log_prefix) {}

  void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override {
    // Partial compactions are ignored.
    if (!ci.is_full_compaction) {
      return;
    }

    auto& metadata = *CHECK_NOTNULL(tablet_.metadata());
    // Nothing to do if the metadata already records a full compaction.
    if (metadata.has_been_fully_compacted()) {
      return;
    }

    metadata.set_has_been_fully_compacted(true);
    // This callback returns void, so a flush failure is only reported through ERROR_NOT_OK.
    ERROR_NOT_OK(metadata.Flush(), log_prefix_);
  }

 private:
  Tablet& tablet_;
  const std::string log_prefix_;
};
369
370
// Builds a tablet from `data`. This sets up metrics (when a registry is provided), the
// transaction participant/coordinator where applicable, index-related helpers and snapshot
// support. Storage is opened later by Open().
Tablet::Tablet(const TabletInitData& data)
    : key_schema_(std::make_unique<Schema>(data.metadata->schema()->CreateKeyProjection())),
      metadata_(data.metadata),
      table_type_(data.metadata->table_type()),
      log_anchor_registry_(data.log_anchor_registry),
      mem_tracker_(MemTracker::CreateTracker(
          Format("tablet-$0", tablet_id()), data.parent_mem_tracker, AddToParent::kTrue,
          CreateMetrics::kFalse)),
      block_based_table_mem_tracker_(data.block_based_table_mem_tracker),
      clock_(data.clock),
      mvcc_(
          MakeTabletLogPrefix(data.metadata->raft_group_id(), data.log_prefix_suffix), data.clock),
      tablet_options_(data.tablet_options),
      pending_non_abortable_op_counter_("RocksDB non-abortable read/write operations"),
      pending_abortable_op_counter_("RocksDB abortable read/write operations"),
      write_ops_being_submitted_counter_("Tablet schema"),
      client_future_(data.client_future),
      local_tablet_filter_(data.local_tablet_filter),
      log_prefix_suffix_(data.log_prefix_suffix),
      is_sys_catalog_(data.is_sys_catalog),
      txns_enabled_(data.txns_enabled),
      retention_policy_(std::make_shared<TabletRetentionPolicy>(
          clock_, data.allowed_history_cutoff_provider, metadata_.get())) {
  CHECK(schema()->has_column_ids());
  LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is "
                        << metadata_->schema_version();

  // Metrics are only wired up when a registry is supplied; otherwise metrics_ stays empty.
  if (data.metric_registry) {
    MetricEntity::AttributeMap attrs;
    // TODO(KUDU-745): table_id is apparently not set in the metadata.
    attrs["table_id"] = metadata_->table_id();
    attrs["table_name"] = metadata_->table_name();
    attrs["namespace_name"] = metadata_->namespace_name();
    table_metrics_entity_ =
        METRIC_ENTITY_table.Instantiate(data.metric_registry, metadata_->table_id(), attrs);
    tablet_metrics_entity_ =
        METRIC_ENTITY_tablet.Instantiate(data.metric_registry, tablet_id(), attrs);

    // If we are creating a KV table create the metrics callback.
    regulardb_statistics_ =
        rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_);
    // Intents DB statistics get the tablet entity only when the test flag requests export.
    if (GetAtomicFlag(&FLAGS_TEST_export_intentdb_metrics)) {
      intentsdb_statistics_ =
          rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_, true);
    } else {
      intentsdb_statistics_ = rocksdb::CreateDBStatistics(table_metrics_entity_, nullptr, true);
    }

    metrics_.reset(new TabletMetrics(table_metrics_entity_, tablet_metrics_entity_));

    mem_tracker_->SetMetricEntity(tablet_metrics_entity_);
  }

  auto table_info = metadata_->primary_table_info();
  const bool has_index = !table_info->index_map->empty();
  const bool transactional = data.metadata->schema()->table_properties().is_transactional();
  if (transactional) {
    server::HybridClock::EnableClockSkewControl();
  }

  // A participant is needed for the sys catalog and for transactional tables, provided
  // transactions are enabled and the caller gave us a participant context.
  const bool needs_participant =
      txns_enabled_ && data.transaction_participant_context && (is_sys_catalog_ || transactional);
  if (needs_participant) {
    transaction_participant_ = std::make_unique<TransactionParticipant>(
        data.transaction_participant_context, this, tablet_metrics_entity_);
    // Create transaction manager for secondary index update.
    if (has_index) {
      transaction_manager_ = std::make_unique<client::TransactionManager>(
          client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_);
    }
  }

  // Create index table metadata cache for secondary index update.
  if (has_index) {
    CreateNewYBMetaDataCache();
  }

  // If this is a unique index tablet, set up the index primary key schema.
  if (table_info->index_info && table_info->index_info->is_unique()) {
    unique_index_key_schema_ = std::make_unique<Schema>();
    const auto ids = table_info->index_info->index_key_column_ids();
    CHECK_OK(table_info->schema->CreateProjectionByIdsIgnoreMissing(
        ids, unique_index_key_schema_.get()));
  }

  if (data.transaction_coordinator_context &&
      table_info->table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    // NOTE(review): metrics_ is only populated when data.metric_registry is set (above); confirm
    // that a coordinator context is never supplied without a metric registry, otherwise this
    // dereferences a null metrics_.
    transaction_coordinator_ = std::make_unique<TransactionCoordinator>(
        metadata_->fs_manager()->uuid(),
        data.transaction_coordinator_context,
        metrics_->expired_transactions.get());
  }

  snapshots_ = std::make_unique<TabletSnapshots>(this);
  snapshot_coordinator_ = data.snapshot_coordinator;

  if (metadata_->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
    SplitDone();
  }

  auto restoration_hybrid_time = metadata_->restoration_hybrid_time();
  if (restoration_hybrid_time && transaction_participant_ && FLAGS_consistent_restore) {
    transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
  }

  SyncRestoringOperationFilter(ResetSplit::kFalse);
}
472
473
48.1k
// Destroys the tablet. If StartShutdown() reports that this call started the shutdown, the
// shutdown is completed here. Otherwise the tablet is expected to already be in kShutdown, and
// anything else is reported at DFATAL. Finally, the memory trackers are unregistered from their
// parents.
Tablet::~Tablet() {
  if (!StartShutdown()) {
    auto state = state_;
    LOG_IF_WITH_PREFIX(DFATAL, state != kShutdown)
        << "Destroying Tablet that did not complete shutdown: " << state;
  } else {
    CompleteShutdown();
  }

  // The block-based table tracker is optional; the tablet tracker always exists.
  if (block_based_table_mem_tracker_) {
    block_based_table_mem_tracker_->UnregisterFromParent();
  }
  mem_tracker_->UnregisterFromParent();
}
486
487
89.2k
// Opens the tablet's storage (for key-value table types) and moves it to kBootstrapping.
// Must be called exactly once, while the tablet is still kInitialized.
Status Tablet::Open() {
  TRACE_EVENT0("tablet", "Tablet::Open");
  std::lock_guard<rw_spinlock> lock(component_lock_);
  CHECK_EQ(state_, kInitialized) << "already open";
  CHECK(schema()->has_column_ids());

  switch (table_type_) {
    case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::REDIS_TABLE_TYPE:
      // Key-value tables open their RocksDB instances first; status tablets have nothing to open.
      RETURN_NOT_OK(OpenKeyValueTablet());
      FALLTHROUGH_INTENDED;
    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
      state_ = kBootstrapping;
      return Status::OK();
  }
  FATAL_INVALID_ENUM_VALUE(TableType, table_type_);

  return Status::OK();
}
508
509
220k
// Creates, if missing, the on-disk directories this tablet needs:
//   - the parent (table) directory of `db_dir`,
//   - `db_dir` itself,
//   - the intents directory `db_dir + kIntentsDBSuffix`,
//   - the snapshot directories, as created by TabletSnapshots::CreateDirectories.
// Each failure is returned with a message naming the directory that could not be created.
Status Tablet::CreateTabletDirectories(const string& db_dir, FsManager* fs) {
  LOG_WITH_PREFIX(INFO) << "Creating RocksDB database in dir " << db_dir;

  // Create the directory table-uuid first.
  const auto table_dir = DirName(db_dir);
  RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(table_dir),
                        Format("Failed to create RocksDB table directory $0", table_dir));

  RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir),
                        Format("Failed to create RocksDB tablet directory $0", db_dir));

  // Report the intents directory itself (not the tablet directory) when its creation fails.
  const auto intents_dir = db_dir + kIntentsDBSuffix;
  RETURN_NOT_OK_PREPEND(
      fs->CreateDirIfMissingAndSync(intents_dir),
      Format("Failed to create RocksDB tablet intents directory $0", intents_dir));

  RETURN_NOT_OK(snapshots_->CreateDirectories(db_dir, fs));

  return Status::OK();
}
526
527
46.7k
void Tablet::ResetYBMetaDataCache() {
528
46.7k
  std::atomic_store_explicit(&metadata_cache_, {}, std::memory_order_release);
529
46.7k
}
530
531
39.7k
void Tablet::CreateNewYBMetaDataCache() {
532
39.7k
  std::atomic_store_explicit(&metadata_cache_,
533
39.7k
      std::make_shared<client::YBMetaDataCache>(client_future_.get(),
534
39.7k
                                                false /* Update permissions cache */),
535
39.7k
      std::memory_order_release);
536
39.7k
}
537
538
43.4k
std::shared_ptr<client::YBMetaDataCache> Tablet::YBMetaDataCache() {
539
43.4k
  return std::atomic_load_explicit(&metadata_cache_, std::memory_order_acquire);
540
43.4k
}
541
542
template <class F>
543
328k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
328k
  typedef typename decltype(
546
328k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
328k
      MemTableFlushFilterFactoryType;
548
328k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
328k
}
tablet.cc:_ZN2yb6tablet30MakeMemTableFlushFilterFactoryIZNS0_6Tablet18OpenKeyValueTabletEvE3$_0EEDaRKT_
Line
Count
Source
543
220k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
220k
  typedef typename decltype(
546
220k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
220k
      MemTableFlushFilterFactoryType;
548
220k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
220k
}
tablet.cc:_ZN2yb6tablet30MakeMemTableFlushFilterFactoryIZNS0_6Tablet18OpenKeyValueTabletEvE3$_2EEDaRKT_
Line
Count
Source
543
107k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
107k
  typedef typename decltype(
546
107k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
107k
      MemTableFlushFilterFactoryType;
548
107k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
107k
}
550
551
template <class F>
552
220k
auto MakeMaxFileSizeWithTableTTLFunction(const F& f) {
553
  // Trick to get type of max_file_size_for_compaction field.
554
220k
  typedef typename decltype(
555
220k
      static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type
556
220k
      MaxFileSizeWithTableTTLFunction;
557
220k
  return std::make_shared<MaxFileSizeWithTableTTLFunction>(f);
558
220k
}
559
560
6.93k
// Decides whether the intents DB memtable `memtable` may be flushed now.
// Returns true when any of these holds:
//   - the regular DB's flushed frontier has reached the memtable's largest op id, or
//   - the regular DB reports no new data to flush.
// Otherwise returns false. As a side effect, it requests a non-blocking regular DB flush when one
// is not already running and either shutdown was requested or the memtable has waited longer
// than FLAGS_intents_flush_max_delay_ms. An error from that flush request is propagated.
Result<bool> Tablet::IntentsDbFlushFilter(const rocksdb::MemTable& memtable) {
  VLOG_WITH_PREFIX(4) << __func__;

  auto frontiers = memtable.Frontiers();
  if (!frontiers) {
    VLOG_WITH_PREFIX(4) << __func__ << ", no frontiers";
  } else {
    const auto& intents_largest =
        down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest());

    // Intents DB may only be flushed after regular DB. Otherwise applied intents could be lost
    // while the corresponding regular records are not yet flushed.
    auto regular_flushed_frontier = regular_db_->GetFlushedFrontier();
    if (regular_flushed_frontier) {
      const auto& regular_flushed_largest =
          static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier);
      const bool regular_caught_up =
          regular_flushed_largest.op_id().index >= intents_largest.op_id().index;
      if (regular_caught_up) {
        VLOG_WITH_PREFIX(4) << __func__ << ", regular already flushed";
        return true;
      }
    }
  }

  // Nothing to flush in regular DB means intents were just added without being applied,
  // so the intents DB can be flushed.
  const auto flush_ability = regular_db_->GetFlushAbility();
  if (flush_ability == rocksdb::FlushAbility::kNoNewData) {
    VLOG_WITH_PREFIX(4) << __func__ << ", no new data";
    return true;
  }

  // Force a regular DB flush if none is running and we have been unable to flush for too long
  // (or are shutting down).
  const auto timeout = std::chrono::milliseconds(FLAGS_intents_flush_max_delay_ms);
  if (flush_ability != rocksdb::FlushAbility::kAlreadyFlushing) {
    const bool must_force =
        shutdown_requested_.load(std::memory_order_acquire) ||
        std::chrono::steady_clock::now() > memtable.FlushStartTime() + timeout;
    if (must_force) {
      VLOG_WITH_PREFIX(2) << __func__ << ", force flush";

      rocksdb::FlushOptions options;
      options.wait = false;
      RETURN_NOT_OK(regular_db_->Flush(options));
    }
  }

  return false;
}
606
607
1.84M
std::string Tablet::LogPrefix() const {
608
1.84M
  return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_);
609
1.84M
}
610
611
namespace {
612
613
328k
std::string LogDbTypePrefix(docdb::StorageDbType db_type) {
614
328k
  switch (db_type) {
615
220k
    case docdb::StorageDbType::kRegular:
616
220k
      return "R";
617
108k
    case docdb::StorageDbType::kIntents:
618
108k
      return "I";
619
0
  }
620
0
  FATAL_INVALID_ENUM_VALUE(docdb::StorageDbType, db_type);
621
0
}
622
623
std::string MakeTabletLogPrefix(
624
328k
    const TabletId& tablet_id, const std::string& log_prefix_suffix, docdb::StorageDbType db_type) {
625
328k
  return MakeTabletLogPrefix(
626
328k
      tablet_id, Format("$0 [$1]", log_prefix_suffix, LogDbTypePrefix(db_type)));
627
328k
}
628
629
} // namespace
630
631
328k
std::string Tablet::LogPrefix(docdb::StorageDbType db_type) const {
632
328k
  return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_, db_type);
633
328k
}
634
635
220k
Status Tablet::OpenKeyValueTablet() {
636
220k
  static const std::string kRegularDB = "RegularDB"s;
637
220k
  static const std::string kIntentsDB = "IntentsDB"s;
638
639
220k
  rocksdb::BlockBasedTableOptions table_options;
640
220k
  if (!metadata()->primary_table_info()->index_info || metadata()->colocated()) {
641
    // This tablet is not dedicated to the index table, so it should be effective to use
642
    // advanced key-value encoding algorithm optimized for docdb keys structure.
643
130k
    table_options.use_delta_encoding = true;
644
130k
    table_options.data_block_key_value_encoding_format =
645
130k
        VERIFY_RESULT(docdb::GetConfiguredKeyValueEncodingFormat(
646
130k
            FLAGS_regular_tablets_data_block_key_value_encoding));
647
130k
  }
648
220k
  rocksdb::Options rocksdb_options;
649
220k
  InitRocksDBOptions(
650
220k
      &rocksdb_options, LogPrefix(docdb::StorageDbType::kRegular), std::move(table_options));
651
220k
  rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kRegularDB, mem_tracker_);
652
220k
  rocksdb_options.block_based_table_mem_tracker =
653
220k
      MemTracker::FindOrCreateTracker(
654
220k
          Format("$0-$1", kRegularDB, tablet_id()), block_based_table_mem_tracker_,
655
220k
          AddToParent::kTrue, CreateMetrics::kFalse);
656
  // We may not have a metrics_entity_ instantiated in tests.
657
220k
  if (tablet_metrics_entity_) {
658
220k
    rocksdb_options.block_based_table_mem_tracker->SetMetricEntity(
659
220k
        tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kRegularDB));
660
220k
  }
661
662
220k
  key_bounds_ = docdb::KeyBounds(metadata()->lower_bound_key(), metadata()->upper_bound_key());
663
664
  // Install the history cleanup handler. Note that TabletRetentionPolicy is going to hold a raw ptr
665
  // to this tablet. So, we ensure that rocksdb_ is reset before this tablet gets destroyed.
666
220k
  rocksdb_options.compaction_filter_factory = make_shared<DocDBCompactionFilterFactory>(
667
220k
      retention_policy_, &key_bounds_);
668
669
2.65k
  rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] {
670
2.65k
    if (mem_table_flush_filter_factory_) {
671
2.09k
      return mem_table_flush_filter_factory_();
672
2.09k
    }
673
563
    return rocksdb::MemTableFilter();
674
563
  });
675
220k
  if (FLAGS_tablet_enable_ttl_file_filter) {
676
0
    rocksdb_options.compaction_file_filter_factory =
677
0
        std::make_shared<docdb::DocDBCompactionFileFilterFactory>(retention_policy_, clock());
678
0
  }
679
680
  // Use a function that checks the table TTL before returning a value for max file size
681
  // for compactions.
682
904k
  rocksdb_options.max_file_size_for_compaction = MakeMaxFileSizeWithTableTTLFunction([this] {
683
904k
    if (FLAGS_rocksdb_max_file_size_for_compaction > 0 &&
684
0
        retention_policy_->GetRetentionDirective().table_ttl != docdb::Value::kMaxTtl) {
685
0
      return FLAGS_rocksdb_max_file_size_for_compaction;
686
0
    }
687
904k
    return std::numeric_limits<uint64_t>::max();
688
904k
  });
689
690
220k
  rocksdb_options.disable_auto_compactions = true;
691
220k
  rocksdb_options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
692
220k
  rocksdb_options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
693
694
220k
  rocksdb::Options regular_rocksdb_options(rocksdb_options);
695
220k
  regular_rocksdb_options.listeners.push_back(
696
220k
      std::make_shared<RegularRocksDbListener>(this, regular_rocksdb_options.log_prefix));
697
698
220k
  const string db_dir = metadata()->rocksdb_dir();
699
220k
  RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager()));
700
701
220k
  LOG(INFO) << "Opening RocksDB at: " << db_dir;
702
220k
  rocksdb::DB* db = nullptr;
703
220k
  rocksdb::Status rocksdb_open_status = rocksdb::DB::Open(regular_rocksdb_options, db_dir, &db);
704
220k
  if (!rocksdb_open_status.ok()) {
705
0
    LOG_WITH_PREFIX(ERROR) << "Failed to open a RocksDB database in directory " << db_dir << ": "
706
0
                           << rocksdb_open_status;
707
0
    if (db != nullptr) {
708
0
      delete db;
709
0
    }
710
0
    return STATUS(IllegalState, rocksdb_open_status.ToString());
711
0
  }
712
220k
  regular_db_.reset(db);
713
220k
  regular_db_->ListenFilesChanged(std::bind(&Tablet::RegularDbFilesChanged, this));
714
715
220k
  if (transaction_participant_) {
716
107k
    LOG_WITH_PREFIX(INFO) << "Opening intents DB at: " << db_dir + kIntentsDBSuffix;
717
107k
    rocksdb::Options intents_rocksdb_options(rocksdb_options);
718
107k
    docdb::SetLogPrefix(&intents_rocksdb_options, LogPrefix(docdb::StorageDbType::kIntents));
719
720
6.93k
    intents_rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] {
721
6.93k
      return std::bind(&Tablet::IntentsDbFlushFilter, this, _1);
722
6.93k
    });
723
724
107k
    intents_rocksdb_options.compaction_filter_factory =
725
107k
        FLAGS_tablet_do_compaction_cleanup_for_intents ?
726
18.4E
        std::make_shared<docdb::DocDBIntentsCompactionFilterFactory>(this, &key_bounds_) : nullptr;
727
728
107k
    intents_rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kIntentsDB, mem_tracker_);
729
107k
    intents_rocksdb_options.block_based_table_mem_tracker =
730
107k
        MemTracker::FindOrCreateTracker(
731
107k
            Format("$0-$1", kIntentsDB, tablet_id()), block_based_table_mem_tracker_,
732
107k
            AddToParent::kTrue, CreateMetrics::kFalse);
733
    // We may not have a metrics_entity_ instantiated in tests.
734
107k
    if (tablet_metrics_entity_) {
735
107k
      intents_rocksdb_options.block_based_table_mem_tracker->SetMetricEntity(
736
107k
          tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kIntentsDB));
737
107k
    }
738
107k
    intents_rocksdb_options.statistics = intentsdb_statistics_;
739
740
107k
    rocksdb::DB* intents_db = nullptr;
741
107k
    RETURN_NOT_OK(
742
107k
        rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db));
743
107k
    intents_db_.reset(intents_db);
744
107k
    intents_db_->ListenFilesChanged(std::bind(&Tablet::CleanupIntentFiles, this));
745
107k
  }
746
747
220k
  ql_storage_.reset(new docdb::QLRocksDBStorage(doc_db()));
748
220k
  if (transaction_participant_) {
749
107k
    transaction_participant_->SetDB(doc_db(), &key_bounds_, &pending_non_abortable_op_counter_);
750
107k
  }
751
752
  // Don't allow reads at timestamps lower than the highest history cutoff of a past compaction.
753
220k
  auto regular_flushed_frontier = regular_db_->GetFlushedFrontier();
754
220k
  if (regular_flushed_frontier) {
755
1.55k
    retention_policy_->UpdateCommittedHistoryCutoff(
756
1.55k
        static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier).history_cutoff());
757
1.55k
  }
758
759
220k
  LOG_WITH_PREFIX(INFO) << "Successfully opened a RocksDB database at " << db_dir
760
220k
                        << ", obj: " << db;
761
762
220k
  return Status::OK();
763
220k
}
764
765
3.08k
void Tablet::RegularDbFilesChanged() {
766
3.08k
  std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_);
767
3.08k
  if (num_sst_files_changed_listener_) {
768
2.40k
    num_sst_files_changed_listener_();
769
2.40k
  }
770
3.08k
}
771
772
88.7k
void Tablet::SetCleanupPool(ThreadPool* thread_pool) {
773
88.7k
  if (!transaction_participant_) {
774
62.8k
    return;
775
62.8k
  }
776
777
25.8k
  cleanup_intent_files_token_ = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
778
779
25.8k
  CleanupIntentFiles();
780
25.8k
}
781
782
32.8k
void Tablet::CleanupIntentFiles() {
783
32.8k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
784
32.8k
  if (!scoped_read_operation.ok() || state_ != State::kOpen || !FLAGS_delete_intents_sst_files ||
785
32.8k
      !cleanup_intent_files_token_) {
786
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip";
787
0
    return;
788
0
  }
789
790
32.8k
  WARN_NOT_OK(
791
32.8k
      cleanup_intent_files_token_->SubmitFunc(std::bind(&Tablet::DoCleanupIntentFiles, this)),
792
32.8k
      "Submit cleanup intent files failed");
793
32.8k
}
794
795
32.8k
void Tablet::DoCleanupIntentFiles() {
796
32.8k
  if (metadata_->is_under_twodc_replication()) {
797
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Exit because of TwoDC replication";
798
0
    return;
799
0
  }
800
32.8k
  HybridTime best_file_max_ht = HybridTime::kMax;
801
32.8k
  std::vector<rocksdb::LiveFileMetaData> files;
802
  // Stops when there are no more files to delete.
803
32.8k
  std::string previous_name;
804
32.9k
  while (GetAtomicFlag(&FLAGS_cleanup_intents_sst_files)) {
805
32.9k
    auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
806
32.9k
    if (!scoped_read_operation.ok()) {
807
0
      VLOG_WITH_PREFIX_AND_FUNC(4) << "Failed to acquire scoped read operation";
808
0
      break;
809
0
    }
810
811
32.9k
    best_file_max_ht = HybridTime::kMax;
812
32.9k
    const rocksdb::LiveFileMetaData* best_file = nullptr;
813
32.9k
    files.clear();
814
32.9k
    intents_db_->GetLiveFilesMetaData(&files);
815
32.9k
    auto min_largest_seq_no = std::numeric_limits<rocksdb::SequenceNumber>::max();
816
817
5
    VLOG_WITH_PREFIX_AND_FUNC(5) << "Files: " << AsString(files);
818
819
124
    for (const auto& file : files) {
820
124
      if (file.largest.seqno < min_largest_seq_no) {
821
124
        min_largest_seq_no = file.largest.seqno;
822
124
        if (file.largest.user_frontier) {
823
124
          auto& frontier = down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier);
824
124
          best_file_max_ht = frontier.hybrid_time();
825
0
        } else {
826
0
          best_file_max_ht = HybridTime::kMax;
827
0
        }
828
124
        best_file = &file;
829
124
      }
830
124
    }
831
832
32.9k
    auto min_running_start_ht = transaction_participant_->MinRunningHybridTime();
833
32.9k
    if (!min_running_start_ht.is_valid() || min_running_start_ht <= best_file_max_ht) {
834
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(4)
835
18.4E
          << "Cannot delete because of running transactions: " << min_running_start_ht
836
18.4E
          << ", best file max ht: " << best_file_max_ht;
837
32.8k
      break;
838
32.8k
    }
839
103
    if (best_file->name == previous_name) {
840
0
      LOG_WITH_PREFIX_AND_FUNC(INFO)
841
0
          << "Attempt to delete same file: " << previous_name << ", stopping cleanup";
842
0
      break;
843
0
    }
844
103
    previous_name = best_file->name;
845
846
103
    LOG_WITH_PREFIX_AND_FUNC(INFO)
847
103
        << "Intents SST file will be deleted: " << best_file->ToString()
848
103
        << ", max ht: " << best_file_max_ht << ", min running transaction start ht: "
849
103
        << min_running_start_ht;
850
103
    auto flush_status = regular_db_->Flush(rocksdb::FlushOptions());
851
103
    if (!flush_status.ok()) {
852
0
      LOG_WITH_PREFIX_AND_FUNC(WARNING) << "Failed to flush regular db: " << flush_status;
853
0
      break;
854
0
    }
855
103
    auto delete_status = intents_db_->DeleteFile(best_file->name);
856
103
    if (!delete_status.ok()) {
857
0
      LOG_WITH_PREFIX_AND_FUNC(WARNING)
858
0
          << "Failed to delete " << best_file->ToString() << ", all files " << AsString(files)
859
0
          << ": " << delete_status;
860
0
      break;
861
0
    }
862
103
  }
863
864
32.8k
  if (best_file_max_ht != HybridTime::kMax) {
865
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Wait min running hybrid time: " << best_file_max_ht;
866
31
    transaction_participant_->WaitMinRunningHybridTime(best_file_max_ht);
867
31
  }
868
32.8k
}
869
870
89.6k
Status Tablet::EnableCompactions(ScopedRWOperationPause* non_abortable_ops_pause) {
871
89.6k
  if (!non_abortable_ops_pause) {
872
88.8k
    auto operation = CreateNonAbortableScopedRWOperation();
873
88.8k
    RETURN_NOT_OK(operation);
874
88.8k
    return DoEnableCompactions();
875
862
  }
876
877
862
  return DoEnableCompactions();
878
862
}
879
880
249k
Status Tablet::DoEnableCompactions() {
881
249k
  Status regular_db_status;
882
249k
  std::unordered_map<std::string, std::string> new_options = {
883
249k
      { "level0_slowdown_writes_trigger"s,
884
249k
        std::to_string(max_if_negative(FLAGS_rocksdb_level0_slowdown_writes_trigger))},
885
249k
      { "level0_stop_writes_trigger"s,
886
249k
        std::to_string(max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger))},
887
249k
  };
888
249k
  if (regular_db_) {
889
220k
    WARN_WITH_PREFIX_NOT_OK(
890
220k
        regular_db_->SetOptions(new_options, /* dump_options= */ false),
891
220k
        "Failed to set options on regular DB");
892
220k
    regular_db_status =
893
220k
        regular_db_->EnableAutoCompaction({regular_db_->DefaultColumnFamily()});
894
220k
    if (!regular_db_status.ok()) {
895
0
      LOG_WITH_PREFIX(WARNING) << "Failed to enable compactions on regular DB: "
896
0
                               << regular_db_status;
897
0
    }
898
220k
  }
899
249k
  if (intents_db_) {
900
107k
    WARN_WITH_PREFIX_NOT_OK(
901
107k
        intents_db_->SetOptions(new_options, /* dump_options= */ false),
902
107k
        "Failed to set options on provisional records DB");
903
107k
    Status intents_db_status =
904
107k
        intents_db_->EnableAutoCompaction({intents_db_->DefaultColumnFamily()});
905
107k
    if (!intents_db_status.ok()) {
906
0
      LOG_WITH_PREFIX(WARNING)
907
0
          << "Failed to enable compactions on provisional records DB: " << intents_db_status;
908
0
      return intents_db_status;
909
0
    }
910
249k
  }
911
249k
  return regular_db_status;
912
249k
}
913
914
89.0k
void Tablet::MarkFinishedBootstrapping() {
915
89.0k
  CHECK_EQ(state_, kBootstrapping);
916
89.0k
  state_ = kOpen;
917
89.0k
}
918
919
144k
bool Tablet::StartShutdown(const IsDropTable is_drop_table) {
920
144k
  LOG_WITH_PREFIX(INFO) << __func__;
921
922
144k
  bool expected = false;
923
144k
  if (!shutdown_requested_.compare_exchange_strong(expected, true)) {
924
95.9k
    return false;
925
95.9k
  }
926
927
48.2k
  if (transaction_participant_) {
928
19.3k
    transaction_participant_->StartShutdown();
929
19.3k
  }
930
931
48.2k
  return true;
932
48.2k
}
933
934
48.2k
void Tablet::CompleteShutdown(IsDropTable is_drop_table) {
935
48.2k
  LOG_WITH_PREFIX(INFO) << __func__ << "(" << is_drop_table << ")";
936
937
48.2k
  StartShutdown();
938
939
48.2k
  auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(is_drop_table), Stop::kTrue);
940
48.2k
  if (!op_pauses.ok()) {
941
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to shut down: " << op_pauses.status();
942
0
    return;
943
0
  }
944
945
48.2k
  cleanup_intent_files_token_.reset();
946
947
48.2k
  if (transaction_coordinator_) {
948
128
    transaction_coordinator_->Shutdown();
949
128
  }
950
951
48.2k
  if (transaction_participant_) {
952
19.3k
    transaction_participant_->CompleteShutdown();
953
19.3k
  }
954
955
48.2k
  {
956
48.2k
    std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
957
958
48.2k
    if (completed_split_log_anchor_) {
959
21
      WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()),
960
21
                  "Unregister split anchor");
961
21
    }
962
963
48.2k
    if (completed_split_operation_filter_) {
964
21
      UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get());
965
21
    }
966
967
48.2k
    if (restoring_operation_filter_) {
968
0
      UnregisterOperationFilterUnlocked(restoring_operation_filter_.get());
969
0
    }
970
48.2k
  }
971
972
48.2k
  std::lock_guard<rw_spinlock> lock(component_lock_);
973
974
  // Shutdown the RocksDB instance for this tablet, if present.
975
  // Destroy intents and regular DBs in reverse order to their creation.
976
  // Also it makes sure that regular DB is alive during flush filter of intents db.
977
48.2k
  WARN_NOT_OK(CompleteShutdownRocksDBs(Destroy::kFalse, &(*op_pauses)),
978
48.2k
              "Failed to reset rocksdb during shutdown");
979
980
48.2k
  if (post_split_compaction_task_pool_token_) {
981
0
    post_split_compaction_task_pool_token_->Shutdown();
982
0
  }
983
984
48.2k
  state_ = kShutdown;
985
986
96.3k
  for (auto* op_pause : op_pauses->AsArray()) {
987
    // Release the mutex that prevents snapshot restore / truncate operations from running. Such
988
    // operations are no longer possible because the tablet has shut down. When we start the
989
    // "read/write operation pause", we incremented the "exclusive operation" counter. This will
990
    // prevent us from decrementing that counter back, disabling read/write operations permanently.
991
96.3k
    op_pause->ReleaseMutexButKeepDisabled();
992
    // Ensure that op_pause stays in scope throughout this function.
993
18.4E
    LOG_IF(DFATAL, !op_pause->status().ok()) << op_pause->status();
994
96.3k
  }
995
48.2k
}
996
997
CHECKED_STATUS ResetRocksDB(
998
417k
    bool destroy, const rocksdb::Options& options, std::unique_ptr<rocksdb::DB>* db) {
999
417k
  if (!*db) {
1000
107k
    return Status::OK();
1001
107k
  }
1002
1003
310k
  auto dir = (**db).GetName();
1004
310k
  db->reset();
1005
310k
  if (!destroy) {
1006
67.4k
    return Status::OK();
1007
67.4k
  }
1008
1009
242k
  return rocksdb::DestroyDB(dir, options);
1010
242k
}
1011
1012
Result<TabletScopedRWOperationPauses> Tablet::StartShutdownRocksDBs(
1013
208k
    DisableFlushOnShutdown disable_flush_on_shutdown, Stop stop) {
1014
208k
  TabletScopedRWOperationPauses op_pauses;
1015
1016
417k
  auto pause = [this, stop](const Abortable abortable) -> Result<ScopedRWOperationPause> {
1017
417k
    auto op_pause = PauseReadWriteOperations(abortable, stop);
1018
417k
    if (!op_pause.ok()) {
1019
0
      return op_pause.status().CloneAndPrepend("Failed to stop read/write operations: ");
1020
0
    }
1021
417k
    return std::move(op_pause);
1022
417k
  };
1023
1024
208k
  op_pauses.non_abortable = VERIFY_RESULT(pause(Abortable::kFalse));
1025
1026
208k
  bool expected = false;
1027
  // If shutdown has been already requested, we still might need to wait for all pending read/write
1028
  // operations to complete here, because caller is not holding ScopedRWOperationPause.
1029
208k
  if (rocksdb_shutdown_requested_.compare_exchange_strong(expected, true)) {
1030
417k
    for (auto* db : {regular_db_.get(), intents_db_.get()}) {
1031
417k
      if (db) {
1032
310k
        db->SetDisableFlushOnShutdown(disable_flush_on_shutdown);
1033
310k
        db->StartShutdown();
1034
310k
      }
1035
417k
    }
1036
208k
  }
1037
1038
208k
  op_pauses.abortable = VERIFY_RESULT(pause(Abortable::kTrue));
1039
1040
208k
  return op_pauses;
1041
208k
}
1042
1043
Status Tablet::CompleteShutdownRocksDBs(
1044
208k
    Destroy destroy, TabletScopedRWOperationPauses* ops_pauses) {
1045
  // We need non-null ops_pauses just to guarantee that PauseReadWriteOperations has been called.
1046
208k
  RSTATUS_DCHECK(
1047
208k
      ops_pauses != nullptr, InvalidArgument,
1048
208k
      "ops_pauses could not be null, StartRocksDbShutdown should be called before "
1049
208k
      "ShutdownRocksDBs.");
1050
1051
208k
  if (intents_db_) {
1052
101k
    intents_db_->ListenFilesChanged(nullptr);
1053
101k
  }
1054
1055
208k
  rocksdb::Options rocksdb_options;
1056
208k
  if (destroy) {
1057
160k
    InitRocksDBOptions(&rocksdb_options, LogPrefix());
1058
160k
  }
1059
1060
208k
  Status intents_status = ResetRocksDB(destroy, rocksdb_options, &intents_db_);
1061
208k
  Status regular_status = ResetRocksDB(destroy, rocksdb_options, &regular_db_);
1062
208k
  key_bounds_ = docdb::KeyBounds();
1063
  // Reset rocksdb_shutdown_requested_ to the initial state like RocksDBs were never opened,
1064
  // so we don't have to reset it on RocksDB open (we potentially can have several places in the
1065
  // code doing opening RocksDB while RocksDB shutdown is always going through
1066
  // Tablet::ShutdownRocksDBs).
1067
208k
  rocksdb_shutdown_requested_ = false;
1068
1069
18.4E
  return regular_status.ok() ? intents_status : regular_status;
1070
208k
}
1071
1072
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator(
1073
    const Schema &projection,
1074
    const ReadHybridTime read_hybrid_time,
1075
    const TableId& table_id,
1076
    CoarseTimePoint deadline,
1077
    AllowBootstrappingState allow_bootstrapping_state,
1078
123k
    const Slice& sub_doc_key) const {
1079
123k
  if (state_ != kOpen && (!allow_bootstrapping_state || state_ != kBootstrapping)) {
1080
0
    return STATUS_FORMAT(IllegalState, "Tablet in wrong state: $0", state_);
1081
0
  }
1082
1083
123k
  if (table_type_ != TableType::YQL_TABLE_TYPE && table_type_ != TableType::PGSQL_TABLE_TYPE) {
1084
0
    return STATUS_FORMAT(NotSupported, "Invalid table type: $0", table_type_);
1085
0
  }
1086
1087
123k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
1088
123k
  RETURN_NOT_OK(scoped_read_operation);
1089
1090
18.4E
  VLOG_WITH_PREFIX(2) << "Created new Iterator reading at " << read_hybrid_time.ToString();
1091
1092
123k
  const std::shared_ptr<tablet::TableInfo> table_info =
1093
123k
      VERIFY_RESULT(metadata_->GetTableInfo(table_id));
1094
123k
  const Schema& schema = *table_info->schema;
1095
123k
  auto mapped_projection = std::make_unique<Schema>();
1096
123k
  RETURN_NOT_OK(schema.GetMappedReadProjection(projection, mapped_projection.get()));
1097
1098
123k
  auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext(
1099
123k
      /* transaction_id */ boost::none,
1100
123k
      schema.table_properties().is_ysql_catalog_table()));
1101
123k
  const auto read_time = read_hybrid_time
1102
24.0k
      ? read_hybrid_time
1103
99.6k
      : ReadHybridTime::SingleTime(VERIFY_RESULT(SafeTime(RequireLease::kFalse)));
1104
123k
  auto result = std::make_unique<DocRowwiseIterator>(
1105
123k
      std::move(mapped_projection), schema, txn_op_ctx, doc_db(),
1106
123k
      deadline, read_time, &pending_non_abortable_op_counter_);
1107
123k
  RETURN_NOT_OK(result->Init(table_type_, sub_doc_key));
1108
123k
  return std::move(result);
1109
123k
}
1110
1111
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator(
1112
0
    const TableId& table_id) const {
1113
0
  const std::shared_ptr<tablet::TableInfo> table_info =
1114
0
      VERIFY_RESULT(metadata_->GetTableInfo(table_id));
1115
0
  return NewRowIterator(*table_info->schema, {}, table_id);
1116
0
}
1117
1118
Status Tablet::ApplyRowOperations(
1119
4.76M
    WriteOperation* operation, AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1120
4.76M
  const auto& write_request =
1121
4.76M
      operation->consensus_round() && operation->consensus_round()->replicate_msg()
1122
          // Online case.
1123
4.47M
          ? operation->consensus_round()->replicate_msg()->write()
1124
          // Bootstrap case.
1125
284k
          : *operation->request();
1126
4.76M
  const KeyValueWriteBatchPB& put_batch = write_request.write_batch();
1127
4.76M
  if (metrics_) {
1128
53
    VLOG(3) << "Applying write batch (write_pairs=" << put_batch.write_pairs().size() << "): "
1129
53
            << put_batch.ShortDebugString();
1130
4.60M
    metrics_->rows_inserted->IncrementBy(put_batch.write_pairs().size());
1131
4.60M
  }
1132
1133
4.76M
  return ApplyOperation(
1134
4.76M
      *operation, write_request.batch_idx(), put_batch, already_applied_to_regular_db);
1135
4.76M
}
1136
1137
Status Tablet::ApplyOperation(
1138
    const Operation& operation, int64_t batch_idx,
1139
    const docdb::KeyValueWriteBatchPB& write_batch,
1140
4.76M
    AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1141
4.76M
  auto hybrid_time = operation.WriteHybridTime();
1142
1143
4.76M
  docdb::ConsensusFrontiers frontiers;
1144
  // Even if we have an external hybrid time, use the local commit hybrid time in the consensus
1145
  // frontier.
1146
4.76M
  auto frontiers_ptr =
1147
4.76M
      InitFrontiers(operation.op_id(), operation.hybrid_time(), &frontiers);
1148
4.76M
  if (frontiers_ptr) {
1149
4.76M
    auto ttl = write_batch.has_ttl()
1150
276
        ? MonoDelta::FromNanoseconds(write_batch.ttl())
1151
4.76M
        : docdb::Value::kMaxTtl;
1152
4.76M
    frontiers_ptr->Largest().set_max_value_level_ttl_expiration_time(
1153
4.76M
        docdb::FileExpirationFromValueTTL(operation.hybrid_time(), ttl));
1154
4.76M
  }
1155
4.76M
  return ApplyKeyValueRowOperations(
1156
4.76M
      batch_idx, write_batch, frontiers_ptr, hybrid_time, already_applied_to_regular_db);
1157
4.76M
}
1158
1159
Status Tablet::WriteTransactionalBatch(
1160
    int64_t batch_idx,
1161
    const KeyValueWriteBatchPB& put_batch,
1162
    HybridTime hybrid_time,
1163
1.32M
    const rocksdb::UserFrontiers* frontiers) {
1164
1.32M
  auto transaction_id = CHECK_RESULT(
1165
1.32M
      FullyDecodeTransactionId(put_batch.transaction().transaction_id()));
1166
1167
1.32M
  bool store_metadata = false;
1168
1.32M
  if (put_batch.transaction().has_isolation()) {
1169
    // Store transaction metadata (status tablet, isolation level etc.)
1170
1.02M
    auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(put_batch.transaction()));
1171
1.02M
    auto add_result = transaction_participant()->Add(metadata);
1172
1.02M
    if (!add_result.ok()) {
1173
60.3k
      return add_result.status();
1174
60.3k
    }
1175
968k
    store_metadata = add_result.get();
1176
968k
  }
1177
1.26M
  boost::container::small_vector<uint8_t, 16> encoded_replicated_batch_idx_set;
1178
1.26M
  auto prepare_batch_data = transaction_participant()->PrepareBatchData(
1179
1.26M
      transaction_id, batch_idx, &encoded_replicated_batch_idx_set);
1180
1.26M
  if (!prepare_batch_data) {
1181
    // If metadata is missing it could be caused by aborted and removed transaction.
1182
    // In this case we should not add new intents for it.
1183
119
    return STATUS(TryAgain,
1184
119
                  Format("Transaction metadata missing: $0, looks like it was just aborted",
1185
119
                         transaction_id), Slice(),
1186
119
                         PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
1187
119
  }
1188
1189
1.26M
  auto isolation_level = prepare_batch_data->first;
1190
1.26M
  auto& last_batch_data = prepare_batch_data->second;
1191
1192
1.26M
  docdb::TransactionalWriter writer(
1193
1.26M
      put_batch, hybrid_time, transaction_id, isolation_level,
1194
1.26M
      docdb::PartialRangeKeyIntents(metadata_->UsePartialRangeKeyIntents()),
1195
1.26M
      Slice(encoded_replicated_batch_idx_set.data(), encoded_replicated_batch_idx_set.size()),
1196
1.26M
      last_batch_data.next_write_id);
1197
1.26M
  if (store_metadata) {
1198
967k
    writer.SetMetadataToStore(&put_batch.transaction());
1199
967k
  }
1200
1.26M
  rocksdb::WriteBatch write_batch;
1201
1.26M
  write_batch.SetDirectWriter(&writer);
1202
1.26M
  RequestScope request_scope(transaction_participant_.get());
1203
1204
1.26M
  WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents);
1205
1206
1.26M
  last_batch_data.hybrid_time = hybrid_time;
1207
1.26M
  last_batch_data.next_write_id = writer.intra_txn_write_id();
1208
1.26M
  transaction_participant()->BatchReplicated(transaction_id, last_batch_data);
1209
1210
1.26M
  return Status::OK();
1211
1.26M
}
1212
1213
Status Tablet::ApplyKeyValueRowOperations(
1214
    int64_t batch_idx,
1215
    const KeyValueWriteBatchPB& put_batch,
1216
    const rocksdb::UserFrontiers* frontiers,
1217
    const HybridTime hybrid_time,
1218
4.76M
    AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1219
4.76M
  if (put_batch.write_pairs().empty() && put_batch.read_pairs().empty() &&
1220
8.72k
      put_batch.apply_external_transactions().empty()) {
1221
8.72k
    return Status::OK();
1222
8.72k
  }
1223
1224
  // Could return failure only for cases where it is safe to skip applying operations to DB.
1225
  // For instance where aborted transaction intents are written.
1226
  // In all other cases we should crash instead of skipping apply.
1227
1228
4.75M
  if (put_batch.has_transaction()) {
1229
1.32M
    RETURN_NOT_OK(WriteTransactionalBatch(batch_idx, put_batch, hybrid_time, frontiers));
1230
3.43M
  } else {
1231
3.43M
    rocksdb::WriteBatch regular_write_batch;
1232
3.42M
    auto* regular_write_batch_ptr = !already_applied_to_regular_db ? &regular_write_batch : nullptr;
1233
1234
    // See comments for PrepareExternalWriteBatch.
1235
3.43M
    rocksdb::WriteBatch intents_write_batch;
1236
3.43M
    bool has_non_exteranl_records = PrepareExternalWriteBatch(
1237
3.43M
        put_batch, hybrid_time, intents_db_.get(), regular_write_batch_ptr, &intents_write_batch);
1238
1239
3.43M
    if (intents_write_batch.Count() != 0) {
1240
0
      if (!metadata_->is_under_twodc_replication()) {
1241
0
        RETURN_NOT_OK(metadata_->SetIsUnderTwodcReplicationAndFlush(true));
1242
0
      }
1243
0
      WriteToRocksDB(frontiers, &intents_write_batch, StorageDbType::kIntents);
1244
0
    }
1245
1246
3.43M
    docdb::NonTransactionalWriter writer(put_batch, hybrid_time);
1247
3.43M
    if (!already_applied_to_regular_db && has_non_exteranl_records) {
1248
3.42M
      regular_write_batch.SetDirectWriter(&writer);
1249
3.42M
    }
1250
3.43M
    if (regular_write_batch.Count() != 0 || regular_write_batch.HasDirectWriter()) {
1251
3.42M
      WriteToRocksDB(frontiers, &regular_write_batch, StorageDbType::kRegular);
1252
3.42M
    }
1253
1254
3.43M
    if (snapshot_coordinator_) {
1255
20.3M
      for (const auto& pair : put_batch.write_pairs()) {
1256
20.3M
        WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()),
1257
20.3M
                    "ApplyWritePair failed");
1258
20.3M
      }
1259
435k
    }
1260
3.43M
  }
1261
1262
4.69M
  return Status::OK();
1263
4.75M
}
1264
1265
void Tablet::WriteToRocksDB(
1266
    const rocksdb::UserFrontiers* frontiers,
1267
    rocksdb::WriteBatch* write_batch,
1268
6.39M
    docdb::StorageDbType storage_db_type) {
1269
6.39M
  rocksdb::DB* dest_db = nullptr;
1270
6.39M
  switch (storage_db_type) {
1271
4.16M
    case StorageDbType::kRegular: dest_db = regular_db_.get(); break;
1272
2.22M
    case StorageDbType::kIntents: dest_db = intents_db_.get(); break;
1273
6.39M
  }
1274
1275
  // Frontiers can be null for deferred apply operations.
1276
6.39M
  if (frontiers) {
1277
6.39M
    write_batch->SetFrontiers(frontiers);
1278
6.39M
  }
1279
1280
  // We are using Raft replication index for the RocksDB sequence number for
1281
  // all members of this write batch.
1282
6.39M
  rocksdb::WriteOptions write_options;
1283
6.39M
  InitRocksDBWriteOptions(&write_options);
1284
1285
6.39M
  auto rocksdb_write_status = dest_db->Write(write_options, write_batch);
1286
6.39M
  if (!rocksdb_write_status.ok()) {
1287
0
    LOG_WITH_PREFIX(FATAL) << "Failed to write a batch with " << write_batch->Count()
1288
0
                           << " operations into RocksDB: " << rocksdb_write_status;
1289
0
  }
1290
1291
6.39M
  if (FLAGS_TEST_docdb_log_write_batches) {
1292
0
    LOG_WITH_PREFIX(INFO)
1293
0
        << "Wrote " << write_batch->Count() << " key/value pairs to " << storage_db_type
1294
0
        << " RocksDB:\n" << docdb::WriteBatchToString(
1295
0
            *write_batch,
1296
0
            storage_db_type,
1297
0
            BinaryOutputFormat::kEscapedAndHex,
1298
0
            WriteBatchOutputFormat::kArrow,
1299
0
            "  " + LogPrefix(storage_db_type));
1300
0
  }
1301
6.39M
}
1302
1303
//--------------------------------------------------------------------------------------------------
1304
// Redis Request Processing.
1305
Status Tablet::HandleRedisReadRequest(CoarseTimePoint deadline,
                                      const ReadHybridTime& read_time,
                                      const RedisReadRequestPB& redis_read_request,
                                      RedisResponsePB* response) {
  // Serves a Redis read at `read_time` and moves the operation's response into `response`.
  // TODO: move this locking to the top-level read request handler in TabletService.
  auto read_op_guard = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(read_op_guard);

  // Scoped tracker for metrics_->redis_read_latency.
  ScopedTabletMetricsTracker latency_tracker(metrics_->redis_read_latency);

  docdb::RedisReadOperation read_op(redis_read_request, doc_db(), deadline, read_time);
  RETURN_NOT_OK(read_op.Execute());
  *response = std::move(read_op.response());
  return Status::OK();
}
1320
1321
bool IsSchemaVersionCompatible(
1322
7.06M
    uint32_t current_version, uint32_t request_version, bool compatible_with_previous_version) {
1323
7.06M
  if (request_version == current_version) {
1324
7.06M
    return true;
1325
7.06M
  }
1326
1327
1.12k
  if (compatible_with_previous_version && request_version == current_version + 1) {
1328
0
    DVLOG(1) << (FLAGS_yql_allow_compatible_schema_versions ? "A" : "Not a")
1329
0
             << "ccepting request that is ahead of us by 1 version";
1330
77
    return FLAGS_yql_allow_compatible_schema_versions;
1331
77
  }
1332
1333
872
  return false;
1334
872
}
1335
1336
//--------------------------------------------------------------------------------------------------
1337
// CQL Request Processing.
1338
// Serves a CQL read.
//
// A schema version mismatch is reported through result->response
// (YQL_STATUS_SCHEMA_VERSION_MISMATCH) with an OK return status. Otherwise a transaction
// operation context is built and the read is delegated to AbstractTablet::HandleQLReadRequest.
Status Tablet::HandleQLReadRequest(
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    const QLReadRequestPB& ql_read_request,
    const TransactionMetadataPB& transaction_metadata,
    QLReadRequestResult* result) {
  auto read_op_guard = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(read_op_guard);
  ScopedTabletMetricsTracker latency_tracker(metrics_->ql_read_latency);

  const bool schema_ok = IsSchemaVersionCompatible(
      metadata()->schema_version(), ql_read_request.schema_version(),
      ql_read_request.is_compatible_with_previous_version());
  if (!schema_ok) {
    DVLOG(1) << "Setting status for read as YQL_STATUS_SCHEMA_VERSION_MISMATCH";
    auto& response = result->response;
    response.set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH);
    response.set_error_message(Format(
        "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)",
        metadata()->table_id(),
        metadata()->schema_version(),
        ql_read_request.schema_version(),
        ql_read_request.is_compatible_with_previous_version()));
    return Status::OK();
  }

  Result<TransactionOperationContext> txn_op_ctx = CreateTransactionOperationContext(
      transaction_metadata, /* is_ysql_catalog_table */ false);
  RETURN_NOT_OK(txn_op_ctx);
  return AbstractTablet::HandleQLReadRequest(
      deadline, read_time, ql_read_request, *txn_op_ctx, result);
}
1368
1369
// Finalizes the paging state of a CQL read response.
//
// For a request without hashed column values (a table scan) whose response has no next
// partition key, the scan may continue on the next tablet. The next partition key is set to this
// tablet's exclusive end key (the start key of the next tablet) when all of these hold:
//   1. the LIMIT (if any) was not reached, or return_paging_state is set (then the limit is only
//      the page size);
//   2. this is not the last tablet (the end key is non-empty);
//   3. the end key's hash code does not exceed the request's max_hash_code (if set).
// Whenever the response carries a paging state, its total row count is advanced by `row_count`.
CHECKED_STATUS Tablet::CreatePagingStateForRead(const QLReadRequestPB& ql_read_request,
                                                const size_t row_count,
                                                QLResponsePB* response) const {
  const bool is_table_scan = ql_read_request.hashed_column_values().empty();
  if (is_table_scan && !response->paging_state().has_next_partition_key()) {
    const bool limit_not_reached = !ql_read_request.has_limit() ||
                                   row_count < ql_read_request.limit() ||
                                   ql_read_request.return_paging_state();
    if (limit_not_reached) {
      const string& partition_end = metadata_->partition()->partition_key_end();
      if (!partition_end.empty()) {
        const uint16_t end_hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_end);
        const bool within_max_hash = !ql_read_request.has_max_hash_code() ||
                                     end_hash_code <= ql_read_request.max_hash_code();
        if (within_max_hash) {
          response->mutable_paging_state()->set_next_partition_key(partition_end);
        }
      }
    }
  }

  if (response->has_paging_state()) {
    auto* paging_state = response->mutable_paging_state();
    paging_state->set_total_num_rows_read(
        ql_read_request.paging_state().total_num_rows_read() + row_count);
  }
  return Status::OK();
}
1411
1412
//--------------------------------------------------------------------------------------------------
1413
// PGSQL Request Processing.
1414
//--------------------------------------------------------------------------------------------------
1415
// Serves a PGSQL read.
//
// A schema version mismatch between the request and the table is reported through
// result->response (PGSQL_STATUS_SCHEMA_VERSION_MISMATCH) with an OK return status. Otherwise the
// read is delegated to AbstractTablet::HandlePgsqlReadRequest with a transaction operation
// context built for this table.
Status Tablet::HandlePgsqlReadRequest(
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    bool is_explicit_request_read_time,
    const PgsqlReadRequestPB& pgsql_read_request,
    const TransactionMetadataPB& transaction_metadata,
    const SubTransactionMetadataPB& subtransaction_metadata,
    PgsqlReadRequestResult* result,
    size_t* num_rows_read) {
  TRACE(LogPrefix());
  auto read_op_guard = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(read_op_guard);
  // TODO(neil) Work on metrics for PGSQL.
  // ScopedTabletMetricsTracker metrics_tracker(metrics_->pgsql_read_latency);

  const shared_ptr<tablet::TableInfo> table_info =
      VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read_request.table_id()));
  // Only Postgres tables are expected here.
  DCHECK_EQ(table_info->table_type, TableType::PGSQL_TABLE_TYPE);

  const auto request_schema_version = pgsql_read_request.schema_version();
  if (table_info->schema_version != request_schema_version) {
    auto& response = result->response;
    response.set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH);
    response.set_error_message(
        Format("schema version mismatch for table $0: expected $1, got $2",
               table_info->table_id,
               table_info->schema_version,
               request_schema_version));
    return Status::OK();
  }

  const bool is_ysql_catalog_table =
      table_info->schema->table_properties().is_ysql_catalog_table();
  Result<TransactionOperationContext> txn_op_ctx = CreateTransactionOperationContext(
      transaction_metadata, is_ysql_catalog_table, &subtransaction_metadata);
  RETURN_NOT_OK(txn_op_ctx);
  return AbstractTablet::HandlePgsqlReadRequest(
      deadline, read_time, is_explicit_request_read_time,
      pgsql_read_request, *txn_op_ctx, result, num_rows_read);
}
1454
1455
// Returns true if the query can be satisfied by rows present in current tablet.
1456
// Returns false if query requires other tablets to also be scanned. Examples of this include:
1457
//   (1) full table scan queries
1458
//   (2) queries that whose key conditions are such that the query will require a multi tablet
1459
//       scan.
1460
//
1461
// Requests that are of the form batched index lookups of ybctids are sent only to a single tablet.
1462
// However there can arise situations where tablets splitting occurs after such requests are being
1463
// prepared by the pggate layer (specifically pg_doc_op.cc). Under such circumstances, if tablets
1464
// are split into two sub-tablets, then such batched index lookups of ybctid requests should be sent
1465
// to multiple tablets (the two sub-tablets). Hence, the request ends up not being a single tablet
1466
// request.
1467
Result<bool> Tablet::IsQueryOnlyForTablet(
1468
1.49M
    const PgsqlReadRequestPB& pgsql_read_request, size_t row_count) const {
1469
1.49M
  if ((!pgsql_read_request.ybctid_column_value().value().binary_value().empty() &&
1470
6.57k
       (implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) == row_count ||
1471
4.80k
        pgsql_read_request.batch_arguments_size() == 0)) ||
1472
1.49M
       !pgsql_read_request.partition_column_values().empty() ) {
1473
1.06M
    return true;
1474
1.06M
  }
1475
1476
435k
  std::shared_ptr<const Schema> schema = metadata_->schema();
1477
435k
  if (schema->has_pgtable_id() || schema->has_cotable_id())  {
1478
    // This is a colocated table.
1479
0
    return true;
1480
0
  }
1481
1482
435k
  if (schema->num_hash_key_columns() == 0 &&
1483
379k
      schema->num_range_key_columns() ==
1484
25.6k
          implicit_cast<size_t>(pgsql_read_request.range_column_values_size())) {
1485
    // PK is contained within this tablet.
1486
25.6k
    return true;
1487
25.6k
  }
1488
410k
  return false;
1489
410k
}
1490
1491
// Returns true when `partition_key` (the key the scan would continue from) is already beyond the
// request's upper limit, so no further tablet needs to be visited.
//
// Range-partitioned tables (no hash key columns): the decoded doc key is compared against the
// request's upper_bound, honoring is_inclusive. With no upper bound, returns false.
// Hash-partitioned tables: batched ybctid lookups that still have more batch arguments than
// `row_count` compare hash codes against upper_bound (false when it has no key). Otherwise the
// result is true only when the request has a max_hash_code that the partition hash exceeds.
Result<bool> Tablet::HasScanReachedMaxPartitionKey(
    const PgsqlReadRequestPB& pgsql_read_request,
    const string& partition_key,
    size_t row_count) const {
  auto schema = metadata_->schema();

  if (schema->num_hash_key_columns() == 0) {
    if (!pgsql_read_request.has_upper_bound()) {
      return false;
    }
    const auto& upper_bound = pgsql_read_request.upper_bound();
    docdb::DocKey partition_doc_key(*schema);
    VERIFY_RESULT(partition_doc_key.DecodeFrom(
        partition_key, docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue));
    docdb::DocKey max_partition_doc_key(*schema);
    VERIFY_RESULT(max_partition_doc_key.DecodeFrom(
        upper_bound.key(), docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue));
    const auto cmp = partition_doc_key.CompareTo(max_partition_doc_key);
    return upper_bound.is_inclusive() ? cmp > 0 : cmp >= 0;
  }

  const uint16_t partition_hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);

  // Batched ybctid lookups get paged when a tablet split after the request was prepared. If the
  // current partition hash is still below the upper bound, paging can be avoided.
  if (pgsql_read_request.has_ybctid_column_value() &&
      implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) > row_count) {
    if (!pgsql_read_request.upper_bound().has_key()) {
      return false;
    }
    const uint16_t upper_bound_hash =
        PartitionSchema::DecodeMultiColumnHashValue(pgsql_read_request.upper_bound().key());
    return pgsql_read_request.upper_bound().is_inclusive()
        ? partition_hash > upper_bound_hash
        : partition_hash >= upper_bound_hash;
  }

  return pgsql_read_request.has_max_hash_code() &&
         partition_hash > pgsql_read_request.max_hash_code();
}
1534
1535
namespace {
1536
1537
void SetBackfillSpecForYsqlBackfill(
1538
    const PgsqlReadRequestPB& pgsql_read_request,
1539
    const size_t& row_count,
1540
289
    PgsqlResponsePB* response) {
1541
289
  PgsqlBackfillSpecPB in_spec;
1542
289
  in_spec.ParseFromString(a2b_hex(pgsql_read_request.backfill_spec()));
1543
1544
289
  auto limit = in_spec.limit();
1545
289
  PgsqlBackfillSpecPB out_spec;
1546
289
  out_spec.set_limit(limit);
1547
289
  out_spec.set_count(in_spec.count() + row_count);
1548
289
  response->set_is_backfill_batch_done(!response->has_paging_state());
1549
0
  VLOG(2) << " limit is " << limit << " set_count to " << out_spec.count();
1550
289
  if (limit >= 0 && out_spec.count() >= limit) {
1551
    // Hint postgres to stop scanning now. And set up the
1552
    // next_row_key based on the paging state.
1553
42
    if (response->has_paging_state()) {
1554
22
      out_spec.set_next_row_key(response->paging_state().next_row_key());
1555
22
    }
1556
42
    response->set_is_backfill_batch_done(true);
1557
42
  }
1558
1559
0
  VLOG(2) << "Got input spec " << yb::ToString(in_spec)
1560
0
          << " set output spec " << yb::ToString(out_spec)
1561
0
          << " batch_done=" << response->is_backfill_batch_done();
1562
289
  string serialized_pb;
1563
289
  out_spec.SerializeToString(&serialized_pb);
1564
289
  response->set_backfill_spec(b2a_hex(serialized_pb));
1565
289
}
1566
1567
}  // namespace
1568
1569
// Finalizes the paging state of a PGSQL read response.
//
// If the query is not confined to this tablet (see IsQueryOnlyForTablet) and the response has
// no paging state yet, the scan may continue on a neighboring tablet. This applies only while
// the row limit (if any) is not reached, or when return_paging_state is set. The next partition
// key is this tablet's end key for forward scans (or when a hash code is given), and its start
// key for backward scans; the client then routes the next request to the preceding tablet. The
// key is recorded unless it is empty (no more tablets) or HasScanReachedMaxPartitionKey reports
// that the scan bound was passed.
//
// A present paging state gets its total row count advanced by `row_count`. Backfill requests
// additionally get their backfill spec updated.
CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_read_request,
                                                const size_t row_count,
                                                PgsqlResponsePB* response) const {
  const bool single_tablet_query =
      VERIFY_RESULT(IsQueryOnlyForTablet(pgsql_read_request, row_count));
  const bool may_continue_elsewhere =
      !single_tablet_query &&
      !response->has_paging_state() &&
      (!pgsql_read_request.has_limit() || row_count < pgsql_read_request.limit() ||
       pgsql_read_request.return_paging_state());

  if (may_continue_elsewhere) {
    const bool use_end_key =
        pgsql_read_request.has_hash_code() || pgsql_read_request.is_forward_scan();
    const auto& next_partition_key = use_end_key
        ? metadata_->partition()->partition_key_end()
        : metadata_->partition()->partition_key_start();

    // An empty key means this is the last tablet in the scan direction.
    bool end_scan = next_partition_key.empty();
    if (!end_scan) {
      end_scan = VERIFY_RESULT(HasScanReachedMaxPartitionKey(
          pgsql_read_request, next_partition_key, row_count));
    }
    if (!end_scan) {
      response->mutable_paging_state()->set_next_partition_key(next_partition_key);
    }
  }

  if (response->has_paging_state()) {
    response->mutable_paging_state()->set_total_num_rows_read(
        pgsql_read_request.paging_state().total_num_rows_read() + row_count);
  }

  if (pgsql_read_request.is_for_backfill()) {
    // BackfillSpec implements "paging" across multiple BackfillIndex RPCs from the master.
    SetBackfillSpecForYsqlBackfill(pgsql_read_request, row_count, response);
  }
  return Status::OK();
}
1614
1615
//--------------------------------------------------------------------------------------------------
1616
1617
1.74M
void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteQuery> query) {
1618
1.74M
  TRACE(__func__);
1619
1.74M
  if (table_type_ == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
1620
0
    query->Cancel(
1621
0
        STATUS(NotSupported, "Transaction status table does not support write"));
1622
0
    return;
1623
0
  }
1624
1625
1.74M
  if (!GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) {
1626
1.74M
    auto write_permit = GetPermitToWrite(query->deadline());
1627
1.74M
    if (!write_permit.ok()) {
1628
7.17k
      TRACE("Could not get the write permit.");
1629
7.17k
      WriteQuery::StartSynchronization(std::move(query), MoveStatus(write_permit));
1630
7.17k
      return;
1631
7.17k
    }
1632
    // Save the write permit to be released after the operation is submitted
1633
    // to Raft queue.
1634
1.73M
    query->UseSubmitToken(std::move(write_permit));
1635
1.73M
  }
1636
1637
1.74M
  WriteQuery::Execute(std::move(query));
1638
1.74M
}
1639
1640
161k
// Flushes the DBs selected by `flags`.
//
// The intents DB flush is always started without waiting. The regular DB flush waits only for
// FlushMode::kSync. In kSync mode the intents flush is awaited at the end. Flush errors from
// either DB are only logged as warnings; a failure while waiting for the intents flush is
// returned.
Status Tablet::Flush(FlushMode mode, FlushFlags flags, int64_t ignore_if_flushed_after_tick) {
  TRACE_EVENT0("tablet", "Tablet::Flush");

  // Held for the duration of the flush; its status is not checked here.
  auto pending_op = CreateNonAbortableScopedRWOperation();

  const bool sync = mode == FlushMode::kSync;
  const bool flush_intents = intents_db_ && HasFlags(flags, FlushFlags::kIntents);
  const bool flush_regular = HasFlags(flags, FlushFlags::kRegular) && regular_db_;

  rocksdb::FlushOptions options;
  options.ignore_if_flushed_after_tick = ignore_if_flushed_after_tick;

  if (flush_intents) {
    options.wait = false;
    WARN_NOT_OK(intents_db_->Flush(options), "Flush intents DB");
  }

  if (flush_regular) {
    options.wait = sync;
    WARN_NOT_OK(regular_db_->Flush(options), "Flush regular DB");
  }

  if (flush_intents && sync) {
    RETURN_NOT_OK(intents_db_->WaitForFlush());
  }

  return Status::OK();
}
1664
1665
26
// Waits for pending flushes on the regular DB and then on the intents DB, skipping any DB that
// is not present. Returns the first failure.
Status Tablet::WaitForFlush() {
  TRACE_EVENT0("tablet", "Tablet::WaitForFlush");

  if (regular_db_ != nullptr) {
    RETURN_NOT_OK(regular_db_->WaitForFlush());
  }
  if (intents_db_ != nullptr) {
    RETURN_NOT_OK(intents_db_->WaitForFlush());
  }
  return Status::OK();
}
1677
1678
0
// Imports data from `source_dir` into the regular DB. Only regular records are imported, so the
// intents DB is not touched.
Status Tablet::ImportData(const std::string& source_dir) {
  auto& target_db = regular_db_;
  return target_db->Import(source_dir);
}
1682
1683
// We apply intents by iterating over whole transaction reverse index.
1684
// Using value of reverse index record we find original intent record and apply it.
1685
// After that we delete both intent record and reverse index record.
1686
737k
Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) {
1687
701
  VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id;
1688
1689
  // This flag enables tests to induce a situation where a transaction has committed but its intents
1690
  // haven't yet moved to regular db for a sufficiently long period. For example, it can help a test
1691
  // to reliably assert that conflict resolution/ concurrency control with a conflicting committed
1692
  // transaction is done properly in the rare situation where the committed transaction's intents
1693
  // are still in intents db and not yet in regular db.
1694
737k
  AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms);
1695
737k
  docdb::ApplyIntentsContext context(
1696
737k
      data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht,
1697
737k
      &key_bounds_, intents_db_.get());
1698
737k
  docdb::IntentsWriter intents_writer(
1699
737k
      data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context);
1700
737k
  rocksdb::WriteBatch regular_write_batch;
1701
737k
  regular_write_batch.SetDirectWriter(&intents_writer);
1702
  // data.hybrid_time contains transaction commit time.
1703
  // We don't set transaction field of put_batch, otherwise we would write another bunch of intents.
1704
737k
  docdb::ConsensusFrontiers frontiers;
1705
737k
  auto frontiers_ptr = data.op_id.empty() ? nullptr : InitFrontiers(data, &frontiers);
1706
737k
  WriteToRocksDB(frontiers_ptr, &regular_write_batch, StorageDbType::kRegular);
1707
737k
  return context.apply_state();
1708
737k
}
1709
1710
template <class Ids>
1711
968k
CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) {
1712
968k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
1713
968k
  RETURN_NOT_OK(scoped_read_operation);
1714
1715
968k
  rocksdb::WriteBatch intents_write_batch;
1716
968k
  for (const auto& id : ids) {
1717
968k
    boost::optional<docdb::ApplyTransactionState> apply_state;
1718
968k
    for (;;) {
1719
968k
      docdb::RemoveIntentsContext context(id);
1720
968k
      docdb::IntentsWriter writer(
1721
968k
          apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
1722
968k
      intents_write_batch.SetDirectWriter(&writer);
1723
968k
      docdb::ConsensusFrontiers frontiers;
1724
968k
      auto frontiers_ptr = InitFrontiers(data, &frontiers);
1725
968k
      WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents);
1726
1727
968k
      if (!context.apply_state().active()) {
1728
968k
        break;
1729
968k
      }
1730
1731
241
      apply_state = std::move(context.apply_state());
1732
241
      intents_write_batch.Clear();
1733
1734
241
      AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms);
1735
241
    }
1736
968k
  }
1737
1738
968k
  return Status::OK();
1739
968k
}
_ZN2yb6tablet6Tablet17RemoveIntentsImplISt16initializer_listINS_17StronglyTypedUuidINS_17TransactionId_TagEEEEEENS_6StatusERKNS0_17RemoveIntentsDataERKT_
Line
Count
Source
1711
968k
CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) {
1712
968k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
1713
968k
  RETURN_NOT_OK(scoped_read_operation);
1714
1715
968k
  rocksdb::WriteBatch intents_write_batch;
1716
968k
  for (const auto& id : ids) {
1717
968k
    boost::optional<docdb::ApplyTransactionState> apply_state;
1718
968k
    for (;;) {
1719
968k
      docdb::RemoveIntentsContext context(id);
1720
968k
      docdb::IntentsWriter writer(
1721
968k
          apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
1722
968k
      intents_write_batch.SetDirectWriter(&writer);
1723
968k
      docdb::ConsensusFrontiers frontiers;
1724
968k
      auto frontiers_ptr = InitFrontiers(data, &frontiers);
1725
968k
      WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents);
1726
1727
968k
      if (!context.apply_state().active()) {
1728
968k
        break;
1729
968k
      }
1730
1731
241
      apply_state = std::move(context.apply_state());
1732
241
      intents_write_batch.Clear();
1733
1734
241
      AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms);
1735
241
    }
1736
968k
  }
1737
1738
968k
  return Status::OK();
1739
968k
}
Unexecuted instantiation: _ZN2yb6tablet6Tablet17RemoveIntentsImplINSt3__113unordered_setINS_17StronglyTypedUuidINS_17TransactionId_TagEEEN5boost4hashIS7_EENS3_8equal_toIS7_EENS3_9allocatorIS7_EEEEEENS_6StatusERKNS0_17RemoveIntentsDataERKT_
1740
1741
1742
968k
// Removes the intents of a single transaction by delegating to RemoveIntentsImpl with a
// one-element id list.
Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionId& id) {
  const std::initializer_list<TransactionId> single_id{id};
  return RemoveIntentsImpl(data, single_id);
}
1745
1746
0
// Removes the intents of every transaction in `transactions` (see RemoveIntentsImpl).
Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionIdSet& transactions) {
  const TransactionIdSet& ids = transactions;
  return RemoveIntentsImpl(data, ids);
}
1749
1750
// We batch this as some tx could be very large and may not fit in one batch
1751
CHECKED_STATUS Tablet::GetIntents(
1752
    const TransactionId& id,
1753
    std::vector<docdb::IntentKeyValueForCDC>* key_value_intents,
1754
74
    docdb::ApplyTransactionState* stream_state) {
1755
74
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
1756
74
  RETURN_NOT_OK(scoped_read_operation);
1757
1758
74
  docdb::ApplyTransactionState new_stream_state;
1759
1760
74
  new_stream_state = VERIFY_RESULT(
1761
74
      docdb::GetIntentsBatch(id, &key_bounds_, stream_state, intents_db_.get(), key_value_intents));
1762
74
  stream_state->key = new_stream_state.key;
1763
74
  stream_state->write_id = new_stream_state.write_id;
1764
1765
74
  return Status::OK();
1766
74
}
1767
1768
0
// Returns the safe time for appliers, computed by SafeTime without requiring a lease. mvcc_ is
// not queried directly because the correct lease would have to be passed to it.
Result<HybridTime> Tablet::ApplierSafeTime(HybridTime min_allowed, CoarseTimePoint deadline) {
  const auto lease_requirement = RequireLease::kFalse;
  return SafeTime(lease_requirement, min_allowed, deadline);
}
1772
1773
// Creates a row iterator over `projection` at read time `time`, for CDC snapshot reads.
//
// When `next_key` is non-empty it is decoded as a SubDocKey. Only its DocKey part is re-encoded
// and passed to NewRowIterator as the starting key; an invalid key returns the decode error.
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::CreateCDCSnapshotIterator(
    const Schema& projection, const ReadHybridTime& time, const string& next_key) {
  VLOG_WITH_PREFIX(2) << "The nextKey is " << next_key;

  // Owns the bytes that `next_slice` points to. DocKey::Encode() returns a temporary, so taking
  // AsSlice() of it directly (as before) left `next_slice` dangling by the time NewRowIterator
  // ran.
  // NOTE(review): if the returned iterator keeps the slice beyond construction, it must copy
  // it, because this buffer dies when the function returns. Confirm in NewRowIterator.
  docdb::KeyBytes encoded_doc_key;
  Slice next_slice;
  if (!next_key.empty()) {
    SubDocKey start_sub_doc_key;
    docdb::KeyBytes start_key_bytes(next_key);
    RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
    encoded_doc_key = start_sub_doc_key.doc_key().Encode();
    next_slice = encoded_doc_key.AsSlice();
    VLOG_WITH_PREFIX(2) << "The nextKey doc is " << next_key;
  }
  return NewRowIterator(
      projection, time, "", CoarseTimePoint::max(), AllowBootstrappingState::kFalse, next_slice);
}
1788
1789
// Validates `schema` (when non-null) and stores it on `operation`.
//
// A non-null schema must keep the key columns of the target table unchanged (the table is
// selected by the operation's table_id, or "" when absent) and must carry column ids. Otherwise
// InvalidArgument is returned and the operation is left untouched.
Status Tablet::CreatePreparedChangeMetadata(
    ChangeMetadataOperation *operation, const Schema* schema) {
  if (schema != nullptr) {
    auto existing_key_schema =
        GetKeySchema(operation->has_table_id() ? operation->table_id() : "");
    if (!existing_key_schema.KeyEquals(*schema)) {
      return STATUS_FORMAT(
          InvalidArgument,
          "Schema keys cannot be altered. New schema key: $0. Existing schema key: $1",
          schema->CreateKeyProjection(),
          existing_key_schema);
    }

    if (!schema->has_column_ids()) {
      // Most likely the request did not come from the Master.
      return STATUS(InvalidArgument, "Missing Column IDs");
    }
  }

  operation->set_schema(schema);
  return Status::OK();
}
1810
1811
489k
// Registers the table described by `table_info` in the tablet metadata and flushes the metadata.
// Schema or partition-schema decoding errors are returned before the metadata is modified.
Status Tablet::AddTable(const TableInfoPB& table_info) {
  Schema schema;
  RETURN_NOT_OK(SchemaFromPB(table_info.schema(), &schema));

  PartitionSchema partition_schema;
  RETURN_NOT_OK(PartitionSchema::FromPB(table_info.partition_schema(), schema, &partition_schema));

  metadata_->AddTable(
      table_info.table_id(), table_info.namespace_name(), table_info.table_name(),
      table_info.table_type(), schema, IndexMap(), partition_schema, boost::none,
      table_info.schema_version());

  return metadata_->Flush();
}
1827
1828
45
// Removes `table_id` from the tablet metadata and flushes the metadata.
Status Tablet::RemoveTable(const std::string& table_id) {
  metadata_->RemoveTable(table_id);
  return metadata_->Flush();
}
1833
1834
7.30k
// Marks the backfill of `table_id` as done.
//
// An empty id selects the primary table; otherwise the table info is looked up and lookup
// errors are returned. The current schema is re-saved with SetRetainDeleteMarkers(false),
// together with the same index map and schema version and no newly deleted columns. The
// metadata is then flushed.
Status Tablet::MarkBackfillDone(const TableId& table_id) {
  auto info = table_id.empty() ?
      metadata_->primary_table_info() : VERIFY_RESULT(metadata_->GetTableInfo(table_id));
  LOG_WITH_PREFIX(INFO) << "Setting backfill as done. Current schema  "
                        << info->schema->ToString();
  const vector<DeletedColumn> no_deleted_columns;
  Schema schema_without_markers = *info->schema;
  schema_without_markers.SetRetainDeleteMarkers(false);
  metadata_->SetSchema(schema_without_markers, *info->index_map, no_deleted_columns,
                       info->schema_version, table_id);
  return metadata_->Flush();
}
1846
1847
48.2k
// Applies the schema carried by `operation` to the table it targets.
//
// The target is selected by alter_table_id, or "" when absent. The new schema must be non-null
// and keep the same key columns. A request whose version is not newer than the current one is a
// no-op. Otherwise:
//   - columns missing from the new schema are recorded as deleted;
//   - the schema, index map and version are stored;
//   - an optional table rename is applied to the metadata and the metric entities;
//   - the YB metadata cache is rebuilt for secondary-index updates, creating a transaction
//     manager for transactional tables if there is none yet;
//   - the metadata is flushed.
Status Tablet::AlterSchema(ChangeMetadataOperation *operation) {
  auto table_info = VERIFY_RESULT(metadata_->GetTableInfo(
      operation->request()->has_alter_table_id() ? operation->request()->alter_table_id() : ""));
  auto existing_key_schema = table_info->schema->CreateKeyProjection();

  RSTATUS_DCHECK_NE(operation->schema(), static_cast<void*>(nullptr), InvalidArgument,
                    "Schema could not be null");
  RSTATUS_DCHECK(existing_key_schema.KeyEquals(*DCHECK_NOTNULL(operation->schema())),
                 InvalidArgument, "Schema keys cannot be altered");

  // Only non-abortable operations are paused. Abortable read/write operations could be long and
  // they shouldn't access metadata_ without locks, so there is no need to wait for them here.
  auto pause = PauseReadWriteOperations(Abortable::kFalse);
  RETURN_NOT_OK(pause);

  const auto new_version = operation->schema_version();
  if (table_info->schema_version >= new_version) {
    LOG_WITH_PREFIX(INFO)
        << "Already running schema version " << table_info->schema_version
        << " got alter request for version " << new_version;
    return Status::OK();
  }

  LOG_WITH_PREFIX(INFO) << "Alter schema from " << table_info->schema->ToString()
                        << " version " << table_info->schema_version
                        << " to " << operation->schema()->ToString()
                        << " version " << new_version;

  // Columns present now but absent from the new schema are recorded as deleted, each stamped
  // with clock_->Now() when it is found.
  vector<DeletedColumn> deleted_cols;
  for (const auto& col : table_info->schema->column_ids()) {
    if (operation->schema()->find_column_by_id(col) != Schema::kColumnNotFound) {
      continue;
    }
    deleted_cols.emplace_back(col, clock_->Now());
    LOG_WITH_PREFIX(INFO) << "Column " << col << " recorded as deleted.";
  }

  metadata_->SetSchema(*operation->schema(), operation->index_map(), deleted_cols,
                       new_version, table_info->table_id);

  if (operation->has_new_table_name()) {
    const auto& new_name = operation->new_table_name();
    metadata_->SetTableName(table_info->namespace_name, new_name);
    // Both the table-level and tablet-level metric entities carry the table and namespace names.
    const auto relabel = [&](auto& entity) {
      if (entity) {
        entity->SetAttribute("table_name", new_name);
        entity->SetAttribute("namespace_name", table_info->namespace_name);
      }
    };
    relabel(table_metrics_entity_);
    relabel(tablet_metrics_entity_);
  }

  // Clear the old index table metadata cache.
  ResetYBMetaDataCache();

  // Secondary index updates need a fresh index metadata cache and, for transactional tables, a
  // transaction manager (created only once).
  if (!operation->index_map().empty()) {
    const bool needs_txn_manager =
        table_info->schema->table_properties().is_transactional() && !transaction_manager_;
    if (needs_txn_manager) {
      transaction_manager_ = std::make_unique<client::TransactionManager>(
          client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_);
    }
    CreateNewYBMetaDataCache();
  }

  // Flush the updated schema metadata to disk.
  return metadata_->Flush();
}
1915
1916
2.55k
// Stores the operation's wal_retention_secs in the tablet metadata and flushes it. Returns
// InvalidArgument when the operation carries no wal_retention_secs.
Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperation* operation) {
  if (!operation->has_wal_retention_secs()) {
    return STATUS_SUBSTITUTE(InvalidArgument, "Invalid ChangeMetadataOperation: $0",
                             operation->ToString());
  }

  LOG_WITH_PREFIX(INFO) << "Altering metadata wal_retention_secs from "
                        << metadata_->wal_retention_secs()
                        << " to " << operation->wal_retention_secs();
  metadata_->set_wal_retention_secs(operation->wal_retention_secs());
  // Flush the updated schema metadata to disk.
  return metadata_->Flush();
}
1928
1929
namespace {
1930
1931
// Opens a libpq connection to the local postgres for `database_name` as user "postgres",
// using `postgres_auth_key` as the password. The host is the socket directory derived from
// `pgsql_proxy_bind_address`.
//
// Returns IllegalState when libpq cannot allocate a connection object or the connection fails.
Result<pgwrapper::PGConnPtr> ConnectToPostgres(
    const HostPort& pgsql_proxy_bind_address,
    const std::string& database_name,
    const uint64_t postgres_auth_key) {
  // Construct connection string.  Note that the plain password in the connection string will be
  // sent over the wire, but since it only goes over a unix-domain socket, there should be no
  // eavesdropping/tampering issues.
  const auto socket_dir = PgDeriveSocketDir(pgsql_proxy_bind_address.host());
  const auto escaped_database_name = pgwrapper::PqEscapeLiteral(database_name);
  // The real and the log-safe connection strings share this layout; only the password differs.
  const auto make_conn_str = [&](const auto& password) {
    return Format(
        "user=$0 password=$1 host=$2 port=$3 dbname=$4",
        "postgres",
        password,
        socket_dir,
        pgsql_proxy_bind_address.port(),
        escaped_database_name);
  };
  const std::string conn_str = make_conn_str(postgres_auth_key);
  // Password masked, so this variant is safe to write to logs.
  const std::string conn_str_for_log = make_conn_str("<REDACTED>");
  VLOG(1) << __func__ << ": libpq connection string: " << conn_str_for_log;

  // Connect.
  pgwrapper::PGConnPtr conn(PQconnectdb(conn_str.c_str()));
  if (!conn) {
    return STATUS(IllegalState, "backfill failed to connect to DB");
  }
  if (PQstatus(conn.get()) == CONNECTION_BAD) {
    std::string msg(PQerrorMessage(conn.get()));

    // Avoid double newline (postgres adds a newline after the error message). The emptiness
    // check matters: calling back() on an empty string is undefined behavior.
    if (!msg.empty() && msg.back() == '\n') {
      msg.pop_back();
    }
    LOG(WARNING) << "libpq connection \"" << conn_str_for_log << "\" failed: " << msg;
    return STATUS_FORMAT(IllegalState, "backfill connection to DB failed: $0", msg);
  }
  return conn;
}
1971
1972
257
// Builds a PgsqlBackfillSpecPB with `batch_size` as its limit and `next_row_to_backfill` as the
// row key to start from, and returns it serialized with SerializeToString.
string GenerateSerializedBackfillSpec(size_t batch_size, const string& next_row_to_backfill) {
  PgsqlBackfillSpecPB spec;
  // The limit is only a target: postgres has its own internal paging size of 1024 (controlled
  // by --ysql_prefetch_limit), so more rows than requested may be processed unless batch_size
  // happens to be a multiple of FLAGS_ysql_prefetch_limit.
  spec.set_limit(batch_size);
  spec.set_next_row_key(next_row_to_backfill);

  std::string serialized;
  spec.SerializeToString(&serialized);
  VLOG(2) << "Generating backfill_spec " << yb::ToString(spec) << " encoded as "
          << b2a_hex(serialized) << " a string of length "
          << serialized.length();
  return serialized;
}
1987
1988
// Executes the BACKFILL INDEX statement `query_str` on `conn` and decodes its result. The result
// must be one row with one column holding a hex-encoded PgsqlBackfillSpecPB.
//
// Returns IllegalState if the query could not be sent, did not return tuples, returned a result
// of unexpected shape, or the returned spec could not be parsed.
Result<PgsqlBackfillSpecPB> QueryPostgresToDoBackfill(
    const pgwrapper::PGConnPtr& conn, const string& query_str) {
  // Avoid double newline (postgres adds a newline after the error message). Guarded against
  // empty messages, where back() would be undefined behavior.
  const auto trim_trailing_newline = [](std::string msg) {
    if (!msg.empty() && msg.back() == '\n') {
      msg.pop_back();
    }
    return msg;
  };

  // Execute.
  pgwrapper::PGResultPtr res(PQexec(conn.get(), query_str.c_str()));
  if (!res) {
    const std::string msg = trim_trailing_newline(PQerrorMessage(conn.get()));
    LOG(WARNING) << "libpq query \"" << query_str << "\" was not sent: " << msg;
    return STATUS_FORMAT(IllegalState, "backfill query couldn't be sent: $0", msg);
  }

  ExecStatusType status = PQresultStatus(res.get());
  // TODO(jason): more properly handle bad statuses
  if (status != PGRES_TUPLES_OK) {
    const std::string msg = trim_trailing_newline(PQresultErrorMessage(res.get()));
    LOG(WARNING) << "libpq query \"" << query_str << "\" returned " << PQresStatus(status) << ": "
                 << msg;
    return STATUS(IllegalState, msg);
  }

  // Report an unexpected result shape to the caller instead of crashing the process.
  const int num_tuples = PQntuples(res.get());
  const int num_fields = PQnfields(res.get());
  if (num_tuples != 1 || num_fields != 1) {
    return STATUS_FORMAT(
        IllegalState, "backfill query returned $0 rows and $1 columns, expected 1 and 1",
        num_tuples, num_fields);
  }
  const std::string returned_spec = VERIFY_RESULT(pgwrapper::GetString(res.get(), 0, 0));
  VLOG(3) << "Got back " << returned_spec << " of length " << returned_spec.length();

  PgsqlBackfillSpecPB spec;
  if (!spec.ParseFromString(a2b_hex(returned_spec))) {
    return STATUS_FORMAT(
        IllegalState, "failed to parse backfill spec returned by postgres: $0", returned_spec);
  }
  return spec;
}
2026
2027
struct BackfillParams {
2028
  explicit BackfillParams(const CoarseTimePoint deadline)
2029
      : start_time(CoarseMonoClock::Now()),
2030
        deadline(deadline),
2031
        rate_per_sec(GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec)),
2032
2.61k
        batch_size(GetAtomicFlag(&FLAGS_backfill_index_write_batch_size)) {
2033
2.61k
    auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms);
2034
2.61k
    if (grace_margin_ms < 0) {
2035
      // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec;
2036
      // By default, we will set it to twice the minimum value + 1s.
2037
2.61k
      grace_margin_ms = (rate_per_sec > 0 ? 1000 * (1 + 2.0 * batch_size / rate_per_sec) : 1000);
2038
2.61k
      YB_LOG_EVERY_N_SECS(INFO, 10)
2039
713
          << "Using grace margin of " << grace_margin_ms << "ms, original deadline: "
2040
713
          << MonoDelta(deadline - start_time);
2041
2.61k
    }
2042
2.61k
    modified_deadline = deadline - grace_margin_ms * 1ms;
2043
2.61k
  }
2044
2045
  CoarseTimePoint start_time;
2046
  CoarseTimePoint deadline;
2047
  size_t rate_per_sec;
2048
  size_t batch_size;
2049
  CoarseTimePoint modified_deadline;
2050
};
2051
2052
// Slow down before the next batch to throttle the rate of processing.
2053
void MaybeSleepToThrottleBackfill(
2054
    const CoarseTimePoint& start_time,
2055
3.06k
    size_t number_of_rows_processed) {
2056
3.06k
  if (FLAGS_backfill_index_rate_rows_per_sec <= 0) {
2057
3.00k
    return;
2058
3.00k
  }
2059
2060
61
  auto now = CoarseMonoClock::Now();
2061
61
  auto duration_for_rows_processed = MonoDelta(now - start_time);
2062
61
  auto expected_time_for_processing_rows = MonoDelta::FromMilliseconds(
2063
61
      number_of_rows_processed * 1000 / FLAGS_backfill_index_rate_rows_per_sec);
2064
7
  DVLOG(3) << "Duration since last batch " << duration_for_rows_processed << " expected duration "
2065
7
           << expected_time_for_processing_rows << " extra time to sleep: "
2066
7
           << expected_time_for_processing_rows - duration_for_rows_processed;
2067
61
  if (duration_for_rows_processed < expected_time_for_processing_rows) {
2068
51
    SleepFor(expected_time_for_processing_rows - duration_for_rows_processed);
2069
51
  }
2070
61
}
2071
2072
bool CanProceedToBackfillMoreRows(
2073
    const BackfillParams& backfill_params,
2074
2.89k
    size_t number_of_rows_processed) {
2075
2.89k
  auto now = CoarseMonoClock::Now();
2076
2.89k
  if (now > backfill_params.modified_deadline ||
2077
2.89k
      (FLAGS_TEST_backfill_paging_size > 0 &&
2078
296
       number_of_rows_processed >= FLAGS_TEST_backfill_paging_size)) {
2079
    // We are done if we are out of time.
2080
    // Or, if for testing purposes we have a bound on the size of batches to process.
2081
96
    return false;
2082
96
  }
2083
2.79k
  return true;
2084
2.79k
}
2085
2086
bool CanProceedToBackfillMoreRows(
2087
    const BackfillParams& backfill_params,
2088
    const string& backfilled_until,
2089
256
    size_t number_of_rows_processed) {
2090
256
  if (backfilled_until.empty()) {
2091
    // The backfill is done for this tablet. No need to do another batch.
2092
234
    return false;
2093
234
  }
2094
2095
22
  return CanProceedToBackfillMoreRows(backfill_params, number_of_rows_processed);
2096
22
}
2097
2098
}  // namespace
2099
2100
// Assume that we are already in the Backfilling mode.
2101
Status Tablet::BackfillIndexesForYsql(
2102
    const std::vector<IndexInfo>& indexes,
2103
    const std::string& backfill_from,
2104
    const CoarseTimePoint deadline,
2105
    const HybridTime read_time,
2106
    const HostPort& pgsql_proxy_bind_address,
2107
    const std::string& database_name,
2108
    const uint64_t postgres_auth_key,
2109
    size_t* number_of_rows_processed,
2110
235
    std::string* backfilled_until) {
2111
235
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) {
2112
0
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms);
2113
0
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms));
2114
0
  }
2115
235
  LOG(INFO) << "Begin " << __func__ << " at " << read_time << " from "
2116
235
            << (backfill_from.empty() ? "<start-of-the-tablet>" : strings::b2a_hex(backfill_from))
2117
235
            << " for " << AsString(indexes);
2118
235
  *backfilled_until = backfill_from;
2119
235
  pgwrapper::PGConnPtr conn =
2120
235
      VERIFY_RESULT(ConnectToPostgres(pgsql_proxy_bind_address, database_name, postgres_auth_key));
2121
2122
  // Construct query string.
2123
235
  std::string index_oids;
2124
235
  {
2125
235
    std::stringstream ss;
2126
235
    for (auto& index : indexes) {
2127
      // Cannot use Oid type because for large OID such as 2147500041, it overflows Postgres
2128
      // lexer <ival> type. Use int to output as -2147467255 that is accepted by <ival>.
2129
235
      int index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id()));
2130
235
      ss << index_oid << ",";
2131
235
    }
2132
235
    index_oids = ss.str();
2133
235
    index_oids.pop_back();
2134
235
  }
2135
235
  std::string partition_key = metadata_->partition()->partition_key_start();
2136
2137
235
  BackfillParams backfill_params(deadline);
2138
235
  *number_of_rows_processed = 0;
2139
257
  do {
2140
257
    std::string serialized_backfill_spec =
2141
257
        GenerateSerializedBackfillSpec(backfill_params.batch_size, *backfilled_until);
2142
2143
    // This should be safe from injection attacks because the parameters only consist of characters
2144
    // [-,0-9a-f].
2145
257
    std::string query_str = Format(
2146
257
        "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';",
2147
257
        index_oids,
2148
257
        b2a_hex(serialized_backfill_spec),
2149
257
        read_time.ToUint64(),
2150
257
        b2a_hex(partition_key));
2151
0
    VLOG(1) << __func__ << ": libpq query string: " << query_str;
2152
2153
256
    PgsqlBackfillSpecPB spec = VERIFY_RESULT(QueryPostgresToDoBackfill(conn, query_str));
2154
256
    *number_of_rows_processed += spec.count();
2155
256
    *backfilled_until = spec.next_row_key();
2156
2157
0
    VLOG(2) << "Backfilled " << *number_of_rows_processed << " rows. "
2158
0
            << "Setting backfilled_until to " << b2a_hex(*backfilled_until) << " of length "
2159
0
            << backfilled_until->length();
2160
2161
256
    MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
2162
256
  } while (CanProceedToBackfillMoreRows(
2163
256
      backfill_params, *backfilled_until, *number_of_rows_processed));
2164
2165
0
  VLOG(1) << "Backfilled " << *number_of_rows_processed << " rows. "
2166
0
          << "Set backfilled_until to "
2167
0
          << (backfilled_until->empty() ? "(empty)" : b2a_hex(*backfilled_until));
2168
234
  return Status::OK();
2169
235
}
2170
2171
std::vector<yb::ColumnSchema> Tablet::GetColumnSchemasForIndex(
2172
2.38k
    const std::vector<IndexInfo>& indexes) {
2173
2.38k
  std::unordered_set<yb::ColumnId> col_ids_set;
2174
2.38k
  std::vector<yb::ColumnSchema> columns;
2175
2176
12.2k
  for (auto idx : schema()->column_ids()) {
2177
12.2k
    if (schema()->is_key_column(idx)) {
2178
6.54k
      col_ids_set.insert(idx);
2179
6.54k
      auto res = schema()->column_by_id(idx);
2180
6.54k
      if (res) {
2181
6.53k
        columns.push_back(*res);
2182
10
      } else {
2183
10
        LOG(DFATAL) << "Unexpected: cannot find the column in the main table for "
2184
10
                    << idx;
2185
10
      }
2186
6.54k
    }
2187
12.2k
  }
2188
2.42k
  for (const IndexInfo& idx : indexes) {
2189
9.73k
    for (const auto& idx_col : idx.columns()) {
2190
9.73k
      if (col_ids_set.find(idx_col.indexed_column_id) == col_ids_set.end()) {
2191
3.15k
        col_ids_set.insert(idx_col.indexed_column_id);
2192
3.15k
        auto res = schema()->column_by_id(idx_col.indexed_column_id);
2193
3.16k
        if (res) {
2194
3.16k
          columns.push_back(*res);
2195
18.4E
        } else {
2196
18.4E
          LOG(DFATAL) << "Unexpected: cannot find the column in the main table for "
2197
18.4E
                      << idx_col.indexed_column_id;
2198
18.4E
        }
2199
3.15k
      }
2200
9.73k
    }
2201
2.42k
    if (idx.where_predicate_spec()) {
2202
1.32k
      for (const auto col_in_pred : idx.where_predicate_spec()->column_ids()) {
2203
1.32k
        ColumnId col_id_in_pred(col_in_pred);
2204
1.32k
        if (col_ids_set.find(col_id_in_pred) == col_ids_set.end()) {
2205
152
          col_ids_set.insert(col_id_in_pred);
2206
152
          auto res = schema()->column_by_id(col_id_in_pred);
2207
153
          if (res) {
2208
153
            columns.push_back(*res);
2209
18.4E
          } else {
2210
18.4E
            LOG(DFATAL) << "Unexpected: cannot find the column in the main table for " <<
2211
18.4E
              col_id_in_pred;
2212
18.4E
          }
2213
152
        }
2214
1.32k
      }
2215
1.09k
    }
2216
2.42k
  }
2217
2.38k
  return columns;
2218
2.38k
}
2219
2220
namespace {
2221
2222
2.38k
std::vector<TableId> GetIndexIds(const std::vector<IndexInfo>& indexes) {
2223
2.38k
  std::vector<TableId> index_ids;
2224
2.40k
  for (const IndexInfo& idx : indexes) {
2225
2.40k
    index_ids.push_back(idx.table_id());
2226
2.40k
  }
2227
2.38k
  return index_ids;
2228
2.38k
}
2229
2230
template <typename SomeVector>
2231
void SleepToThrottleRate(
2232
0
    SomeVector* index_requests, int32 row_access_rate_per_sec, CoarseTimePoint* last_flushed_at) {
2233
0
  auto now = CoarseMonoClock::Now();
2234
0
  if (row_access_rate_per_sec > 0) {
2235
0
    auto duration_since_last_batch = MonoDelta(now - *last_flushed_at);
2236
0
    auto expected_duration_ms =
2237
0
        MonoDelta::FromMilliseconds(index_requests->size() * 1000 / row_access_rate_per_sec);
2238
0
    DVLOG(3) << "Duration since last batch " << duration_since_last_batch << " expected duration "
2239
0
             << expected_duration_ms
2240
0
             << " extra time so sleep: " << expected_duration_ms - duration_since_last_batch;
2241
0
    if (duration_since_last_batch < expected_duration_ms) {
2242
0
      SleepFor(expected_duration_ms - duration_since_last_batch);
2243
0
    }
2244
0
  }
2245
0
}
2246
2247
// Looks up `table_id` through `metadata_cache` and returns the table, or the cache's error.
Result<client::YBTablePtr> GetTable(
    const TableId& table_id, const std::shared_ptr<client::YBMetaDataCache>& metadata_cache) {
  // TODO create async version of GetTable.
  // A synchronous call is acceptable here: the lookup goes through the cache and should not take
  // too long.
  client::YBTablePtr table;
  bool cache_used = false;  // Required out-parameter; its value is not used.
  RETURN_NOT_OK(metadata_cache->GetTable(table_id, &table, &cache_used));
  return table;
}
2256
2257
}  // namespace
2258
2259
// Should backfill the index with the information contained in this tablet.
2260
// Assume that we are already in the Backfilling mode.
2261
Status Tablet::BackfillIndexes(
2262
    const std::vector<IndexInfo>& indexes,
2263
    const std::string& backfill_from,
2264
    const CoarseTimePoint deadline,
2265
    const HybridTime read_time,
2266
    size_t* number_of_rows_processed,
2267
    std::string* backfilled_until,
2268
2.38k
    std::unordered_set<TableId>* failed_indexes) {
2269
2.38k
  TRACE(__func__);
2270
2.38k
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) {
2271
155
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms);
2272
155
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms));
2273
155
  }
2274
4
  VLOG(2) << "Begin BackfillIndexes at " << read_time << " for " << AsString(indexes);
2275
2276
2.38k
  std::vector<TableId> index_ids = GetIndexIds(indexes);
2277
2.38k
  std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes);
2278
2279
2.38k
  Schema projection(columns, {}, schema()->num_key_columns());
2280
2.38k
  auto iter = VERIFY_RESULT(NewRowIterator(
2281
2.38k
      projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline));
2282
2.38k
  QLTableRow row;
2283
2.38k
  std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>> index_requests;
2284
2285
2.38k
  BackfillParams backfill_params{deadline};
2286
2.38k
  constexpr auto kProgressInterval = 1000;
2287
2288
2.38k
  if (!backfill_from.empty()) {
2289
0
    VLOG(1) << "Resuming backfill from " << b2a_hex(backfill_from);
2290
97
    *backfilled_until = backfill_from;
2291
97
    RETURN_NOT_OK(iter->SeekTuple(Slice(backfill_from)));
2292
97
  }
2293
2294
2.38k
  string resume_backfill_from;
2295
2.38k
  *number_of_rows_processed = 0;
2296
2.38k
  int TEST_number_rows_corrupted = 0;
2297
2.38k
  int TEST_number_rows_dropped = 0;
2298
2299
2.86k
  while (VERIFY_RESULT(iter->HasNext())) {
2300
2.86k
    if (index_requests.empty()) {
2301
427
      *backfilled_until = VERIFY_RESULT(iter->GetTupleId()).ToBuffer();
2302
427
      MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
2303
427
    }
2304
2305
2.86k
    if (!CanProceedToBackfillMoreRows(backfill_params, *number_of_rows_processed)) {
2306
96
      resume_backfill_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer();
2307
96
      break;
2308
2.77k
    }
2309
2310
2.77k
    RETURN_NOT_OK(iter->NextRow(&row));
2311
2.77k
    if (FLAGS_TEST_backfill_sabotage_frequency > 0 &&
2312
0
        *number_of_rows_processed % FLAGS_TEST_backfill_sabotage_frequency == 0) {
2313
0
      VLOG(1) << "Corrupting fetched row: " << row.ToString();
2314
      // Corrupt first key column, since index should not be built on primary key
2315
0
      row.MarkTombstoned(schema()->column_id(0));
2316
0
      TEST_number_rows_corrupted++;
2317
0
    }
2318
2319
2.77k
    if (FLAGS_TEST_backfill_drop_frequency > 0 &&
2320
0
        *number_of_rows_processed % FLAGS_TEST_backfill_drop_frequency == 0) {
2321
0
      (*number_of_rows_processed)++;
2322
0
      VLOG(1) << "Dropping fetched row: " << row.ToString();
2323
0
      TEST_number_rows_dropped++;
2324
0
      continue;
2325
0
    }
2326
2327
0
    DVLOG(2) << "Building index for fetched row: " << row.ToString();
2328
2.77k
    RETURN_NOT_OK(UpdateIndexInBatches(
2329
2.77k
        row, indexes, read_time, backfill_params.deadline, &index_requests,
2330
2.77k
        failed_indexes));
2331
2332
2.77k
    if (++(*number_of_rows_processed) % kProgressInterval == 0) {
2333
0
      VLOG(1) << "Processed " << *number_of_rows_processed << " rows";
2334
0
    }
2335
2.77k
  }
2336
2337
2.38k
  if (FLAGS_TEST_backfill_sabotage_frequency > 0) {
2338
0
    LOG(INFO) << "In total, " << TEST_number_rows_corrupted
2339
0
              << " rows were corrupted in index backfill.";
2340
0
  }
2341
2342
2.38k
  if (FLAGS_TEST_backfill_drop_frequency > 0) {
2343
0
    LOG(INFO) << "In total, " << TEST_number_rows_dropped
2344
0
              << " rows were dropped in index backfill.";
2345
0
  }
2346
2347
5
  VLOG(1) << "Processed " << *number_of_rows_processed << " rows";
2348
2.38k
  RETURN_NOT_OK(FlushWriteIndexBatch(
2349
2.38k
      read_time, backfill_params.deadline, &index_requests, failed_indexes));
2350
2.38k
  MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
2351
2.38k
  *backfilled_until = resume_backfill_from;
2352
2.38k
  LOG(INFO) << "Done BackfillIndexes at " << read_time << " for " << AsString(index_ids)
2353
2.38k
            << " until "
2354
2.28k
            << (backfilled_until->empty() ? "<end of the tablet>" : b2a_hex(*backfilled_until));
2355
2.38k
  return Status::OK();
2356
2.38k
}
2357
2358
// For every index in `indexes`, asks docdb::CreateAndSetupIndexInsertRequest to add an insert
// request for `row` to `index_requests`, and marks each returned request as a backfill write.
// Then flushes the accumulated requests if the batch has reached
// --backfill_index_write_batch_size.
Status Tablet::UpdateIndexInBatches(
    const QLTableRow& row,
    const std::vector<IndexInfo>& indexes,
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  // Passed as the "existing row" argument of CreateAndSetupIndexInsertRequest.
  const QLTableRow& no_existing_row = QLTableRow::empty_row();
  QLExprExecutor executor;

  for (const IndexInfo& index : indexes) {
    QLWriteRequestPB* const request = VERIFY_RESULT(docdb::CreateAndSetupIndexInsertRequest(
        &executor, /* index_has_write_permission */ true,
        no_existing_row, row, &index, index_requests));
    // No request may be produced for this index (e.g. when the row does not qualify).
    if (request != nullptr) {
      request->set_is_backfill(true);
    }
  }

  // Update the index write op.
  return FlushWriteIndexBatchIfRequired(write_time, deadline, index_requests, failed_indexes);
}
2380
2381
// Creates a new YBSession on this tablet's client with `deadline` set on it.
// Returns IllegalState when the client future has not been set up.
Result<std::shared_ptr<YBSession>> Tablet::GetSessionForVerifyOrBackfill(
    const CoarseTimePoint deadline) {
  if (!client_future_.valid()) {
    return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id());
  }

  auto new_session = std::make_shared<YBSession>(client_future_.get());
  new_session->SetDeadline(deadline);
  return new_session;
}
2392
2393
// Flushes `index_requests` via FlushWriteIndexBatch once it holds at least
// --backfill_index_write_batch_size requests; otherwise keeps accumulating and returns OK.
Status Tablet::FlushWriteIndexBatchIfRequired(
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  const bool batch_is_full = index_requests->size() >= FLAGS_backfill_index_write_batch_size;
  if (batch_is_full) {
    return FlushWriteIndexBatch(write_time, deadline, index_requests, failed_indexes);
  }
  return Status::OK();
}
2403
2404
// Turns every pending request in `index_requests` into a write op on its index table, with
// `write_time` as the backfill write time. The ops are flushed through a fresh session bounded
// by `deadline` (via FlushWithRetries), and `index_requests` is cleared on success.
//
// For unique indexes, ops are tracked by index primary key. When a new op collides with one
// already applied in the current sub-batch, the ops applied so far are flushed first, so the
// colliding writes go out in separate flushes.
//
// Returns IllegalState when the client future or the metadata cache is not available.
Status Tablet::FlushWriteIndexBatch(
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  if (!client_future_.valid()) {
    return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id());
  } else if (!YBMetaDataCache()) {
    return STATUS(IllegalState, "Table metadata cache is not present for index update");
  }
  std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline));

  std::unordered_set<
      client::YBqlWriteOpPtr, client::YBqlWritePrimaryKeyComparator,
      client::YBqlWritePrimaryKeyComparator>
      ops_by_primary_key;
  // Ops applied to `session` since its last flush.
  std::vector<shared_ptr<client::YBqlWriteOp>> write_ops;
  write_ops.reserve(index_requests->size());

  constexpr int kMaxNumRetries = 10;
  auto metadata_cache = YBMetaDataCache();

  for (auto& pair : *index_requests) {
    client::YBTablePtr index_table =
        VERIFY_RESULT(GetTable(pair.first->table_id(), metadata_cache));

    shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite());
    index_op->set_write_time_for_backfill(write_time);
    index_op->mutable_request()->Swap(&pair.second);
    if (index_table->IsUniqueIndex()) {
      if (ops_by_primary_key.count(index_op) > 0) {
        VLOG(2) << "Splitting the batch of writes because " << index_op->ToString()
                << " collides with an existing update in this batch.";
        VLOG(1) << "Flushing " << write_ops.size() << " ops to the index";
        RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes));
        VLOG(3) << "Done flushing ops to the index";
        // These ops have been flushed and their responses checked. Drop them so later flushes
        // do not re-examine them; previously every subsequent flush re-scanned all earlier ops.
        write_ops.clear();
        ops_by_primary_key.clear();
      }
      ops_by_primary_key.insert(index_op);
    }
    session->Apply(index_op);
    write_ops.push_back(index_op);
  }

  VLOG(1) << Format("Flushing $0 ops to the index", write_ops.size());
  RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes));
  index_requests->clear();

  return Status::OK();
}
2455
2456
template <typename SomeYBqlOp>
2457
Status Tablet::FlushWithRetries(
2458
    shared_ptr<YBSession> session,
2459
    const std::vector<shared_ptr<SomeYBqlOp>>& index_ops,
2460
    int num_retries,
2461
2.48k
    std::unordered_set<TableId>* failed_indexes) {
2462
2.48k
  auto retries_left = num_retries;
2463
2.48k
  std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops;
2464
2.48k
  std::unordered_map<string, int32_t> error_msg_cnts;
2465
2.48k
  do {
2466
2.48k
    std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops;
2467
2.48k
    RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed.");
2468
18.4E
    VLOG(3) << "Done flushing ops to the index";
2469
9.55k
    for (auto index_op : pending_ops) {
2470
9.55k
      if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) {
2471
9.54k
        continue;
2472
9.54k
      }
2473
2474
0
      VLOG(2) << "Got response " << AsString(index_op->response()) << " for "
2475
0
              << AsString(index_op->request());
2476
5
      if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
2477
5
        failed_indexes->insert(index_op->table()->id());
2478
5
        const string& error_message = index_op->response().error_message();
2479
5
        error_msg_cnts[error_message]++;
2480
0
        VLOG_WITH_PREFIX(3) << "Failing index " << index_op->table()->id()
2481
0
                            << " due to non-retryable errors " << error_message;
2482
5
        continue;
2483
5
      }
2484
2485
0
      failed_ops.push_back(index_op);
2486
0
      session->Apply(index_op);
2487
0
    }
2488
2489
2.48k
    if (!failed_ops.empty()) {
2490
0
      VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size());
2491
0
    }
2492
2.48k
    pending_ops = std::move(failed_ops);
2493
2.48k
  } while (!pending_ops.empty() && --retries_left > 0);
2494
2495
2.48k
  if (!failed_indexes->empty()) {
2496
0
    VLOG_WITH_PREFIX(1) << "Failed due to non-retryable errors " << AsString(*failed_indexes);
2497
5
  }
2498
2.48k
  if (!pending_ops.empty()) {
2499
0
    for (auto index_op : pending_ops) {
2500
0
      failed_indexes->insert(index_op->table()->id());
2501
0
      const string& error_message = index_op->response().error_message();
2502
0
      error_msg_cnts[error_message]++;
2503
0
    }
2504
0
    VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are "
2505
0
                        << AsString(*failed_indexes);
2506
0
  }
2507
2.48k
  return (
2508
2.48k
      failed_indexes->empty()
2509
2.49k
          ? Status::OK()
2510
18.4E
          : STATUS_SUBSTITUTE(
2511
2.48k
                IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2",
2512
2.48k
                pending_ops.size(), num_retries, AsString(error_msg_cnts)));
2513
2.48k
}
_ZN2yb6tablet6Tablet16FlushWithRetriesINS_6client11YBqlWriteOpEEENS_6StatusENSt3__110shared_ptrINS3_9YBSessionEEERKNS6_6vectorINS7_IT_EENS6_9allocatorISC_EEEEiPNS6_13unordered_setINS6_12basic_stringIcNS6_11char_traitsIcEENSD_IcEEEENS6_4hashISN_EENS6_8equal_toISN_EENSD_ISN_EEEE
Line
Count
Source
2461
2.48k
    std::unordered_set<TableId>* failed_indexes) {
2462
2.48k
  auto retries_left = num_retries;
2463
2.48k
  std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops;
2464
2.48k
  std::unordered_map<string, int32_t> error_msg_cnts;
2465
2.48k
  do {
2466
2.48k
    std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops;
2467
2.48k
    RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed.");
2468
18.4E
    VLOG(3) << "Done flushing ops to the index";
2469
9.55k
    for (auto index_op : pending_ops) {
2470
9.55k
      if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) {
2471
9.54k
        continue;
2472
9.54k
      }
2473
2474
0
      VLOG(2) << "Got response " << AsString(index_op->response()) << " for "
2475
0
              << AsString(index_op->request());
2476
5
      if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
2477
5
        failed_indexes->insert(index_op->table()->id());
2478
5
        const string& error_message = index_op->response().error_message();
2479
5
        error_msg_cnts[error_message]++;
2480
0
        VLOG_WITH_PREFIX(3) << "Failing index " << index_op->table()->id()
2481
0
                            << " due to non-retryable errors " << error_message;
2482
5
        continue;
2483
5
      }
2484
2485
0
      failed_ops.push_back(index_op);
2486
0
      session->Apply(index_op);
2487
0
    }
2488
2489
2.48k
    if (!failed_ops.empty()) {
2490
0
      VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size());
2491
0
    }
2492
2.48k
    pending_ops = std::move(failed_ops);
2493
2.48k
  } while (!pending_ops.empty() && --retries_left > 0);
2494
2495
2.48k
  if (!failed_indexes->empty()) {
2496
0
    VLOG_WITH_PREFIX(1) << "Failed due to non-retryable errors " << AsString(*failed_indexes);
2497
5
  }
2498
2.48k
  if (!pending_ops.empty()) {
2499
0
    for (auto index_op : pending_ops) {
2500
0
      failed_indexes->insert(index_op->table()->id());
2501
0
      const string& error_message = index_op->response().error_message();
2502
0
      error_msg_cnts[error_message]++;
2503
0
    }
2504
0
    VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are "
2505
0
                        << AsString(*failed_indexes);
2506
0
  }
2507
2.48k
  return (
2508
2.48k
      failed_indexes->empty()
2509
2.49k
          ? Status::OK()
2510
18.4E
          : STATUS_SUBSTITUTE(
2511
2.48k
                IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2",
2512
2.48k
                pending_ops.size(), num_retries, AsString(error_msg_cnts)));
2513
2.48k
}
Unexecuted instantiation: _ZN2yb6tablet6Tablet16FlushWithRetriesINS_6client10YBqlReadOpEEENS_6StatusENSt3__110shared_ptrINS3_9YBSessionEEERKNS6_6vectorINS7_IT_EENS6_9allocatorISC_EEEEiPNS6_13unordered_setINS6_12basic_stringIcNS6_11char_traitsIcEENSD_IcEEEENS6_4hashISN_EENS6_8equal_toISN_EENSD_ISN_EEEE
2514
2515
// Runs VerifyTableConsistencyForCQL over the tables of `indexes` (is_main_table = false).
// The projection is the set of main-table columns those indexes need, as computed by
// GetColumnSchemasForIndex.
Status Tablet::VerifyIndexTableConsistencyForCQL(
    const std::vector<IndexInfo>& indexes,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  const std::vector<TableId> index_table_ids = GetIndexIds(indexes);
  const std::vector<yb::ColumnSchema> needed_columns = GetColumnSchemasForIndex(indexes);
  return VerifyTableConsistencyForCQL(
      index_table_ids, needed_columns, start_key, num_rows, deadline, read_time,
      /* is_main_table= */ false, consistency_stats, verified_until);
}
2529
2530
// Runs VerifyTableConsistencyForCQL for the single table `main_table_id`, projecting every
// column of this tablet's schema (is_main_table = true).
Status Tablet::VerifyMainTableConsistencyForCQL(
    const TableId& main_table_id,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  // Hold the schema in a local for the whole call. Binding `columns` straight to
  // `schema()->columns()` would reference into an object reached through a temporary; if
  // schema() returns an owning pointer, nothing here would keep that Schema alive.
  const auto main_schema = schema();
  const std::vector<yb::ColumnSchema>& columns = main_schema->columns();
  const std::vector<TableId> table_ids = {main_table_id};
  return VerifyTableConsistencyForCQL(
      table_ids, columns, start_key, num_rows, deadline, read_time, /* is_main_table= */ true,
      consistency_stats, verified_until);
}
2544
2545
// Scans this tablet at `read_time`, projecting `columns` and starting at `start_key` (or at the
// beginning when it is empty). Each row is passed to VerifyTableInBatches. The scan stops after
// `num_rows` rows, at `deadline`, or at the end of the tablet, whichever comes first.
// `*verified_until` is set to the key of the last row handed to VerifyTableInBatches. The final
// partial batch is flushed with FlushVerifyBatch, whose status is returned.
Status Tablet::VerifyTableConsistencyForCQL(
    const std::vector<TableId>& table_ids,
    const std::vector<yb::ColumnSchema>& columns,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    const bool is_main_table,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  Schema projection(columns, {}, schema()->num_key_columns());
  auto row_iter = VERIFY_RESULT(NewRowIterator(
      projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline));

  if (!start_key.empty()) {
    VLOG(2) << "Starting verify index from " << b2a_hex(start_key);
    RETURN_NOT_OK(row_iter->SeekTuple(Slice(start_key)));
  }

  constexpr int kProgressInterval = 1000;
  CoarseTimePoint last_flushed_at;

  QLTableRow current_row;
  std::vector<std::pair<const TableId, QLReadRequestPB>> read_requests;
  std::unordered_set<TableId> failed_tables;

  int rows_verified = 0;
  while (VERIFY_RESULT(row_iter->HasNext()) && rows_verified < num_rows &&
         CoarseMonoClock::Now() < deadline) {
    // Key of the row about to be verified; published to *verified_until once it is handled.
    const std::string row_key = VERIFY_RESULT(row_iter->GetTupleId()).ToBuffer();
    RETURN_NOT_OK(row_iter->NextRow(&current_row));
    VLOG(1) << "Verifying index for main table row: " << current_row.ToString();

    RETURN_NOT_OK(VerifyTableInBatches(
        current_row, table_ids, read_time, deadline, is_main_table, &read_requests,
        &last_flushed_at, &failed_tables, consistency_stats));
    ++rows_verified;
    if (rows_verified % kProgressInterval == 0) {
      VLOG(1) << "Verified " << rows_verified << " rows";
    }
    *verified_until = row_key;
  }
  return FlushVerifyBatch(
      read_time, deadline, &read_requests, &last_flushed_at, &failed_tables, consistency_stats);
}
2590
2591
namespace {
2592
2593
0
QLConditionPB* InitWhereOp(QLReadRequestPB* req) {
2594
  // Add the hash column values
2595
0
  DCHECK(req->hashed_column_values().empty());
2596
2597
  // Add the range column values to the where clause
2598
0
  QLConditionPB* where_pb = req->mutable_where_expr()->mutable_condition();
2599
0
  if (!where_pb->has_op()) {
2600
0
    where_pb->set_op(QL_OP_AND);
2601
0
  }
2602
0
  DCHECK_EQ(where_pb->op(), QL_OP_AND);
2603
0
  return where_pb;
2604
0
}
2605
2606
0
void SetSelectedExprToTrue(QLReadRequestPB* req) {
2607
  // Set TRUE as selected exprs helps reduce
2608
  // the need for row retrieval in the index read request
2609
0
  req->add_selected_exprs()->mutable_value()->set_bool_value(true);
2610
0
  QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc();
2611
0
  QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs();
2612
0
  rscol_desc->set_name("1");
2613
0
  rscol_desc->mutable_ql_type()->set_main(yb::DataType::BOOL);
2614
0
}
2615
2616
Status WhereMainTableToPB(
2617
    const QLTableRow& key,
2618
    const IndexInfo& index_info,
2619
    const Schema& main_table_schema,
2620
0
    QLReadRequestPB* req) {
2621
0
  std::unordered_map<ColumnId, ColumnId> column_id_map;
2622
0
  for (const auto& col : index_info.columns()) {
2623
0
    column_id_map.insert({col.indexed_column_id, col.column_id});
2624
0
  }
2625
2626
0
  auto column_refs = req->mutable_column_refs();
2627
0
  QLConditionPB* where_pb = InitWhereOp(req);
2628
2629
0
  for (const auto& col_id : main_table_schema.column_ids()) {
2630
0
    if (main_table_schema.is_hash_key_column(col_id)) {
2631
0
      *req->add_hashed_column_values()->mutable_value() = *key.GetValue(column_id_map[col_id]);
2632
0
      column_refs->add_ids(col_id);
2633
0
    } else {
2634
0
      auto it = column_id_map.find(col_id);
2635
0
      if (it != column_id_map.end()) {
2636
0
        QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition();
2637
0
        col_cond_pb->set_op(QL_OP_EQUAL);
2638
0
        col_cond_pb->add_operands()->set_column_id(col_id);
2639
0
        *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(it->second);
2640
0
        column_refs->add_ids(col_id);
2641
0
      }
2642
0
    }
2643
0
  }
2644
2645
0
  SetSelectedExprToTrue(req);
2646
0
  return Status::OK();
2647
0
}
2648
2649
// Schema is index schema while key is row from main table
2650
Status WhereIndexToPB(
2651
    const QLTableRow& key,
2652
    const IndexInfo& index_info,
2653
    const Schema& schema,
2654
0
    QLReadRequestPB* req) {
2655
0
  QLConditionPB* where_pb = InitWhereOp(req);
2656
0
  auto column_refs = req->mutable_column_refs();
2657
2658
0
  for (size_t idx = 0; idx < index_info.columns().size(); idx++) {
2659
0
    const ColumnId& column_id = index_info.column(idx).column_id;
2660
0
    const ColumnId& indexed_column_id = index_info.column(idx).indexed_column_id;
2661
0
    if (schema.is_hash_key_column(column_id)) {
2662
0
      *req->add_hashed_column_values()->mutable_value() = *key.GetValue(indexed_column_id);
2663
0
    } else {
2664
0
      QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition();
2665
0
      col_cond_pb->set_op(QL_OP_EQUAL);
2666
0
      col_cond_pb->add_operands()->set_column_id(column_id);
2667
0
      *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(indexed_column_id);
2668
0
    }
2669
0
    column_refs->add_ids(column_id);
2670
0
  }
2671
2672
0
  SetSelectedExprToTrue(req);
2673
0
  return Status::OK();
2674
0
}
2675
2676
}  // namespace
2677
2678
// Queues one point-read per table in `table_ids` that looks up the counterpart of `row`.
// When `is_main_table` is set, `row` comes from this tablet's table and the targets are main
// tables, with lookups built from this table's own index info. Otherwise the targets are index
// tables, with lookups built from each target's index info. The queued batch is flushed once it
// reaches the configured size.
Status Tablet::VerifyTableInBatches(
    const QLTableRow& row,
    const std::vector<TableId>& table_ids,
    const HybridTime read_time,
    const CoarseTimePoint deadline,
    const bool is_main_table,
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
    CoarseTimePoint* last_flushed_at,
    std::unordered_set<TableId>* failed_indexes,
    std::unordered_map<TableId, uint64>* consistency_stats) {
  auto yb_client = client_future_.get();
  auto own_index_info = metadata_->primary_table_info()->index_info.get();

  for (const TableId& target_table_id : table_ids) {
    std::shared_ptr<client::YBTable> target_table;
    RETURN_NOT_OK(yb_client->OpenTable(target_table_id, &target_table));

    std::shared_ptr<client::YBqlReadOp> select_op(target_table->NewQLSelect());
    QLReadRequestPB* request = select_op->mutable_request();
    const auto& target_schema = target_table->InternalSchema();

    // Only the selected branch is evaluated, so own_index_info is dereferenced only for
    // main-table lookups, as before.
    RETURN_NOT_OK(is_main_table
        ? WhereMainTableToPB(row, *own_index_info, target_schema, request)
        : WhereIndexToPB(row, target_table->index_info(), target_schema, request));

    requests->emplace_back(target_table_id, *request);
  }

  return FlushVerifyBatchIfRequired(
      read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats);
}
2708
2709
// Flushes the pending verification requests once at least FLAGS_verify_index_read_batch_size of
// them have accumulated. Otherwise leaves them queued and returns OK.
Status Tablet::FlushVerifyBatchIfRequired(
    const HybridTime read_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
    CoarseTimePoint* last_flushed_at,
    std::unordered_set<TableId>* failed_indexes,
    std::unordered_map<TableId, uint64>* consistency_stats) {
  const bool batch_is_full = !(requests->size() < FLAGS_verify_index_read_batch_size);
  if (batch_is_full) {
    return FlushVerifyBatch(
        read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats);
  }
  return Status::OK();
}
2722
2723
// Sends every queued verification request as a read at `read_time` and waits for the results.
// For each read that did not return exactly one row, increments the count for its target table
// in `consistency_stats`. Afterwards, throttles to FLAGS_verify_index_rate_rows_per_sec, records
// the flush time in `*last_flushed_at`, and empties `requests`. The request protos are swapped
// out of `requests` into the read ops.
Status Tablet::FlushVerifyBatch(
    const HybridTime read_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
    CoarseTimePoint* last_flushed_at,
    std::unordered_set<TableId>* failed_indexes,
    std::unordered_map<TableId, uint64>* consistency_stats) {
  std::vector<client::YBqlReadOpPtr> issued_ops;
  std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline));
  auto yb_client = client_future_.get();

  for (auto& entry : *requests) {
    client::YBTablePtr target_table;
    RETURN_NOT_OK(yb_client->OpenTable(entry.first, &target_table));

    client::YBqlReadOpPtr op(target_table->NewQLRead());
    op->mutable_request()->Swap(&entry.second);
    op->SetReadTime(ReadHybridTime::SingleTime(read_time));
    session->Apply(op);

    // Ops are appended in request order so that issued_ops[i] corresponds to (*requests)[i].
    issued_ops.push_back(op);
  }

  RETURN_NOT_OK(FlushWithRetries(session, issued_ops, 0, failed_indexes));

  for (const client::YBqlReadOpPtr& op : issued_ops) {
    auto rows = op->MakeRowBlock();
    const bool found_exactly_one = rows && rows->row_count() == 1;
    if (!found_exactly_one) {
      ++(*consistency_stats)[op->table()->id()];
    }
  }

  SleepToThrottleRate(requests, FLAGS_verify_index_rate_rows_per_sec, last_flushed_at);
  *last_flushed_at = CoarseMonoClock::Now();
  requests->clear();

  return Status::OK();
}
2764
2765
// Pauses either the abortable or the non-abortable RW operation counter, waiting up to
// FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms for pending operations to drain. Logs a warning
// if this takes longer than 1000 ms.
ScopedRWOperationPause Tablet::PauseReadWriteOperations(
    const Abortable abortable, const Stop stop) {
  VTRACE(1, LogPrefix());
  auto* op_counter =
      abortable ? &pending_abortable_op_counter_ : &pending_non_abortable_op_counter_;
  LOG_SLOW_EXECUTION(WARNING, 1000,
                     Substitute("$0Waiting for pending ops to complete", LogPrefix())) {
    const auto wait_deadline =
        CoarseMonoClock::Now() +
        MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms);
    return ScopedRWOperationPause(op_counter, wait_deadline, stop);
  }
  FATAL_ERROR("Unreachable code -- the previous block must always return");
}
2778
2779
90
// Registers a new abortable read/write operation, bounded by `deadline`, on
// pending_abortable_op_counter_. Callers check the result's status before using the DBs.
ScopedRWOperation Tablet::CreateAbortableScopedRWOperation(const CoarseTimePoint deadline) const {
  auto* const counter = &pending_abortable_op_counter_;
  return ScopedRWOperation(counter, deadline);
}
2782
2783
// Registers a new non-abortable read/write operation, bounded by `deadline`, on
// pending_non_abortable_op_counter_. Callers check the result's status before using the DBs.
ScopedRWOperation Tablet::CreateNonAbortableScopedRWOperation(
    const CoarseTimePoint deadline) const {
  auto* const counter = &pending_non_abortable_op_counter_;
  return ScopedRWOperation(counter, deadline);
}
2787
2788
// Applies `frontier` to the flushed frontier of the regular RocksDB, and of the intents RocksDB
// if there is one, using `mode`. Then requests an asynchronous flush.
//
// When FLAGS_TEST_tablet_verify_flushed_frontier_after_modifying is set and `mode` is kForce,
// also checkpoints the regular DB into a test directory, reopens the checkpoint, and CHECKs that
// the persisted flushed frontier equals `frontier`. The checkpoint is deleted afterwards.
//
// Returns IllegalState if the regular DB rejects the modification.
Status Tablet::ModifyFlushedFrontier(
    const docdb::ConsensusFrontier& frontier,
    rocksdb::FrontierModificationMode mode) {
  const Status s = regular_db_->ModifyFlushedFrontier(frontier.Clone(), mode);
  if (PREDICT_FALSE(!s.ok())) {
    auto status = STATUS(IllegalState, "Failed to set flushed frontier", s.ToString());
    LOG_WITH_PREFIX(WARNING) << status;
    return status;
  }
  {
    // Read the frontier back. The DCHECKs (debug builds only) confirm the op id and hybrid
    // time were applied.
    auto flushed_frontier = regular_db_->GetFlushedFrontier();
    const auto& consensus_flushed_frontier = *down_cast<docdb::ConsensusFrontier*>(
        flushed_frontier.get());
    DCHECK_EQ(frontier.op_id(), consensus_flushed_frontier.op_id());
    DCHECK_EQ(frontier.hybrid_time(), consensus_flushed_frontier.hybrid_time());
  }

  if (FLAGS_TEST_tablet_verify_flushed_frontier_after_modifying &&
      mode == rocksdb::FrontierModificationMode::kForce) {
    LOG_WITH_PREFIX(INFO) << "Verifying that flushed frontier was force-set successfully";
    string test_data_dir = VERIFY_RESULT(Env::Default()->GetTestDirectory());
    const string checkpoint_dir_for_test = Format(
        "$0/test_checkpoint_$1_$2", test_data_dir, tablet_id(), MonoTime::Now().ToUint64());
    RETURN_NOT_OK(
        rocksdb::checkpoint::CreateCheckpoint(regular_db_.get(), checkpoint_dir_for_test));
    auto se = ScopeExit([checkpoint_dir_for_test] {
      CHECK_OK(Env::Default()->DeleteRecursively(checkpoint_dir_for_test));
    });
    rocksdb::Options rocksdb_options;
    docdb::InitRocksDBOptions(
        &rocksdb_options, LogPrefix(), /* statistics */ nullptr, tablet_options_);
    rocksdb_options.create_if_missing = false;
    LOG_WITH_PREFIX(INFO) << "Opening the test RocksDB at " << checkpoint_dir_for_test
        << ", expecting to see flushed frontier of " << frontier.ToString();
    std::unique_ptr<rocksdb::DB> test_db = VERIFY_RESULT(
        rocksdb::DB::Open(rocksdb_options, checkpoint_dir_for_test));
    LOG_WITH_PREFIX(INFO) << "Getting flushed frontier from test RocksDB at "
                          << checkpoint_dir_for_test;
    auto restored_flushed_frontier = test_db->GetFlushedFrontier();
    if (!restored_flushed_frontier) {
      // LOG_WITH_PREFIX already prepends LogPrefix(); adding it again duplicated the prefix.
      LOG_WITH_PREFIX(FATAL) << "Restored flushed frontier not present";
    }
    CHECK_EQ(
        frontier,
        down_cast<docdb::ConsensusFrontier&>(*restored_flushed_frontier));
    LOG_WITH_PREFIX(INFO) << "Successfully verified persistently stored flushed frontier: "
        << frontier.ToString();
  }

  if (intents_db_) {
    // It is OK to flush intents even if the regular DB is not yet flushed,
    // because it would wait for flush of regular DB if we have unflushed intents.
    // Otherwise it does not matter which flushed op id is stored.
    RETURN_NOT_OK(intents_db_->ModifyFlushedFrontier(frontier.Clone(), mode));
  }

  return Flush(FlushMode::kAsync);
}
2846
2847
159k
// Drops all data in this tablet by destroying its RocksDB instances and opening fresh ones.
// Then sets the new DB's flushed frontier to the truncate operation's op id and hybrid time, and
// re-enables compactions. Transaction status tablets are left untouched.
Status Tablet::Truncate(TruncateOperation* operation) {
  // The transaction status table relies only on the Raft log, so there is nothing to drop.
  if (metadata_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    return Status::OK();
  }

  auto pauses = VERIFY_RESULT(StartShutdownRocksDBs(DisableFlushOnShutdown::kTrue));

  // A concurrent tablet shutdown wins over the truncate.
  if (IsShutdownRequested()) {
    return STATUS(IllegalState, "Tablet was shut down");
  }

  const rocksdb::SequenceNumber old_sequence_number = regular_db_->GetLatestSequenceNumber();
  const std::string db_dir = regular_db_->GetName();

  auto status = CompleteShutdownRocksDBs(Destroy::kTrue, &pauses);
  if (PREDICT_FALSE(!status.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Failed to clean up db dir " << db_dir << ": " << status;
    return STATUS(IllegalState, "Failed to clean up db dir", status.ToString());
  }

  // Re-create the database in the same directory (metadata()->rocksdb_dir() is still valid).
  status = OpenKeyValueTablet();
  if (PREDICT_FALSE(!status.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Failed to create a new db: " << status;
    return status;
  }

  docdb::ConsensusFrontier truncate_frontier;
  truncate_frontier.set_op_id(operation->op_id());
  truncate_frontier.set_hybrid_time(operation->hybrid_time());
  // kUpdate rather than kForce: unlike restoring a snapshot into an arbitrary Raft group, the
  // flushed frontier of a truncated tablet can never need to move backwards.
  RETURN_NOT_OK(ModifyFlushedFrontier(
      truncate_frontier, rocksdb::FrontierModificationMode::kUpdate));

  LOG_WITH_PREFIX(INFO) << "Created new db for truncated tablet";
  LOG_WITH_PREFIX(INFO) << "Sequence numbers: old=" << old_sequence_number
                        << ", new=" << regular_db_->GetLatestSequenceNumber();

  // Referencing the pauses here keeps them alive for the whole function.
  for (auto* pause : pauses.AsArray()) {
    DFATAL_OR_RETURN_NOT_OK(pause->status());
  }
  return DoEnableCompactions();
}
2894
2895
6.07M
void Tablet::UpdateMonotonicCounter(int64_t value) {
2896
6.07M
  int64_t counter = monotonic_counter_;
2897
6.07M
  while (true) {
2898
6.07M
    if (counter >= value) {
2899
6.07M
      break;
2900
6.07M
    }
2901
120
    if (monotonic_counter_.compare_exchange_weak(counter, value)) {
2902
120
      break;
2903
120
    }
2904
72
  }
2905
6.07M
}
2906
2907
////////////////////////////////////////////////////////////
2908
// Tablet
2909
////////////////////////////////////////////////////////////
2910
2911
89.1k
// Reports whether the regular RocksDB currently has any live SST files.
// Returns false when there is no regular DB, and an error if an RW operation cannot be started.
Result<bool> Tablet::HasSSTables() const {
  if (!regular_db_) {
    return false;
  }

  auto read_op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(read_op);

  std::vector<rocksdb::LiveFileMetaData> live_files;
  regular_db_->GetLiveFilesMetaData(&live_files);
  const bool has_sst_files = !live_files.empty();
  return has_sst_files;
}
2923
2924
7.43M
// Returns the op id stored in `db`'s flushed frontier.
//  - OpId::Invalid() if `db` is null, or if `invalid_if_no_new_data` is set and the DB reports
//    FlushAbility::kNoNewData.
//  - A default-constructed OpId if the DB has no flushed frontier.
yb::OpId MaxPersistentOpIdForDb(rocksdb::DB* db, bool invalid_if_no_new_data) {
  // A possible race condition could happen, when data is written between this query and
  // actual log gc. But it is not a problem as long as we are reading committed op id
  // before MaxPersistentOpId, since we always keep last committed entry in the log during garbage
  // collection.
  // See TabletPeer::GetEarliestNeededLogIndex
  if (db == nullptr) {
    return yb::OpId::Invalid();
  }
  if (invalid_if_no_new_data && db->GetFlushAbility() == rocksdb::FlushAbility::kNoNewData) {
    return yb::OpId::Invalid();
  }

  const rocksdb::UserFrontierPtr flushed = db->GetFlushedFrontier();
  if (flushed) {
    return down_cast<docdb::ConsensusFrontier*>(flushed.get())->op_id();
  }
  return yb::OpId();
}
2943
2944
3.71M
// Returns the persisted (flushed) op ids of the regular and intents DBs, in that order.
// See MaxPersistentOpIdForDb for the meaning of `invalid_if_no_new_data`.
Result<DocDbOpIds> Tablet::MaxPersistentOpId(bool invalid_if_no_new_data) const {
  auto read_op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(read_op);

  const auto regular_op_id = MaxPersistentOpIdForDb(regular_db_.get(), invalid_if_no_new_data);
  const auto intents_op_id = MaxPersistentOpIdForDb(intents_db_.get(), invalid_if_no_new_data);
  return DocDbOpIds{regular_op_id, intents_op_id};
}
2953
2954
3.71M
void Tablet::FlushIntentsDbIfNecessary(const yb::OpId& lastest_log_entry_op_id) {
2955
3.71M
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
2956
3.71M
  if (!scoped_read_operation.ok()) {
2957
1
    return;
2958
1
  }
2959
2960
3.71M
  auto intents_frontier = intents_db_
2961
1.99M
      ? MemTableFrontierFromDb(intents_db_.get(), rocksdb::UpdateUserValueType::kLargest) : nullptr;
2962
3.71M
  if (intents_frontier) {
2963
426k
    auto index_delta =
2964
426k
        lastest_log_entry_op_id.index -
2965
426k
        down_cast<docdb::ConsensusFrontier*>(intents_frontier.get())->op_id().index;
2966
426k
    if (index_delta > FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush) {
2967
2
      auto intents_flush_ability = intents_db_->GetFlushAbility();
2968
2
      if (intents_flush_ability == rocksdb::FlushAbility::kHasNewData) {
2969
2
        LOG_WITH_PREFIX(INFO)
2970
2
            << "Force flushing intents DB since it was not flushed for " << index_delta
2971
2
            << " operations, while only "
2972
2
            << FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush << " is allowed";
2973
2
        rocksdb::FlushOptions options;
2974
2
        options.wait = false;
2975
2
        WARN_NOT_OK(intents_db_->Flush(options), "Flush intents db failed");
2976
2
      }
2977
2
    }
2978
426k
  }
2979
3.71M
}
2980
2981
8.22M
bool Tablet::IsTransactionalRequest(bool is_ysql_request) const {
2982
  // We consider all YSQL tables within the sys catalog transactional.
2983
8.22M
  return txns_enabled_ && (
2984
8.22M
      schema()->table_properties().is_transactional() ||
2985
7.78M
          (is_sys_catalog_ && is_ysql_request));
2986
8.22M
}
2987
2988
1.73k
// Returns the largest hybrid time found in the flushed frontiers of the regular and intents DBs.
// Returns HybridTime::kMin if there is no regular DB or no flushed frontier.
Result<HybridTime> Tablet::MaxPersistentHybridTime() const {
  auto read_op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(read_op);

  if (!regular_db_) {
    return HybridTime::kMin;
  }

  HybridTime max_persisted = HybridTime::kMin;
  for (auto* db : { regular_db_.get(), intents_db_.get() }) {
    if (!db) {
      continue;
    }
    auto flushed = db->GetFlushedFrontier();
    if (!flushed) {
      continue;
    }
    max_persisted.MakeAtLeast(
        down_cast<docdb::ConsensusFrontier*>(flushed.get())->hybrid_time());
  }
  return max_persisted;
}
3009
3010
2.53k
// Returns the smallest memtable-frontier hybrid time across the regular and intents DBs.
// Returns HybridTime::kMax when neither DB exists or has a memtable frontier.
Result<HybridTime> Tablet::OldestMutableMemtableWriteHybridTime() const {
  auto read_op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(read_op);

  HybridTime oldest = HybridTime::kMax;
  for (auto* db : { regular_db_.get(), intents_db_.get() }) {
    if (!db) {
      continue;
    }
    auto memtable_frontier = MemTableFrontierFromDb(db, rocksdb::UpdateUserValueType::kSmallest);
    if (!memtable_frontier) {
      continue;
    }
    const auto frontier_time =
        static_cast<const docdb::ConsensusFrontier&>(*memtable_frontier).hybrid_time();
    oldest = std::min(oldest, frontier_time);
  }
  return oldest;
}
3027
3028
8.52M
// Returns the tablet's schema exactly as reported by its metadata.
const yb::SchemaPtr Tablet::schema() const {
  auto current_schema = metadata_->schema();
  return current_schema;
}
3031
3032
0
// Appends a human-readable dump of the tablet's DocDB contents to `lines`.
// Transaction status tablets produce no output.
Status Tablet::DebugDump(vector<string> *lines) {
  switch (table_type_) {
    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
      return Status::OK();
    case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::REDIS_TABLE_TYPE:
      DocDBDebugDump(lines);
      return Status::OK();
  }
  // Every enumerator returns above; anything else is a corrupted value.
  FATAL_INVALID_ENUM_VALUE(TableType, table_type_);
}
3044
3045
0
// Writes a header followed by the regular DB's DocDB contents, via LOG_STRING(INFO, lines).
void Tablet::DocDBDebugDump(vector<string> *lines) {
  LOG_STRING(INFO, lines) << "Dumping tablet:";
  LOG_STRING(INFO, lines) << "---------------------------";
  docdb::DocDBDebugDump(
      regular_db_.get(), LOG_STRING(INFO, lines), docdb::StorageDbType::kRegular);
}
3050
3051
0
// Test hook: switches the regular DB's memtable, or logs and does nothing without a regular DB.
Status Tablet::TEST_SwitchMemtable() {
  auto op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(op);

  if (!regular_db_) {
    LOG_WITH_PREFIX(INFO) << "Ignoring TEST_SwitchMemtable: no regular RocksDB";
    return Status::OK();
  }
  regular_db_->TEST_SwitchMemtable();
  return Status::OK();
}
3062
3063
// Computes the safe time for a read at or after `min_allowed`.
//  - RequireLease::kFalse: uses the follower safe time.
//  - With a hybrid-time lease provider: waits for the lease. If lease acquisition fails with
//    IllegalState and `require_lease` is kFallbackToFollower, falls back to the follower safe
//    time; any other failure is returned.
//  - Without a provider: waits on the clock until `min_allowed` (if set).
// Returns InternalError if `min_allowed` is beyond the lease's time or lease bound.
Result<HybridTime> Tablet::DoGetSafeTime(
    RequireLease require_lease, HybridTime min_allowed, CoarseTimePoint deadline) const {
  if (require_lease == RequireLease::kFalse) {
    return mvcc_.SafeTimeForFollower(min_allowed, deadline);
  }

  FixedHybridTimeLease lease;
  if (!ht_lease_provider_) {
    if (min_allowed) {
      RETURN_NOT_OK(WaitUntil(clock_.get(), min_allowed, deadline));
    }
  } else {
    // This will block until a leader lease reaches the given value or a timeout occurs.
    auto lease_result = ht_lease_provider_(min_allowed, deadline);
    if (!lease_result.ok()) {
      const bool use_follower_time =
          require_lease == RequireLease::kFallbackToFollower &&
          lease_result.status().IsIllegalState();
      if (use_follower_time) {
        return mvcc_.SafeTimeForFollower(min_allowed, deadline);
      }
      return lease_result.status();
    }
    lease = *lease_result;
    if (min_allowed > lease.time) {
      return STATUS_FORMAT(
          InternalError, "Read request hybrid time after current time: $0, lease: $1",
          min_allowed, lease);
    }
  }

  if (min_allowed > lease.lease) {
    return STATUS_FORMAT(
        InternalError, "Read request hybrid time after leader lease: $0, lease: $1",
        min_allowed, lease);
  }
  return mvcc_.SafeTime(min_allowed, deadline, lease);
}
3095
3096
18.0k
// Blocks new write permits (see GetPermitToWrite) until the returned pause is destroyed.
// Waits up to `deadline` for in-flight permits.
ScopedRWOperationPause Tablet::PauseWritePermits(CoarseTimePoint deadline) {
  TRACE("Blocking write permit(s)");
  auto trace_on_exit = ScopeExit([] { TRACE("Blocking write permit(s) done"); });
  return ScopedRWOperationPause(&write_ops_being_submitted_counter_, deadline, Stop::kFalse);
}
3102
3103
1.74M
// Acquires a permit on write_ops_being_submitted_counter_ for submitting a write operation.
// NOTE(review): `deadline` is not forwarded to ScopedRWOperation here, unlike
// PauseWritePermits which passes its deadline. Confirm this is intentional.
ScopedRWOperation Tablet::GetPermitToWrite(CoarseTimePoint deadline) {
  TRACE("Acquiring write permit");
  auto trace_on_exit = ScopeExit([] { TRACE("Acquiring write permit done"); });
  return ScopedRWOperation(&write_ops_being_submitted_counter_);
}
3108
3109
722k
// True while the DocDB key bounds are initialized and the metadata does not yet record a full
// compaction. Presumably this means a post-split tablet still holding data outside its bounds.
Result<bool> Tablet::StillHasOrphanedPostSplitData() {
  auto op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(op);

  const bool has_key_bounds = doc_db().key_bounds->IsInitialized();
  return has_key_bounds && !metadata()->has_been_fully_compacted();
}
3114
3115
295k
bool Tablet::MayHaveOrphanedPostSplitData() {
3116
295k
  auto res = StillHasOrphanedPostSplitData();
3117
295k
  if (!res.ok()) {
3118
223
    LOG(WARNING) << "Failed to call StillHasOrphanedPostSplitData: " << res.ToString();
3119
223
    return true;
3120
223
  }
3121
295k
  return res.get();
3122
295k
}
3123
3124
344k
bool Tablet::ShouldDisableLbMove() {
3125
344k
  auto still_has_parent_data_result = StillHasOrphanedPostSplitData();
3126
344k
  if (still_has_parent_data_result.ok()) {
3127
344k
    return still_has_parent_data_result.get();
3128
344k
  }
3129
  // If this call failed, one of three things may be true:
3130
  // 1. We are in the middle of a tablet shutdown.
3131
  //
3132
  // In this case, what we report is not of much consequence, as the load balancer shouldn't try to
3133
  // move us anyways. We choose to return false.
3134
  //
3135
  // 2. We are in the middle of a TRUNCATE.
3136
  //
3137
  // In this case, any concurrent attempted LB move should fail before trying to move data,
3138
  // since the RocksDB instances are destroyed. On top of that, we do want to allow the LB to move
3139
  // this tablet after the TRUNCATE completes, so we should return false.
3140
  //
3141
  // 3. We are in the middle of an AlterSchema operation. This is only true for tablets belonging to
3142
  //    colocated tables.
3143
  //
3144
  // In this case, we want to disable tablet moves. We conservatively return true for any failure
3145
  // if the tablet is part of a colocated table.
3146
3
  return metadata_->schema()->has_pgtable_id();
3147
3
}
3148
3149
6
void Tablet::ForceRocksDBCompactInTest() {
3150
6
  CHECK_OK(ForceFullRocksDBCompact());
3151
6
}
3152
3153
90
// Forces a full compaction of the regular DB and, if present, the intents DB.
// The intents DB is flushed first, before its compaction.
Status Tablet::ForceFullRocksDBCompact() {
  auto op = CreateAbortableScopedRWOperation();
  RETURN_NOT_OK(op);

  if (regular_db_) {
    RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get()));
  }
  if (!intents_db_) {
    return Status::OK();
  }

  RETURN_NOT_OK_PREPEND(
      intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed");
  RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get()));
  return Status::OK();
}
3167
3168
7
// Test helper: returns a string dump of the DocDB, with or without intents.
// Returns "" when there is no regular DB.
std::string Tablet::TEST_DocDBDumpStr(IncludeIntents include_intents) {
  if (!regular_db_) {
    return "";
  }
  if (include_intents) {
    return docdb::DocDBDebugDumpToStr(doc_db());
  }
  return docdb::DocDBDebugDumpToStr(doc_db().WithoutIntents());
}
3177
3178
void Tablet::TEST_DocDBDumpToContainer(
3179
7
    IncludeIntents include_intents, std::unordered_set<std::string>* out) {
3180
7
  if (!regular_db_) return;
3181
3182
7
  if (!include_intents) {
3183
0
    return docdb::DocDBDebugDumpToContainer(doc_db().WithoutIntents(), out);
3184
0
  }
3185
3186
7
  return docdb::DocDBDebugDumpToContainer(doc_db(), out);
3187
7
}
3188
3189
0
// Test helper: logs the regular DB contents and, when requested and present, the intents DB.
void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) {
  if (regular_db_) {
    docdb::DumpRocksDBToLog(regular_db_.get(), StorageDbType::kRegular, LogPrefix());
    if (include_intents && intents_db_) {
      docdb::DumpRocksDBToLog(intents_db_.get(), StorageDbType::kIntents, LogPrefix());
    }
    return;
  }
  LOG_WITH_PREFIX(INFO) << "No RocksDB to dump";
}
3201
3202
0
size_t Tablet::TEST_CountRegularDBRecords() {
3203
0
  if (!regular_db_) return 0;
3204
0
  rocksdb::ReadOptions read_opts;
3205
0
  read_opts.query_id = rocksdb::kDefaultQueryId;
3206
0
  docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_);
3207
3208
0
  size_t result = 0;
3209
0
  for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
3210
0
    ++result;
3211
0
  }
3212
0
  return result;
3213
0
}
3214
3215
template <class F>
3216
24.0M
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
3217
24.0M
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
3218
24.0M
  std::lock_guard<rw_spinlock> lock(component_lock_);
3219
3220
  // In order to get actual stats we would have to wait.
3221
  // This would give us correct stats but would make this request slower.
3222
24.0M
  if (!scoped_operation.ok() || !regular_db_) {
3223
5.73M
    return default_value;
3224
5.73M
  }
3225
18.2M
  return func();
3226
18.2M
}
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_29GetCurrentVersionSstFilesSizeEvE3$_3EEDaRKT_RKDTclfL0p_EE
Line
Count
Source
3216
7.29k
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
3217
7.29k
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
3218
7.29k
  std::lock_guard<rw_spinlock> lock(component_lock_);
3219
3220
  // In order to get actual stats we would have to wait.
3221
  // This would give us correct stats but would make this request slower.
3222
7.29k
  if (!scoped_operation.ok() || !regular_db_) {
3223
2.75k
    return default_value;
3224
2.75k
  }
3225
4.54k
  return func();
3226
4.54k
}
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_41GetCurrentVersionSstFilesUncompressedSizeEvE3$_4EEDaRKT_RKDTclfL0p_EE
Line
Count
Source
3216
7.29k
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
3217
7.29k
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
3218
7.29k
  std::lock_guard<rw_spinlock> lock(component_lock_);
3219
3220
  // In order to get actual stats we would have to wait.
3221
  // This would give us correct stats but would make this request slower.
3222
7.29k
  if (!scoped_operation.ok() || !regular_db_) {
3223
2.75k
    return default_value;
3224
2.75k
  }
3225
4.54k
  return func();
3226
4.54k
}
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_33GetCurrentVersionSstFilesAllSizesEvE3$_5EEDaRKT_RKDTclfL0p_EE
Line
Count
Source
3216
295k
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
3217
295k
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
3218
295k
  std::lock_guard<rw_spinlock> lock(component_lock_);
3219
3220
  // In order to get actual stats we would have to wait.
3221
  // This would give us correct stats but would make this request slower.
3222
295k
  if (!scoped_operation.ok() || !regular_db_) {
3223
132k
    return default_value;
3224
132k
  }
3225
163k
  return func();
3226
163k
}
tablet.cc:_ZNK2yb6tablet6Tablet16GetRegularDbStatIZNKS1_28GetCurrentVersionNumSSTFilesEvE3$_6EEDaRKT_RKDTclfL0p_EE
Line
Count
Source
3216
23.7M
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
3217
23.7M
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
3218
23.7M
  std::lock_guard<rw_spinlock> lock(component_lock_);
3219
3220
  // In order to get actual stats we would have to wait.
3221
  // This would give us correct stats but would make this request slower.
3222
23.7M
  if (!scoped_operation.ok() || !regular_db_) {
3223
5.59M
    return default_value;
3224
5.59M
  }
3225
18.1M
  return func();
3226
18.1M
}
3227
3228
7.29k
uint64_t Tablet::GetCurrentVersionSstFilesSize() const {
3229
4.54k
  return GetRegularDbStat([this] {
3230
4.54k
    return regular_db_->GetCurrentVersionSstFilesSize();
3231
4.54k
  }, 0);
3232
7.29k
}
3233
3234
7.29k
uint64_t Tablet::GetCurrentVersionSstFilesUncompressedSize() const {
3235
4.54k
  return GetRegularDbStat([this] {
3236
4.54k
    return regular_db_->GetCurrentVersionSstFilesUncompressedSize();
3237
4.54k
  }, 0);
3238
7.29k
}
3239
3240
295k
std::pair<uint64_t, uint64_t> Tablet::GetCurrentVersionSstFilesAllSizes() const {
3241
163k
  return GetRegularDbStat([this] {
3242
163k
    return regular_db_->GetCurrentVersionSstFilesAllSizes();
3243
163k
  }, std::pair<uint64_t, uint64_t>(0, 0));
3244
295k
}
3245
3246
23.7M
uint64_t Tablet::GetCurrentVersionNumSSTFiles() const {
3247
18.1M
  return GetRegularDbStat([this] {
3248
18.1M
    return regular_db_->GetCurrentVersionNumSSTFiles();
3249
18.1M
  }, 0);
3250
23.7M
}
3251
3252
3.74k
std::pair<int, int> Tablet::GetNumMemtables() const {
3253
3.74k
  int intents_num_memtables = 0;
3254
3.74k
  int regular_num_memtables = 0;
3255
3256
3.74k
  {
3257
3.74k
    auto scoped_operation = CreateNonAbortableScopedRWOperation();
3258
3.74k
    std::lock_guard<rw_spinlock> lock(component_lock_);
3259
3.74k
    if (intents_db_) {
3260
      // NOTE: 1 is added on behalf of cfd->mem().
3261
1.04k
      intents_num_memtables = 1 + intents_db_->GetCfdImmNumNotFlushed();
3262
1.04k
    }
3263
3.74k
    if (regular_db_) {
3264
      // NOTE: 1 is added on behalf of cfd->mem().
3265
1.34k
      regular_num_memtables = 1 + regular_db_->GetCfdImmNumNotFlushed();
3266
1.34k
    }
3267
3.74k
  }
3268
3269
3.74k
  return std::make_pair(intents_num_memtables, regular_num_memtables);
3270
3.74k
}
3271
3272
// ------------------------------------------------------------------------------------------------
3273
3274
// Builds a TransactionOperationContext from the transaction metadata carried by a request.
// Decodes transaction_metadata.transaction_id() when present (returning the decoding error on
// failure) and delegates to the boost::optional<TransactionId> overload; without an id, that
// overload receives boost::none and decides whether a context is needed at all.
// Returns an empty context when transactions are disabled for this tablet.
Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext(
    const TransactionMetadataPB& transaction_metadata,
    bool is_ysql_catalog_table,
    const SubTransactionMetadataPB* subtransaction_metadata) const {
  if (!txns_enabled_) {
    return TransactionOperationContext();
  }

  boost::optional<TransactionId> transaction_id;
  if (transaction_metadata.has_transaction_id()) {
    transaction_id = VERIFY_RESULT(
        FullyDecodeTransactionId(transaction_metadata.transaction_id()));
  }
  return CreateTransactionOperationContext(
      transaction_id, is_ysql_catalog_table, subtransaction_metadata);
}
3292
3293
// Builds a TransactionOperationContext for an optional transaction id.
//  - Transactions disabled: empty context.
//  - Id given: context for that transaction.
//  - No id, but the table is transactional or this is a YSQL catalog table: context bound to a
//    fixed placeholder id, so the transaction participant is still available to resolve intents
//    during reads.
//  - Otherwise: empty context.
// When subtransaction_metadata is non-null it is decoded and attached; a decoding error is
// returned as is.
Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext(
    const boost::optional<TransactionId>& transaction_id,
    bool is_ysql_catalog_table,
    const SubTransactionMetadataPB* subtransaction_metadata) const {
  if (!txns_enabled_) {
    return TransactionOperationContext();
  }

  const TransactionId* txn_id = transaction_id.get_ptr();
  if (txn_id == nullptr) {
    const bool needs_participant =
        metadata_->schema()->table_properties().is_transactional() || is_ysql_catalog_table;
    if (!needs_participant) {
      return TransactionOperationContext();
    }
    // deadbeef-dead-beef-dead-beef00000075
    static const TransactionId kArbitraryTxnIdForNonTxnReads(
        17275436393656397278ULL, 8430738506459819486ULL);
    txn_id = &kArbitraryTxnIdForNonTxnReads;
  }

  if (subtransaction_metadata == nullptr) {
    return TransactionOperationContext(*txn_id, transaction_participant());
  }
  auto subtxn = VERIFY_RESULT(SubTransactionMetadata::FromPB(*subtransaction_metadata));
  return TransactionOperationContext(*txn_id, std::move(subtxn), transaction_participant());
}
3323
3324
// Appends to write_batch the intents produced by every QL and PGSQL read request in the batch,
// using one transaction operation context built from the given (sub)transaction metadata.
// The context is built as a YSQL catalog one when the PGSQL batch is non-empty and this is the
// sys catalog tablet. Returns the first error encountered.
Status Tablet::CreateReadIntents(
    const TransactionMetadataPB& transaction_metadata,
    const SubTransactionMetadataPB& subtransaction_metadata,
    const google::protobuf::RepeatedPtrField<QLReadRequestPB>& ql_batch,
    const google::protobuf::RepeatedPtrField<PgsqlReadRequestPB>& pgsql_batch,
    docdb::KeyValueWriteBatchPB* write_batch) {
  auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext(
      transaction_metadata,
      /* is_ysql_catalog_table */ pgsql_batch.size() > 0 && is_sys_catalog_,
      &subtransaction_metadata));

  if (!ql_batch.empty()) {
    // GetSchema() without a table id does not depend on the request, so look it up once for the
    // whole QL batch instead of copying the SchemaPtr once per request.
    const auto ql_schema = GetSchema();
    for (const auto& ql_read : ql_batch) {
      docdb::QLReadOperation doc_op(ql_read, txn_op_ctx);
      RETURN_NOT_OK(doc_op.GetIntents(*ql_schema, write_batch));
    }
  }

  // PGSQL requests name their own table, so the schema is resolved per request.
  for (const auto& pgsql_read : pgsql_batch) {
    docdb::PgsqlReadOperation doc_op(pgsql_read, txn_op_ctx);
    RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(pgsql_read.table_id()), write_batch));
  }

  return Status::OK();
}
3347
3348
2.93M
bool Tablet::ShouldApplyWrite() {
3349
2.93M
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
3350
2.93M
  if (!scoped_read_operation.ok()) {
3351
0
    return false;
3352
0
  }
3353
3354
2.93M
  return !regular_db_->NeedsDelay();
3355
2.93M
}
3356
3357
676k
// Isolation level of the given transaction. It is taken from the metadata when set; otherwise
// it comes from transaction_participant_->PrepareMetadata(), whose error is propagated.
Result<IsolationLevel> Tablet::GetIsolationLevel(const TransactionMetadataPB& transaction) {
  if (!transaction.has_isolation()) {
    const auto prepared = VERIFY_RESULT(transaction_participant_->PrepareMetadata(transaction));
    return prepared.isolation;
  }
  return transaction.isolation();
}
3363
3364
// Creates the on-disk state of a split child tablet and returns its metadata.
// Steps: flush this tablet synchronously, create the child metadata with the given partition and
// key bounds, checkpoint the DBs into the child's directories, then open each child RocksDB
// (regular, plus intents if this tablet has one) and bump its flushed frontier to
// split_op_id / split_op_hybrid_time. The first error aborts and is returned.
Result<RaftGroupMetadataPtr> Tablet::CreateSubtablet(
    const TabletId& tablet_id, const Partition& partition, const docdb::KeyBounds& key_bounds,
    const yb::OpId& split_op_id, const HybridTime& split_op_hybrid_time) {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  RETURN_NOT_OK(Flush(FlushMode::kSync));

  auto metadata = VERIFY_RESULT(metadata_->CreateSubtabletMetadata(
      tablet_id, partition, key_bounds.lower.ToStringBuffer(), key_bounds.upper.ToStringBuffer()));

  RETURN_NOT_OK(snapshots_->CreateCheckpoint(
      metadata->rocksdb_dir(), CreateIntentsCheckpointIn::kSubDir));

  // We want flushed frontier to cover split_op_id, so during bootstrap of after-split tablets
  // we don't replay split operation.
  docdb::ConsensusFrontier frontier;
  frontier.set_op_id(split_op_id);
  frontier.set_hybrid_time(split_op_hybrid_time);

  struct RocksDbDirWithType {
    std::string db_dir;
    docdb::StorageDbType db_type;
  };
  boost::container::static_vector<RocksDbDirWithType, 2> subtablet_rocksdbs(
      {{ metadata->rocksdb_dir(), docdb::StorageDbType::kRegular }});
  if (intents_db_) {
    subtablet_rocksdbs.push_back(
        { metadata->intents_rocksdb_dir(), docdb::StorageDbType::kIntents });
  }
  // Bind by const reference: each entry owns a std::string directory path.
  for (const auto& db_info : subtablet_rocksdbs) {
    rocksdb::Options rocksdb_options;
    docdb::InitRocksDBOptions(
        &rocksdb_options, MakeTabletLogPrefix(tablet_id, log_prefix_suffix_, db_info.db_type),
        /* statistics */ nullptr, tablet_options_);
    rocksdb_options.create_if_missing = false;
    // Disable background compactions, we only need to update flushed frontier.
    rocksdb_options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
    std::unique_ptr<rocksdb::DB> db =
        VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, db_info.db_dir));
    RETURN_NOT_OK(
        db->ModifyFlushedFrontier(frontier.Clone(), rocksdb::FrontierModificationMode::kUpdate));
  }
  return metadata;
}
3409
3410
0
// Counts the entries in the intents DB by iterating over it from the first key to the end.
// Returns 0 when the tablet has no intents DB. Returns an error if the scoped operation cannot be
// started or if the iterator reports an error. The iterator stops reporting Valid() on error,
// so its status must be checked or a truncated count would be returned as success.
Result<int64_t> Tablet::CountIntents() {
  auto pending_op = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(pending_op);

  if (!intents_db_) {
    return 0;
  }
  rocksdb::ReadOptions read_options;
  std::unique_ptr<rocksdb::Iterator> intent_iter(intents_db_->NewIterator(read_options));
  int64_t num_intents = 0;
  for (intent_iter->SeekToFirst(); intent_iter->Valid(); intent_iter->Next()) {
    ++num_intents;
  }
  RETURN_NOT_OK(intent_iter->status());
  return num_intents;
}
3428
3429
136k
void Tablet::ListenNumSSTFilesChanged(std::function<void()> listener) {
3430
136k
  std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_);
3431
136k
  bool has_new_listener = listener != nullptr;
3432
136k
  bool has_old_listener = num_sst_files_changed_listener_ != nullptr;
3433
28
  LOG_IF_WITH_PREFIX(DFATAL, has_new_listener == has_old_listener)
3434
28
      << __func__ << " in wrong state, has_old_listener: " << has_old_listener;
3435
136k
  num_sst_files_changed_listener_ = std::move(listener);
3436
136k
}
3437
3438
void Tablet::InitRocksDBOptions(
3439
    rocksdb::Options* options, const std::string& log_prefix,
3440
382k
    rocksdb::BlockBasedTableOptions table_options) {
3441
382k
  docdb::InitRocksDBOptions(
3442
382k
      options, log_prefix, regulardb_statistics_, tablet_options_, std::move(table_options));
3443
382k
}
3444
3445
1.72k
// RocksDB environment configured in this tablet's options (tablet_options_.rocksdb_env).
rocksdb::Env& Tablet::rocksdb_env() const {
  rocksdb::Env& env = *tablet_options_.rocksdb_env;
  return env;
}
3448
3449
2.86M
const std::string& Tablet::tablet_id() const {
3450
2.86M
  return metadata_->raft_group_id();
3451
2.86M
}
3452
3453
47
// Computes the encoded key at which this tablet should be split.
// Takes the regular DB's middle key, and truncates it to the hash-code prefix (hash partitioning)
// or to the whole DocKey (otherwise). Returns IllegalState when that key is an internal record,
// cannot be decoded for the chosen key part, or does not lie strictly inside key_bounds_.
Result<std::string> Tablet::GetEncodedMiddleSplitKey() const {
  // Common prefix for every error message returned below.
  auto error_prefix = [this] {
    return Format(
        "Failed to detect middle key for tablet $0 (key_bounds: $1 - $2)",
        tablet_id(),
        Slice(key_bounds_.lower).ToDebugHexString(),
        Slice(key_bounds_.upper).ToDebugHexString());
  };

  // TODO(tsplit): should take key_bounds_ into account.
  auto middle_key = VERIFY_RESULT(regular_db_->GetMiddleKey());

  // In some rare cases middle key can point to a special internal record which is not visible
  // for a user, but tablet splitting routines expect the specific structure for partition keys
  // that does not match the struct of the internally used records. Moreover, it is expected
  // to have two child tablets with alive user records after the splitting, but the split
  // by the internal record will lead to a case when one tablet will consist of internal records
  // only and these records will be compacted out at some point making an empty tablet.
  const auto leading_value_type = docdb::DecodeValueType(middle_key[0]);
  if (PREDICT_FALSE(docdb::IsInternalRecordKeyType(leading_value_type))) {
    return STATUS_FORMAT(
        IllegalState, "$0: got internal record \"$1\"",
        error_prefix(), Slice(middle_key).ToDebugHexString());
  }

  const bool hash_partitioned = metadata()->partition_schema()->IsHashPartitioning();
  const auto key_part =
      hash_partitioned ? docdb::DocKeyPart::kUpToHashCode : docdb::DocKeyPart::kWholeDocKey;
  const auto split_key_size = VERIFY_RESULT(DocKey::EncodedSize(middle_key, key_part));
  if (PREDICT_FALSE(split_key_size == 0)) {
    // Using this verification just to have a more sensible message. The below verification will
    // not pass with split_key_size == 0 also, but its message is not accurate enough. This failure
    // may happen when a key cannot be decoded with key_part inside DocKey::EncodedSize and the key
    // is still valid for some other reason (e.g. getting a non-hash key for hash partitioning).
    return STATUS_FORMAT(
        IllegalState, "$0: got unexpected key \"$1\"",
        error_prefix(), Slice(middle_key).ToDebugHexString());
  }

  middle_key.resize(split_key_size);
  const Slice middle_key_slice(middle_key);
  // The split key must be strictly greater than the lower bound and, when an upper bound is set,
  // strictly less than it.
  const bool within_bounds =
      middle_key_slice.compare(key_bounds_.lower) > 0 &&
      (key_bounds_.upper.empty() || middle_key_slice.compare(key_bounds_.upper) < 0);
  if (!within_bounds) {
    return STATUS_FORMAT(
        IllegalState,
        "$0: got \"$1\". This can happen if post-split tablet wasn't fully compacted after split",
        error_prefix(), middle_key_slice.ToDebugHexString());
  }
  return middle_key;
}
3502
3503
// Schedules TriggerPostSplitCompactionSync() on a thread pool token obtained from
// get_token_for_compaction, but only while StillHasOrphanedPostSplitData() reports true.
// Returns IllegalState if a compaction was already triggered for this tablet instance; otherwise
// returns the result of the check or of submitting the task.
Status Tablet::TriggerPostSplitCompactionIfNeeded(
    std::function<std::unique_ptr<ThreadPoolToken>()> get_token_for_compaction) {
  if (post_split_compaction_task_pool_token_) {
    return STATUS(
        IllegalState, "Already triggered post split compaction for this tablet instance.");
  }
  if (!VERIFY_RESULT(StillHasOrphanedPostSplitData())) {
    return Status::OK();
  }
  post_split_compaction_task_pool_token_ = get_token_for_compaction();
  // A lambda states the deferred call directly and is preferred over std::bind.
  return post_split_compaction_task_pool_token_->SubmitFunc(
      [this] { TriggerPostSplitCompactionSync(); });
}
3516
3517
84
void Tablet::TriggerPostSplitCompactionSync() {
3518
84
  TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compaction);
3519
84
  WARN_WITH_PREFIX_NOT_OK(
3520
84
      ForceFullRocksDBCompact(), LogPrefix() + "Failed to compact post-split tablet.");
3521
84
}
3522
3523
3
// Checks the regular DB and then the intents DB (each only if present) with
// OpenDbAndCheckIntegrity, returning the first failure.
Status Tablet::VerifyDataIntegrity() {
  LOG_WITH_PREFIX(INFO) << "Beginning data integrity checks on this tablet";

  if (regular_db_) {
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(metadata()->rocksdb_dir()));
  }
  if (intents_db_) {
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(metadata()->intents_rocksdb_dir()));
  }
  return Status::OK();
}
3540
3541
3
// Opens the RocksDB in db_dir read-only with paranoid checks enabled, similar to ldb's
// CheckConsistency, so that detected corruption makes the open fail with a Corruption status.
// A Corruption is logged, counted in tablet_data_corruptions, and returned. Any other open
// failure is only logged, and OK is returned.
Status Tablet::OpenDbAndCheckIntegrity(const std::string& db_dir) {
  rocksdb::Options db_opts;
  InitRocksDBOptions(&db_opts, LogPrefix());
  db_opts.paranoid_checks = true;

  rocksdb::DB* db_raw = nullptr;
  rocksdb::Status st = rocksdb::DB::OpenForReadOnly(db_opts, db_dir, &db_raw);
  // Own whatever handle was produced so it is released when this function returns.
  std::unique_ptr<rocksdb::DB> db(db_raw);

  if (st.ok()) {
    // TODO: we can add more checks here to verify block contents/checksums
    return Status::OK();
  }

  if (!st.IsCorruption()) {
    LOG_WITH_PREFIX(WARNING) << "Failed to open read-only RocksDB in directory " << db_dir
                             << ": " << st;
    return Status::OK();
  }

  LOG_WITH_PREFIX(WARNING) << "Detected rocksdb data corruption: " << st;
  // TODO: should we bump metric here or in top-level validation or both?
  metrics()->tablet_data_corruptions->Increment();
  return st;
}
3571
3572
44
void Tablet::SplitDone() {
3573
44
  {
3574
44
    std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3575
44
    if (completed_split_operation_filter_) {
3576
0
      LOG_WITH_PREFIX(DFATAL) << "Already have split operation filter";
3577
0
      return;
3578
0
    }
3579
3580
44
    completed_split_operation_filter_ = MakeFunctorOperationFilter(
3581
14
        [this](const OpId& op_id, consensus::OperationType op_type) -> Status {
3582
14
          if (SplitOperation::ShouldAllowOpAfterSplitTablet(op_type)) {
3583
3
            return Status::OK();
3584
3
          }
3585
3586
11
          auto children = metadata_->split_child_tablet_ids();
3587
11
          return SplitOperation::RejectionStatus(OpId(), op_id, op_type, children[0], children[1]);
3588
11
        });
3589
44
    operation_filters_.push_back(*completed_split_operation_filter_);
3590
3591
44
    completed_split_log_anchor_ = std::make_unique<log::LogAnchor>();
3592
3593
44
    log_anchor_registry_->Register(
3594
44
        metadata_->split_op_id().index, "Splitted tablet", completed_split_log_anchor_.get());
3595
44
  }
3596
44
}
3597
3598
89.0k
// Brings the operation filters in line with the metadata's restoration state, under
// operation_filters_mutex_.
// With reset_split, the split log anchor and split operation filter are dropped first.
// Afterwards the restoring filter is installed exactly when metadata_->has_active_restoration()
// is true, and unregistered otherwise.
void Tablet::SyncRestoringOperationFilter(ResetSplit reset_split) {
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);

  if (reset_split) {
    if (completed_split_log_anchor_) {
      WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()),
                  "Unregister split anchor");
      completed_split_log_anchor_ = nullptr;
    }
    if (completed_split_operation_filter_) {
      UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get());
      completed_split_operation_filter_ = nullptr;
    }
  }

  const bool restoring = metadata_->has_active_restoration();
  const bool has_restoring_filter = restoring_operation_filter_ != nullptr;
  if (restoring == has_restoring_filter) {
    // Already in sync.
    return;
  }

  if (!restoring) {
    UnregisterOperationFilterUnlocked(restoring_operation_filter_.get());
    restoring_operation_filter_ = nullptr;
    return;
  }

  restoring_operation_filter_ = MakeFunctorOperationFilter(
      [](const OpId& op_id, consensus::OperationType op_type) -> Status {
        if (!SnapshotOperation::ShouldAllowOpDuringRestore(op_type)) {
          return SnapshotOperation::RejectionStatus(op_id, op_type);
        }
        return Status::OK();
      });
  operation_filters_.push_back(*restoring_operation_filter_);
}
3636
3637
0
// Records restoration_id in the tablet metadata and flushes it, then resyncs the operation
// filters, also dropping any split filter and split log anchor (ResetSplit::kTrue).
Status Tablet::RestoreStarted(const TxnSnapshotRestorationId& restoration_id) {
  metadata_->RegisterRestoration(restoration_id);
  RETURN_NOT_OK(metadata_->Flush());
  SyncRestoringOperationFilter(ResetSplit::kTrue);
  return Status::OK();
}
3645
3646
// Removes restoration_id from the tablet metadata. When a restoration hybrid time is given, it
// stores it in the metadata. If there is a transaction participant and FLAGS_consistent_restore
// is set, it also tells the participant to ignore transactions started before that time.
// Finally it flushes the metadata and resyncs the operation filters.
Status Tablet::RestoreFinished(
    const TxnSnapshotRestorationId& restoration_id, HybridTime restoration_hybrid_time) {
  metadata_->UnregisterRestoration(restoration_id);
  if (restoration_hybrid_time) {
    metadata_->SetRestorationHybridTime(restoration_hybrid_time);
    const bool ignore_older_transactions = transaction_participant_ && FLAGS_consistent_restore;
    if (ignore_older_transactions) {
      transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
    }
  }
  RETURN_NOT_OK(metadata_->Flush());
  SyncRestoringOperationFilter(ResetSplit::kFalse);
  return Status::OK();
}
3661
3662
1.41k
// Processes restorations reported as complete in restoration_complete_time.
// If CheckCompleteRestorations yields a time other than HybridTime::kMin, there is a transaction
// participant, and FLAGS_consistent_restore is set, the participant ignores transactions started
// before that time. When CleanupRestorations reports a change, the metadata is flushed and the
// operation filters are resynced.
Status Tablet::CheckRestorations(const RestorationCompleteTimeMap& restoration_complete_time) {
  const auto restoration_hybrid_time =
      metadata_->CheckCompleteRestorations(restoration_complete_time);
  if (restoration_hybrid_time != HybridTime::kMin && transaction_participant_ &&
      FLAGS_consistent_restore) {
    transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
  }

  // This cannot be done in a single step: the transaction participant has to be updated before
  // CleanupRestorations() runs.
  if (metadata_->CleanupRestorations(restoration_complete_time)) {
    RETURN_NOT_OK(metadata_->Flush());
    SyncRestoringOperationFilter(ResetSplit::kFalse);
  }
  return Status::OK();
}
3681
3682
2.73M
// Asks every registered operation filter, under operation_filters_mutex_, whether the operation
// is allowed, and returns the first rejection (OK if all filters accept it).
Status Tablet::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) {
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
  for (const auto& operation_filter : operation_filters_) {
    RETURN_NOT_OK(operation_filter.CheckOperationAllowed(op_id, op_type));
  }
  return Status::OK();
}
3690
3691
912
// Adds *filter to operation_filters_ under operation_filters_mutex_.
// NOTE(review): operation_filters_ appears to be an intrusive list (removal uses iterator_to),
// so the filter object itself is linked in and must stay alive until unregistered.
void Tablet::RegisterOperationFilter(OperationFilter* filter) {
  std::lock_guard<simple_spinlock> guard(operation_filters_mutex_);
  operation_filters_.push_back(*filter);
}
3695
3696
906
// Locking wrapper: removes *filter from operation_filters_ while holding operation_filters_mutex_.
void Tablet::UnregisterOperationFilter(OperationFilter* filter) {
  std::lock_guard<simple_spinlock> guard(operation_filters_mutex_);
  UnregisterOperationFilterUnlocked(filter);
}
3700
3701
927
// Removes *filter from operation_filters_. The caller must already hold operation_filters_mutex_.
// Presumably *filter must currently be linked into operation_filters_ (iterator_to requires it).
void Tablet::UnregisterOperationFilterUnlocked(OperationFilter* filter) {
  const auto position = operation_filters_.iterator_to(*filter);
  operation_filters_.erase(position);
}
3704
3705
6.65M
// Schema for table_id. An empty id selects metadata_->schema(). Any other id is looked up with
// GetTableInfo, which CHECK-fails if the lookup returns an error.
SchemaPtr Tablet::GetSchema(const std::string& table_id) const {
  if (!table_id.empty()) {
    auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id));
    // The returned pointer shares ownership with table_info while pointing at its schema
    // (presumably shared_ptr's aliasing constructor).
    return SchemaPtr(table_info, table_info->schema.get());
  }
  return metadata_->schema();
}
3712
3713
50.5k
// Key schema for table_id. An empty id returns a copy of key_schema_. Any other id returns the
// key projection of that table's schema; the lookup CHECK-fails if it returns an error.
Schema Tablet::GetKeySchema(const std::string& table_id) const {
  if (!table_id.empty()) {
    auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id));
    return table_info->schema->CreateKeyProjection();
  }
  return *key_schema_;
}
3720
3721
// ------------------------------------------------------------------------------------------------
3722
3723
// Creates a read operation at read_time. When read_time is unset, it uses a single-time read at
// tablet->SafeTime(require_lease). When the tablet has a retention policy, the read timestamp is
// registered with it first; the registration is undone by Reset(). Errors from SafeTime or the
// registration are returned.
Result<ScopedReadOperation> ScopedReadOperation::Create(
    AbstractTablet* tablet,
    RequireLease require_lease,
    ReadHybridTime read_time) {
  if (!read_time) {
    const auto safe_time = VERIFY_RESULT(tablet->SafeTime(require_lease));
    read_time = ReadHybridTime::SingleTime(safe_time);
  }
  if (auto* retention_policy = tablet->RetentionPolicy()) {
    RETURN_NOT_OK(retention_policy->RegisterReaderTimestamp(read_time.read));
  }
  return ScopedReadOperation(tablet, read_time);
}
3736
3737
// Stores the tablet and read time only; any retention-policy registration is done by Create().
ScopedReadOperation::ScopedReadOperation(
    AbstractTablet* tablet, const ReadHybridTime& read_time)
    : tablet_(tablet),
      read_time_(read_time) {}
3741
3742
20.6M
// Releases the reader-timestamp registration (if still held) via Reset().
ScopedReadOperation::~ScopedReadOperation() {
  Reset();
}
3745
3746
4.64M
// Move assignment: releases this object's registration (via Reset()), then takes over rhs's
// tablet and read time. rhs is left empty, so its destructor will not unregister again.
void ScopedReadOperation::operator=(ScopedReadOperation&& rhs) {
  // Without this guard, a self-move would Reset() first, unregistering the reader timestamp and
  // leaving this object empty.
  if (this == &rhs) {
    return;
  }
  Reset();
  tablet_ = rhs.tablet_;
  read_time_ = rhs.read_time_;
  rhs.tablet_ = nullptr;
}
3752
3753
25.3M
void ScopedReadOperation::Reset() {
3754
25.3M
  if (tablet_) {
3755
4.86M
    auto* retention_policy = tablet_->RetentionPolicy();
3756
4.86M
    if (retention_policy) {
3757
4.58M
      retention_policy->UnregisterReaderTimestamp(read_time_.read);
3758
4.58M
    }
3759
4.86M
    tablet_ = nullptr;
3760
4.86M
  }
3761
25.3M
}
3762
3763
}  // namespace tablet
3764
}  // namespace yb