YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/tablet.h"
34
35
#include <boost/container/static_vector.hpp>
36
37
#include "yb/client/client.h"
38
#include "yb/client/error.h"
39
#include "yb/client/meta_data_cache.h"
40
#include "yb/client/session.h"
41
#include "yb/client/table.h"
42
#include "yb/client/transaction.h"
43
#include "yb/client/transaction_manager.h"
44
#include "yb/client/yb_op.h"
45
46
#include "yb/common/index_column.h"
47
#include "yb/common/pgsql_error.h"
48
#include "yb/common/ql_rowblock.h"
49
#include "yb/common/row_mark.h"
50
#include "yb/common/schema.h"
51
#include "yb/common/transaction_error.h"
52
#include "yb/common/wire_protocol.h"
53
54
#include "yb/consensus/consensus.pb.h"
55
#include "yb/consensus/log_anchor_registry.h"
56
#include "yb/consensus/opid_util.h"
57
58
#include "yb/docdb/compaction_file_filter.h"
59
#include "yb/docdb/conflict_resolution.h"
60
#include "yb/docdb/consensus_frontier.h"
61
#include "yb/docdb/cql_operation.h"
62
#include "yb/docdb/doc_rowwise_iterator.h"
63
#include "yb/docdb/doc_write_batch.h"
64
#include "yb/docdb/docdb.h"
65
#include "yb/docdb/docdb_compaction_filter.h"
66
#include "yb/docdb/docdb_compaction_filter_intents.h"
67
#include "yb/docdb/docdb_debug.h"
68
#include "yb/docdb/docdb_rocksdb_util.h"
69
#include "yb/docdb/pgsql_operation.h"
70
#include "yb/docdb/ql_rocksdb_storage.h"
71
#include "yb/docdb/redis_operation.h"
72
#include "yb/docdb/rocksdb_writer.h"
73
74
#include "yb/gutil/casts.h"
75
76
#include "yb/rocksdb/db/memtable.h"
77
#include "yb/rocksdb/utilities/checkpoint.h"
78
79
#include "yb/rocksutil/yb_rocksdb.h"
80
81
#include "yb/server/hybrid_clock.h"
82
83
#include "yb/tablet/operations/change_metadata_operation.h"
84
#include "yb/tablet/operations/operation.h"
85
#include "yb/tablet/operations/snapshot_operation.h"
86
#include "yb/tablet/operations/split_operation.h"
87
#include "yb/tablet/operations/truncate_operation.h"
88
#include "yb/tablet/operations/write_operation.h"
89
#include "yb/tablet/read_result.h"
90
#include "yb/tablet/snapshot_coordinator.h"
91
#include "yb/tablet/tablet_bootstrap_if.h"
92
#include "yb/tablet/tablet_metadata.h"
93
#include "yb/tablet/tablet_metrics.h"
94
#include "yb/tablet/tablet_retention_policy.h"
95
#include "yb/tablet/tablet_snapshots.h"
96
#include "yb/tablet/transaction_coordinator.h"
97
#include "yb/tablet/transaction_participant.h"
98
#include "yb/tablet/write_query.h"
99
100
#include "yb/tserver/tserver.pb.h"
101
102
#include "yb/util/debug-util.h"
103
#include "yb/util/debug/trace_event.h"
104
#include "yb/util/flag_tags.h"
105
#include "yb/util/format.h"
106
#include "yb/util/logging.h"
107
#include "yb/util/mem_tracker.h"
108
#include "yb/util/metrics.h"
109
#include "yb/util/net/net_util.h"
110
#include "yb/util/pg_util.h"
111
#include "yb/util/scope_exit.h"
112
#include "yb/util/status_format.h"
113
#include "yb/util/status_log.h"
114
#include "yb/util/stopwatch.h"
115
#include "yb/util/trace.h"
116
#include "yb/util/yb_pg_errcodes.h"
117
118
#include "yb/yql/pgwrapper/libpq_utils.h"
119
120
DEFINE_bool(tablet_do_dup_key_checks, true,
121
            "Whether to check primary keys for duplicate on insertion. "
122
            "Use at your own risk!");
123
TAG_FLAG(tablet_do_dup_key_checks, unsafe);
124
125
DEFINE_bool(tablet_do_compaction_cleanup_for_intents, true,
126
            "Whether to clean up intents for aborted transactions in compaction.");
127
128
DEFINE_int32(tablet_bloom_block_size, 4096,
129
             "Block size of the bloom filters used for tablet keys.");
130
TAG_FLAG(tablet_bloom_block_size, advanced);
131
132
DEFINE_double(tablet_bloom_target_fp_rate, 0.01f,
133
              "Target false-positive rate (between 0 and 1) to size tablet key bloom filters. "
134
              "A lower false positive rate may reduce the number of disk seeks required "
135
              "in heavy insert workloads, at the expense of more space and RAM "
136
              "required for bloom filters.");
137
TAG_FLAG(tablet_bloom_target_fp_rate, advanced);
138
139
METRIC_DEFINE_entity(table);
140
METRIC_DEFINE_entity(tablet);
141
142
// TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout
143
// is probably OK for shutdown.
144
DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000,
145
             "Max amount of time we can wait for read/write operations on RocksDB to finish "
146
             "so that we can perform exclusive-ownership operations on RocksDB, such as removing "
147
             "all data in the tablet by replacing the RocksDB instance with an empty one.");
148
149
DEFINE_int32(intents_flush_max_delay_ms, 2000,
150
             "Max time to wait for regular db to flush during flush of intents. "
151
             "After this time flush of regular db will be forced.");
152
153
DEFINE_int32(num_raft_ops_to_force_idle_intents_db_to_flush, 1000,
154
             "When writes to intents RocksDB are stopped and the number of Raft operations after "
155
             "the last write to the intents RocksDB "
156
             "is greater than this value, the intents RocksDB would be requested to flush.");
157
158
DEFINE_bool(delete_intents_sst_files, true,
159
            "Delete whole intents .SST files when possible.");
160
161
DEFINE_uint64(backfill_index_write_batch_size, 128, "The batch size for backfilling the index.");
162
TAG_FLAG(backfill_index_write_batch_size, advanced);
163
TAG_FLAG(backfill_index_write_batch_size, runtime);
164
165
DEFINE_int32(backfill_index_rate_rows_per_sec, 0, "Rate at which the "
166
             "indexed table's entries are populated into the index table during index "
167
             "backfill. This is a per-tablet flag, i.e. a tserver responsible for "
168
             "multiple tablets could be processing more than this.");
169
TAG_FLAG(backfill_index_rate_rows_per_sec, advanced);
170
TAG_FLAG(backfill_index_rate_rows_per_sec, runtime);
171
172
DEFINE_uint64(verify_index_read_batch_size, 128, "The batch size for reading the index.");
173
TAG_FLAG(verify_index_read_batch_size, advanced);
174
TAG_FLAG(verify_index_read_batch_size, runtime);
175
176
DEFINE_int32(verify_index_rate_rows_per_sec, 0,
177
    "Rate of at which the indexed table's entries are read during index consistency checks."
178
    "This is a per-tablet flag, i.e. a tserver responsible for multiple tablets could be "
179
    "processing more than this.");
180
TAG_FLAG(verify_index_rate_rows_per_sec, advanced);
181
TAG_FLAG(verify_index_rate_rows_per_sec, runtime);
182
183
DEFINE_int32(backfill_index_timeout_grace_margin_ms, -1,
184
             "The time we give the backfill process to wrap up the current set "
185
             "of writes and return successfully the RPC with the information about "
186
             "how far we have processed the rows.");
187
TAG_FLAG(backfill_index_timeout_grace_margin_ms, advanced);
188
TAG_FLAG(backfill_index_timeout_grace_margin_ms, runtime);
189
190
DEFINE_bool(yql_allow_compatible_schema_versions, true,
191
            "Allow YCQL requests to be accepted even if they originate from a client who is ahead "
192
            "of the server's schema, but is determined to be compatible with the current version.");
193
TAG_FLAG(yql_allow_compatible_schema_versions, advanced);
194
TAG_FLAG(yql_allow_compatible_schema_versions, runtime);
195
196
DEFINE_bool(disable_alter_vs_write_mutual_exclusion, false,
197
             "A safety switch to disable the changes from D8710 which makes a schema "
198
             "operation take an exclusive lock making all write operations wait for it.");
199
TAG_FLAG(disable_alter_vs_write_mutual_exclusion, advanced);
200
TAG_FLAG(disable_alter_vs_write_mutual_exclusion, runtime);
201
202
DEFINE_bool(cleanup_intents_sst_files, true,
203
            "Cleanup intents files that are no more relevant to any running transaction.");
204
205
DEFINE_int32(ysql_transaction_abort_timeout_ms, 15 * 60 * 1000,  // 15 minutes
206
             "Max amount of time we can wait for active transactions to abort on a tablet "
207
             "after DDL (eg. DROP TABLE) is executed. This deadline is same as "
208
             "unresponsive_ts_rpc_timeout_ms");
209
210
DEFINE_test_flag(int32, backfill_sabotage_frequency, 0,
211
    "If set to value greater than 0, every nth row will be corrupted in the backfill process "
212
    "to create an inconsistency between the index and the indexed tables where n is the "
213
    "input parameter given.");
214
215
DEFINE_test_flag(int32, backfill_drop_frequency, 0,
216
    "If set to value greater than 0, every nth row will be dropped in the backfill process "
217
    "to create an inconsistency between the index and the indexed tables where n is the "
218
    "input parameter given.");
219
220
DEFINE_bool(tablet_enable_ttl_file_filter, false,
221
            "Enables compaction to directly delete files that have expired based on TTL, "
222
            "rather than removing them via the normal compaction process.");
223
224
DEFINE_test_flag(int32, slowdown_backfill_by_ms, 0,
225
                 "If set > 0, slows down the backfill process by this amount.");
226
227
DEFINE_test_flag(uint64, backfill_paging_size, 0,
228
                 "If set > 0, returns early after processing this number of rows.");
229
230
DEFINE_test_flag(bool, tablet_verify_flushed_frontier_after_modifying, false,
231
                 "After modifying the flushed frontier in RocksDB, verify that the restored value "
232
                 "of it is as expected. Used for testing.");
233
234
DEFINE_test_flag(bool, docdb_log_write_batches, false,
235
                 "Dump write batches being written to RocksDB");
236
237
DEFINE_test_flag(bool, export_intentdb_metrics, false,
238
                 "Dump intentsdb statistics to prometheus metrics");
239
240
DEFINE_test_flag(bool, pause_before_post_split_compaction, false,
241
                 "Pause before triggering post split compaction.");
242
243
DEFINE_test_flag(bool, disable_adding_user_frontier_to_sst, false,
244
                 "Prevents adding the UserFrontier to SST file in order to mimic older files.");
245
246
// FLAGS_TEST_disable_getting_user_frontier_from_mem_table is used in conjunction with
247
// FLAGS_TEST_disable_adding_user_frontier_to_sst.  Two flags are needed for the case in which
248
// we're writing a mixture of SST files with and without UserFrontiers, to ensure that we're
249
// not attempting to read the UserFrontier from the MemTable in either case.
250
DEFINE_test_flag(bool, disable_getting_user_frontier_from_mem_table, false,
251
                 "Prevents checking the MemTable for a UserFrontier for test cases where we are "
252
                 "generating SST files without UserFrontiers.");
253
254
DECLARE_int32(client_read_write_timeout_ms);
255
DECLARE_bool(consistent_restore);
256
DECLARE_int32(rocksdb_level0_slowdown_writes_trigger);
257
DECLARE_int32(rocksdb_level0_stop_writes_trigger);
258
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
259
DECLARE_int64(apply_intents_task_injected_delay_ms);
260
DECLARE_string(regular_tablets_data_block_key_value_encoding);
261
262
DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0,
263
                 "Sleep before applying intents to docdb after transaction commit");
264
265
using namespace std::placeholders;
266
267
using std::shared_ptr;
268
using std::make_shared;
269
using std::string;
270
using std::unordered_set;
271
using std::vector;
272
using std::unique_ptr;
273
using namespace std::literals;  // NOLINT
274
275
using rocksdb::WriteBatch;
276
using rocksdb::SequenceNumber;
277
using yb::tserver::WriteRequestPB;
278
using yb::tserver::WriteResponsePB;
279
using yb::docdb::KeyValueWriteBatchPB;
280
using yb::tserver::ReadRequestPB;
281
using yb::docdb::DocOperation;
282
using yb::docdb::RedisWriteOperation;
283
using yb::docdb::QLWriteOperation;
284
using yb::docdb::PgsqlWriteOperation;
285
using yb::docdb::DocDBCompactionFilterFactory;
286
using yb::docdb::InitMarkerBehavior;
287
288
namespace yb {
289
namespace tablet {
290
291
using consensus::MaximumOpId;
292
using log::LogAnchorRegistry;
293
using strings::Substitute;
294
using base::subtle::Barrier_AtomicIncrement;
295
296
using client::ChildTransactionData;
297
using client::TransactionManager;
298
using client::YBSession;
299
using client::YBTransaction;
300
using client::YBTablePtr;
301
302
using docdb::DocKey;
303
using docdb::DocPath;
304
using docdb::DocRowwiseIterator;
305
using docdb::DocWriteBatch;
306
using docdb::SubDocKey;
307
using docdb::PrimitiveValue;
308
using docdb::StorageDbType;
309
310
////////////////////////////////////////////////////////////
311
// Tablet
312
////////////////////////////////////////////////////////////
313
314
namespace {
315
316
std::string MakeTabletLogPrefix(
317
3.02M
    const TabletId& tablet_id, const std::string& log_prefix_suffix) {
318
3.02M
  return Format("T $0$1: ", tablet_id, log_prefix_suffix);
319
3.02M
}
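The helper above is where every tablet log line gets its "T <tablet-id>..." prefix. A minimal standalone sketch of the same substitution (hypothetical names; the real code uses yb::Format with $0/$1 placeholders):

#include <iostream>
#include <string>

// Hypothetical stand-in for Format("T $0$1: ", tablet_id, log_prefix_suffix).
std::string MakeTabletLogPrefixSketch(const std::string& tablet_id,
                                      const std::string& log_prefix_suffix) {
  return "T " + tablet_id + log_prefix_suffix + ": ";
}

int main() {
  // Prints e.g. "T deadbeef P peer1: " for a hypothetical tablet-id/suffix pair.
  std::cout << MakeTabletLogPrefixSketch("deadbeef", " P peer1") << '\n';
}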
320
321
docdb::ConsensusFrontiers* InitFrontiers(
322
    const OpId op_id,
323
    const HybridTime log_ht,
324
11.8M
    docdb::ConsensusFrontiers* frontiers) {
325
11.8M
  if (FLAGS_TEST_disable_adding_user_frontier_to_sst) {
326
0
    return nullptr;
327
0
  }
328
11.8M
  set_op_id(op_id, frontiers);
329
11.8M
  set_hybrid_time(log_ht, frontiers);
330
11.8M
  return frontiers;
331
11.8M
}
332
333
template <class Data>
334
2.97M
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
2.97M
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
2.97M
}
tablet.cc:rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>* yb::tablet::(anonymous namespace)::InitFrontiers<yb::tablet::TransactionApplyData>(yb::tablet::TransactionApplyData const&, rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>*)
Line
Count
Source
334
1.30M
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
1.30M
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
1.30M
}
tablet.cc:rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>* yb::tablet::(anonymous namespace)::InitFrontiers<yb::tablet::RemoveIntentsData>(yb::tablet::RemoveIntentsData const&, rocksdb::UserFrontiersBase<yb::docdb::ConsensusFrontier>*)
Line
Count
Source
334
1.67M
docdb::ConsensusFrontiers* InitFrontiers(const Data& data, docdb::ConsensusFrontiers* frontiers) {
335
1.67M
  return InitFrontiers(data.op_id, data.log_ht, frontiers);
336
1.67M
}
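InitFrontiers stamps a write batch's consensus frontier with the Raft OpId and hybrid time before the batch reaches RocksDB, or returns nullptr when the test flag disables user frontiers. A minimal sketch of that control flow, with hypothetical stand-in types instead of docdb::ConsensusFrontiers:

// Hypothetical stand-ins; set_op_id/set_hybrid_time in the real code update
// both the smallest and largest ends of the frontier pair.
struct FrontierSketch { long op_index = 0; unsigned long hybrid_time = 0; };
struct FrontiersSketch { FrontierSketch smallest, largest; };

FrontiersSketch* InitFrontiersSketch(
    long op_index, unsigned long hybrid_time,
    bool disable_for_test, FrontiersSketch* frontiers) {
  if (disable_for_test) {
    return nullptr;  // the batch is then written without user frontiers
  }
  frontiers->smallest = {op_index, hybrid_time};
  frontiers->largest = {op_index, hybrid_time};
  return frontiers;
}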
337
338
rocksdb::UserFrontierPtr MemTableFrontierFromDb(
339
    rocksdb::DB* db,
340
19.0M
    rocksdb::UpdateUserValueType type) {
341
19.0M
  if (FLAGS_TEST_disable_getting_user_frontier_from_mem_table) {
342
0
    return nullptr;
343
0
  }
344
19.0M
  return db->GetMutableMemTableFrontier(type);
345
19.0M
}
346
347
} // namespace
348
349
class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
350
 public:
351
  RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix)
352
      : tablet_(*CHECK_NOTNULL(tablet)),
353
276k
        log_prefix_(log_prefix) {}
354
355
482
  void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override {
356
482
    if (ci.is_full_compaction) {
357
301
      auto& metadata = *CHECK_NOTNULL(tablet_.metadata());
358
301
      if (!metadata.has_been_fully_compacted()) {
359
155
        metadata.set_has_been_fully_compacted(true);
360
155
        ERROR_NOT_OK(metadata.Flush(), log_prefix_);
361
155
      }
362
301
    }
363
482
  }
364
365
 private:
366
  Tablet& tablet_;
367
  const std::string log_prefix_;
368
};
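OnCompactionCompleted above records a one-time "fully compacted" fact in the tablet metadata after the first full compaction. A standalone sketch of that set-once-then-persist pattern (hypothetical MetadataSketch in place of the real RaftGroupMetadata):

struct MetadataSketch {  // hypothetical stand-in for the tablet metadata
  bool fully_compacted = false;
  bool has_been_fully_compacted() const { return fully_compacted; }
  void set_has_been_fully_compacted(bool value) { fully_compacted = value; }
  bool Flush() { return true; }  // the real code persists to disk, logging errors
};

void OnFullCompactionSketch(MetadataSketch* metadata, bool is_full_compaction) {
  if (is_full_compaction && !metadata->has_been_fully_compacted()) {
    metadata->set_has_been_fully_compacted(true);
    metadata->Flush();  // failure is only logged; the compaction already succeeded
  }
}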
369
370
Tablet::Tablet(const TabletInitData& data)
371
    : key_schema_(std::make_unique<Schema>(data.metadata->schema()->CreateKeyProjection())),
372
      metadata_(data.metadata),
373
      table_type_(data.metadata->table_type()),
374
      log_anchor_registry_(data.log_anchor_registry),
375
      mem_tracker_(MemTracker::CreateTracker(
376
          Format("tablet-$0", tablet_id()), data.parent_mem_tracker, AddToParent::kTrue,
377
          CreateMetrics::kFalse)),
378
      block_based_table_mem_tracker_(data.block_based_table_mem_tracker),
379
      clock_(data.clock),
380
      mvcc_(
381
          MakeTabletLogPrefix(data.metadata->raft_group_id(), data.log_prefix_suffix), data.clock),
382
      tablet_options_(data.tablet_options),
383
      pending_non_abortable_op_counter_("RocksDB non-abortable read/write operations"),
384
      pending_abortable_op_counter_("RocksDB abortable read/write operations"),
385
      write_ops_being_submitted_counter_("Tablet schema"),
386
      client_future_(data.client_future),
387
      local_tablet_filter_(data.local_tablet_filter),
388
      log_prefix_suffix_(data.log_prefix_suffix),
389
      is_sys_catalog_(data.is_sys_catalog),
390
      txns_enabled_(data.txns_enabled),
391
      retention_policy_(std::make_shared<TabletRetentionPolicy>(
392
150k
          clock_, data.allowed_history_cutoff_provider, metadata_.get())) {
393
150k
  CHECK(schema()->has_column_ids());
394
150k
  LOG_WITH_PREFIX(INFO) << "Schema version for " << metadata_->table_name() << " is "
395
150k
                        << metadata_->schema_version();
396
397
150k
  if (data.metric_registry) {
398
150k
    MetricEntity::AttributeMap attrs;
399
    // TODO(KUDU-745): table_id is apparently not set in the metadata.
400
150k
    attrs["table_id"] = metadata_->table_id();
401
150k
    attrs["table_name"] = metadata_->table_name();
402
150k
    attrs["namespace_name"] = metadata_->namespace_name();
403
150k
    table_metrics_entity_ =
404
150k
        METRIC_ENTITY_table.Instantiate(data.metric_registry, metadata_->table_id(), attrs);
405
150k
    tablet_metrics_entity_ =
406
150k
        METRIC_ENTITY_tablet.Instantiate(data.metric_registry, tablet_id(), attrs);
407
    // If we are creating a KV table create the metrics callback.
408
150k
    regulardb_statistics_ =
409
150k
        rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_);
410
150k
    intentsdb_statistics_ =
411
150k
        (GetAtomicFlag(&FLAGS_TEST_export_intentdb_metrics)
412
150k
             ? rocksdb::CreateDBStatistics(table_metrics_entity_, tablet_metrics_entity_, true)
413
150k
             : rocksdb::CreateDBStatistics(table_metrics_entity_, nullptr, true));
414
415
150k
    metrics_.reset(new TabletMetrics(table_metrics_entity_, tablet_metrics_entity_));
416
417
150k
    mem_tracker_->SetMetricEntity(tablet_metrics_entity_);
418
150k
  }
419
420
150k
  auto table_info = metadata_->primary_table_info();
421
150k
  bool has_index = !table_info->index_map->empty();
422
150k
  bool transactional = data.metadata->schema()->table_properties().is_transactional();
423
150k
  if (transactional) {
424
48.9k
    server::HybridClock::EnableClockSkewControl();
425
48.9k
  }
426
150k
  if (txns_enabled_ &&
427
150k
      data.transaction_participant_context &&
428
150k
      (is_sys_catalog_ || transactional)) {
429
56.8k
    transaction_participant_ = std::make_unique<TransactionParticipant>(
430
56.8k
        data.transaction_participant_context, this, tablet_metrics_entity_);
431
    // Create transaction manager for secondary index update.
432
56.8k
    if (has_index) {
433
10
      transaction_manager_ = std::make_unique<client::TransactionManager>(
434
10
          client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_);
435
10
    }
436
56.8k
  }
437
438
  // Create index table metadata cache for secondary index update.
439
150k
  if (has_index) {
440
10
    CreateNewYBMetaDataCache();
441
10
  }
442
443
  // If this is a unique index tablet, set up the index primary key schema.
444
150k
  if (table_info->index_info && table_info->index_info->is_unique()) {
445
3.87k
    unique_index_key_schema_ = std::make_unique<Schema>();
446
3.87k
    const auto ids = table_info->index_info->index_key_column_ids();
447
3.87k
    CHECK_OK(table_info->schema->CreateProjectionByIdsIgnoreMissing(
448
3.87k
        ids, unique_index_key_schema_.get()));
449
3.87k
  }
450
451
150k
  if (data.transaction_coordinator_context &&
452
150k
      
table_info->table_type == TableType::TRANSACTION_STATUS_TABLE_TYPE150k
) {
453
48.0k
    transaction_coordinator_ = std::make_unique<TransactionCoordinator>(
454
48.0k
        metadata_->fs_manager()->uuid(),
455
48.0k
        data.transaction_coordinator_context,
456
48.0k
        metrics_->expired_transactions.get());
457
48.0k
  }
458
459
150k
  snapshots_ = std::make_unique<TabletSnapshots>(this);
460
461
150k
  snapshot_coordinator_ = data.snapshot_coordinator;
462
463
150k
  if (metadata_->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
464
1
    SplitDone();
465
1
  }
466
150k
  auto restoration_hybrid_time = metadata_->restoration_hybrid_time();
467
150k
  if (restoration_hybrid_time && transaction_participant_ && FLAGS_consistent_restore) {
468
0
    transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
469
0
  }
470
150k
  SyncRestoringOperationFilter(ResetSplit::kFalse);
471
150k
}
472
473
76.0k
Tablet::~Tablet() {
474
76.0k
  if (StartShutdown()) {
475
490
    CompleteShutdown();
476
75.5k
  } else {
477
75.5k
    auto state = state_;
478
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, state != kShutdown)
479
18.4E
        << "Destroying Tablet that did not complete shutdown: " << state;
480
75.5k
  }
481
76.0k
  if (block_based_table_mem_tracker_) {
482
75.5k
    block_based_table_mem_tracker_->UnregisterFromParent();
483
75.5k
  }
484
76.0k
  mem_tracker_->UnregisterFromParent();
485
76.0k
}
486
487
150k
Status Tablet::Open() {
488
150k
  TRACE_EVENT0("tablet", "Tablet::Open");
489
150k
  std::lock_guard<rw_spinlock> lock(component_lock_);
490
150k
  CHECK_EQ(state_, kInitialized) << "already open";
491
150k
  CHECK(schema()->has_column_ids());
492
493
150k
  switch (table_type_) {
494
39.6k
    case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
495
90.6k
    case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
496
102k
    case TableType::REDIS_TABLE_TYPE:
497
102k
      RETURN_NOT_OK(OpenKeyValueTablet());
498
102k
      state_ = kBootstrapping;
499
102k
      return Status::OK();
500
47.9k
    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
501
47.9k
      state_ = kBootstrapping;
502
47.9k
      return Status::OK();
503
150k
  }
504
0
  FATAL_INVALID_ENUM_VALUE(TableType, table_type_);
505
506
0
  return Status::OK();
507
150k
}
508
509
276k
Status Tablet::CreateTabletDirectories(const string& db_dir, FsManager* fs) {
510
276k
  LOG_WITH_PREFIX(INFO) << "Creating RocksDB database in dir " << db_dir;
511
512
  // Create the directory table-uuid first.
513
276k
  RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(DirName(db_dir)),
514
276k
                        Format("Failed to create RocksDB table directory $0", DirName(db_dir)));
515
516
276k
  RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir),
517
276k
                        Format("Failed to create RocksDB tablet directory $0", db_dir));
518
519
276k
  RETURN_NOT_OK_PREPEND(fs->CreateDirIfMissingAndSync(db_dir + kIntentsDBSuffix),
520
276k
                        Format("Failed to create RocksDB tablet intents directory $0", db_dir));
521
522
276k
  RETURN_NOT_OK(snapshots_->CreateDirectories(db_dir, fs));
523
524
276k
  return Status::OK();
525
276k
}
526
527
66.9k
void Tablet::ResetYBMetaDataCache() {
528
66.9k
  std::atomic_store_explicit(&metadata_cache_, {}, std::memory_order_release);
529
66.9k
}
530
531
53.2k
void Tablet::CreateNewYBMetaDataCache() {
532
53.2k
  std::atomic_store_explicit(&metadata_cache_,
533
53.2k
      std::make_shared<client::YBMetaDataCache>(client_future_.get(),
534
53.2k
                                                false /* Update permissions cache */),
535
53.2k
      std::memory_order_release);
536
53.2k
}
537
538
41.3k
std::shared_ptr<client::YBMetaDataCache> Tablet::YBMetaDataCache() {
539
41.3k
  return std::atomic_load_explicit(&metadata_cache_, std::memory_order_acquire);
540
41.3k
}
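Reset/Create/get above share the metadata cache through the free-function shared_ptr atomics, so readers never take a lock. A minimal sketch of the same pattern (standard C++11 atomic_load_explicit/atomic_store_explicit on shared_ptr; CacheSketch is a hypothetical stand-in for client::YBMetaDataCache):

#include <atomic>
#include <memory>

struct CacheSketch {};  // hypothetical cache payload

std::shared_ptr<CacheSketch> g_cache;  // shared pointer, swapped atomically

void ResetCacheSketch() {  // mirrors ResetYBMetaDataCache
  std::atomic_store_explicit(&g_cache, std::shared_ptr<CacheSketch>(),
                             std::memory_order_release);
}

void CreateCacheSketch() {  // mirrors CreateNewYBMetaDataCache
  std::atomic_store_explicit(&g_cache, std::make_shared<CacheSketch>(),
                             std::memory_order_release);
}

std::shared_ptr<CacheSketch> GetCacheSketch() {  // mirrors YBMetaDataCache
  return std::atomic_load_explicit(&g_cache, std::memory_order_acquire);
}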
541
542
template <class F>
543
421k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
421k
  typedef typename decltype(
546
421k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
421k
      MemTableFlushFilterFactoryType;
548
421k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
421k
}
tablet.cc:auto yb::tablet::MakeMemTableFlushFilterFactory<yb::tablet::Tablet::OpenKeyValueTablet()::$_0>(yb::tablet::Tablet::OpenKeyValueTablet()::$_0 const&)
Line
Count
Source
543
276k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
276k
  typedef typename decltype(
546
276k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
276k
      MemTableFlushFilterFactoryType;
548
276k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
276k
}
tablet.cc:auto yb::tablet::MakeMemTableFlushFilterFactory<yb::tablet::Tablet::OpenKeyValueTablet()::$_2>(yb::tablet::Tablet::OpenKeyValueTablet()::$_2 const&)
Line
Count
Source
543
145k
auto MakeMemTableFlushFilterFactory(const F& f) {
544
  // Trick to get type of mem_table_flush_filter_factory field.
545
145k
  typedef typename decltype(
546
145k
      static_cast<rocksdb::Options*>(nullptr)->mem_table_flush_filter_factory)::element_type
547
145k
      MemTableFlushFilterFactoryType;
548
145k
  return std::make_shared<MemTableFlushFilterFactoryType>(f);
549
145k
}
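The decltype expression above lets the factory helpers name the element type of an Options member without spelling it out, so they keep compiling if the field's type changes. A self-contained sketch of the trick with a hypothetical OptionsSketch:

#include <memory>

struct OptionsSketch {  // hypothetical stand-in for rocksdb::Options
  std::shared_ptr<int> some_factory_field;  // element_type is int
};

template <class F>
auto MakeFactorySketch(const F& value) {
  // Unevaluated member access on a null pointer: only the type is inspected.
  typedef typename decltype(
      static_cast<OptionsSketch*>(nullptr)->some_factory_field)::element_type
      ElementType;
  return std::make_shared<ElementType>(value);  // std::shared_ptr<int> here
}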
550
551
template <class F>
552
276k
auto MakeMaxFileSizeWithTableTTLFunction(const F& f) {
553
  // Trick to get type of max_file_size_for_compaction field.
554
276k
  typedef typename decltype(
555
276k
      static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type
556
276k
      MaxFileSizeWithTableTTLFunction;
557
276k
  return std::make_shared<MaxFileSizeWithTableTTLFunction>(f);
558
276k
}
559
560
31.9k
Result<bool> Tablet::IntentsDbFlushFilter(const rocksdb::MemTable& memtable) {
561
31.9k
  VLOG_WITH_PREFIX(4) << __func__;
562
563
31.9k
  auto frontiers = memtable.Frontiers();
564
31.9k
  if (frontiers) {
565
31.9k
    const auto& intents_largest =
566
31.9k
        down_cast<const docdb::ConsensusFrontier&>(frontiers->Largest());
567
568
    // We allow flushing the intents DB only after the regular DB.
569
    // Otherwise we could lose applied intents when corresponding regular records were not
570
    // flushed.
571
31.9k
    auto regular_flushed_frontier = regular_db_->GetFlushedFrontier();
572
31.9k
    if (regular_flushed_frontier) {
573
15.9k
      const auto& regular_flushed_largest =
574
15.9k
          static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier);
575
15.9k
      if (regular_flushed_largest.op_id().index >= intents_largest.op_id().index) {
576
330
        VLOG_WITH_PREFIX(4) << __func__ << ", regular already flushed";
577
330
        return true;
578
330
      }
579
15.9k
    }
580
31.9k
  } else {
581
5
    VLOG_WITH_PREFIX(4) << __func__ << ", no frontiers";
582
5
  }
583
584
  // If regular db does not have anything to flush, it means that we have just added intents,
585
  // without applying them, so it is OK to flush the intents RocksDB.
586
31.6k
  auto flush_intention = regular_db_->GetFlushAbility();
587
31.6k
  if (flush_intention == rocksdb::FlushAbility::kNoNewData) {
588
418
    VLOG_WITH_PREFIX(4) << __func__ << ", no new data";
589
418
    return true;
590
418
  }
591
592
  // Force flush of regular DB if we were not able to flush for too long.
593
31.2k
  auto timeout = std::chrono::milliseconds(FLAGS_intents_flush_max_delay_ms);
594
31.2k
  if (flush_intention != rocksdb::FlushAbility::kAlreadyFlushing &&
595
31.2k
      (shutdown_requested_.load(std::memory_order_acquire) ||
596
5.34k
       std::chrono::steady_clock::now() > memtable.FlushStartTime() + timeout)) {
597
70
    VLOG_WITH_PREFIX(2) << __func__ << ", force flush";
598
599
70
    rocksdb::FlushOptions options;
600
70
    options.wait = false;
601
70
    RETURN_NOT_OK(regular_db_->Flush(options));
602
70
  }
603
604
31.2k
  return false;
605
31.2k
}
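The flush filter above protects the recovery invariant that intents are flushed only after the regular records they were applied to; otherwise a crash between the two flushes could lose applied intents. The core comparison, as a minimal sketch over plain Raft indexes rather than docdb::ConsensusFrontier objects:

// Hypothetical simplification of the frontier comparison in IntentsDbFlushFilter.
bool CanFlushIntentsSketch(long regular_flushed_index, long intents_largest_index) {
  // Safe only if the regular DB has durably flushed at least as far as the
  // newest record sitting in the intents memtable.
  return regular_flushed_index >= intents_largest_index;
}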
606
607
2.44M
std::string Tablet::LogPrefix() const {
608
2.44M
  return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_);
609
2.44M
}
610
611
namespace {
612
613
421k
std::string LogDbTypePrefix(docdb::StorageDbType db_type) {
614
421k
  switch (db_type) {
615
276k
    case docdb::StorageDbType::kRegular:
616
276k
      return "R";
617
145k
    case docdb::StorageDbType::kIntents:
618
145k
      return "I";
619
421k
  }
620
0
  FATAL_INVALID_ENUM_VALUE(docdb::StorageDbType, db_type);
621
0
}
622
623
std::string MakeTabletLogPrefix(
624
421k
    const TabletId& tablet_id, const std::string& log_prefix_suffix, docdb::StorageDbType db_type) {
625
421k
  return MakeTabletLogPrefix(
626
421k
      tablet_id, Format("$0 [$1]", log_prefix_suffix, LogDbTypePrefix(db_type)));
627
421k
}
628
629
} // namespace
630
631
421k
std::string Tablet::LogPrefix(docdb::StorageDbType db_type) const {
632
421k
  return MakeTabletLogPrefix(tablet_id(), log_prefix_suffix_, db_type);
633
421k
}
634
635
276k
Status Tablet::OpenKeyValueTablet() {
636
276k
  static const std::string kRegularDB = "RegularDB"s;
637
276k
  static const std::string kIntentsDB = "IntentsDB"s;
638
639
276k
  rocksdb::BlockBasedTableOptions table_options;
640
276k
  if (!metadata()->primary_table_info()->index_info || metadata()->colocated()) {
641
    // This tablet is not dedicated to the index table, so it should be effective to use
642
    // the advanced key-value encoding algorithm optimized for the docdb key structure.
643
179k
    table_options.use_delta_encoding = true;
644
179k
    table_options.data_block_key_value_encoding_format =
645
179k
        VERIFY_RESULT(docdb::GetConfiguredKeyValueEncodingFormat(
646
179k
            FLAGS_regular_tablets_data_block_key_value_encoding));
647
179k
  }
648
276k
  rocksdb::Options rocksdb_options;
649
276k
  InitRocksDBOptions(
650
276k
      &rocksdb_options, LogPrefix(docdb::StorageDbType::kRegular), std::move(table_options));
651
276k
  rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kRegularDB, mem_tracker_);
652
276k
  rocksdb_options.block_based_table_mem_tracker =
653
276k
      MemTracker::FindOrCreateTracker(
654
276k
          Format("$0-$1", kRegularDB, tablet_id()), block_based_table_mem_tracker_,
655
276k
          AddToParent::kTrue, CreateMetrics::kFalse);
656
  // We may not have a metrics_entity_ instantiated in tests.
657
276k
  if (tablet_metrics_entity_) {
658
275k
    rocksdb_options.block_based_table_mem_tracker->SetMetricEntity(
659
275k
        tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kRegularDB));
660
275k
  }
661
662
276k
  key_bounds_ = docdb::KeyBounds(metadata()->lower_bound_key(), metadata()->upper_bound_key());
663
664
  // Install the history cleanup handler. Note that TabletRetentionPolicy is going to hold a raw ptr
665
  // to this tablet. So, we ensure that rocksdb_ is reset before this tablet gets destroyed.
666
276k
  rocksdb_options.compaction_filter_factory = make_shared<DocDBCompactionFilterFactory>(
667
276k
      retention_policy_, &key_bounds_);
668
669
276k
  rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] {
670
3.62k
    if (mem_table_flush_filter_factory_) {
671
3.06k
      return mem_table_flush_filter_factory_();
672
3.06k
    }
673
560
    return rocksdb::MemTableFilter();
674
3.62k
  });
675
276k
  if (FLAGS_tablet_enable_ttl_file_filter) {
676
0
    rocksdb_options.compaction_file_filter_factory =
677
0
        std::make_shared<docdb::DocDBCompactionFileFilterFactory>(retention_policy_, clock());
678
0
  }
679
680
  // Use a function that checks the table TTL before returning a value for max file size
681
  // for compactions.
682
1.11M
  rocksdb_options.max_file_size_for_compaction = MakeMaxFileSizeWithTableTTLFunction([this] {
683
1.11M
    if (FLAGS_rocksdb_max_file_size_for_compaction > 0 &&
684
1.11M
        retention_policy_->GetRetentionDirective().table_ttl !=
685
0
            docdb::ValueControlFields::kMaxTtl) {
686
0
      return FLAGS_rocksdb_max_file_size_for_compaction;
687
0
    }
688
1.11M
    return std::numeric_limits<uint64_t>::max();
689
1.11M
  });
690
691
276k
  rocksdb_options.disable_auto_compactions = true;
692
276k
  rocksdb_options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
693
276k
  rocksdb_options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
694
695
276k
  rocksdb::Options regular_rocksdb_options(rocksdb_options);
696
276k
  regular_rocksdb_options.listeners.push_back(
697
276k
      std::make_shared<RegularRocksDbListener>(this, regular_rocksdb_options.log_prefix));
698
699
276k
  const string db_dir = metadata()->rocksdb_dir();
700
276k
  RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager()));
701
702
276k
  LOG(INFO) << "Opening RocksDB at: " << db_dir;
703
276k
  rocksdb::DB* db = nullptr;
704
276k
  rocksdb::Status rocksdb_open_status = rocksdb::DB::Open(regular_rocksdb_options, db_dir, &db);
705
276k
  if (!rocksdb_open_status.ok()) {
706
5
    LOG_WITH_PREFIX(ERROR) << "Failed to open a RocksDB database in directory " << db_dir << ": "
707
5
                           << rocksdb_open_status;
708
5
    if (db != nullptr) {
709
0
      delete db;
710
0
    }
711
5
    return STATUS(IllegalState, rocksdb_open_status.ToString());
712
5
  }
713
276k
  regular_db_.reset(db);
714
276k
  regular_db_->ListenFilesChanged(std::bind(&Tablet::RegularDbFilesChanged, this));
715
716
276k
  if (transaction_participant_) {
717
145k
    LOG_WITH_PREFIX(INFO) << "Opening intents DB at: " << db_dir + kIntentsDBSuffix;
718
145k
    rocksdb::Options intents_rocksdb_options(rocksdb_options);
719
145k
    docdb::SetLogPrefix(&intents_rocksdb_options, LogPrefix(docdb::StorageDbType::kIntents));
720
721
145k
    intents_rocksdb_options.mem_table_flush_filter_factory = MakeMemTableFlushFilterFactory([this] {
722
31.8k
      return std::bind(&Tablet::IntentsDbFlushFilter, this, _1);
723
31.8k
    });
724
725
145k
    intents_rocksdb_options.compaction_filter_factory =
726
145k
        FLAGS_tablet_do_compaction_cleanup_for_intents ?
727
18.4E
        std::make_shared<docdb::DocDBIntentsCompactionFilterFactory>(this, &key_bounds_) : nullptr;
728
729
145k
    intents_rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker(kIntentsDB, mem_tracker_);
730
145k
    intents_rocksdb_options.block_based_table_mem_tracker =
731
145k
        MemTracker::FindOrCreateTracker(
732
145k
            Format("$0-$1", kIntentsDB, tablet_id()), block_based_table_mem_tracker_,
733
145k
            AddToParent::kTrue, CreateMetrics::kFalse);
734
    // We may not have a metrics_entity_ instantiated in tests.
735
145k
    if (tablet_metrics_entity_) {
736
145k
      intents_rocksdb_options.block_based_table_mem_tracker->SetMetricEntity(
737
145k
          tablet_metrics_entity_, Format("$0_$1", "BlockBasedTable", kIntentsDB));
738
145k
    }
739
145k
    intents_rocksdb_options.statistics = intentsdb_statistics_;
740
741
145k
    rocksdb::DB* intents_db = nullptr;
742
145k
    RETURN_NOT_OK(
743
145k
        rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db));
744
145k
    intents_db_.reset(intents_db);
745
145k
    intents_db_->ListenFilesChanged(std::bind(&Tablet::CleanupIntentFiles, this));
746
145k
  }
747
748
276k
  ql_storage_.reset(new docdb::QLRocksDBStorage(doc_db()));
749
276k
  if (transaction_participant_) {
750
145k
    transaction_participant_->SetDB(doc_db(), &key_bounds_, &pending_non_abortable_op_counter_);
751
145k
  }
752
753
  // Don't allow reads at timestamps lower than the highest history cutoff of a past compaction.
754
276k
  auto regular_flushed_frontier = regular_db_->GetFlushedFrontier();
755
276k
  if (regular_flushed_frontier) {
756
3.05k
    retention_policy_->UpdateCommittedHistoryCutoff(
757
3.05k
        static_cast<const docdb::ConsensusFrontier&>(*regular_flushed_frontier).history_cutoff());
758
3.05k
  }
759
760
276k
  LOG_WITH_PREFIX(INFO) << "Successfully opened a RocksDB database at " << db_dir
761
276k
                        << ", obj: " << db;
762
763
276k
  return Status::OK();
764
276k
}
765
766
4.10k
void Tablet::RegularDbFilesChanged() {
767
4.10k
  std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_);
768
4.10k
  if (num_sst_files_changed_listener_) {
769
3.41k
    num_sst_files_changed_listener_();
770
3.41k
  }
771
4.10k
}
772
773
150k
void Tablet::SetCleanupPool(ThreadPool* thread_pool) {
774
150k
  if (!transaction_participant_) {
775
93.1k
    return;
776
93.1k
  }
777
778
57.0k
  cleanup_intent_files_token_ = thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
779
780
57.0k
  CleanupIntentFiles();
781
57.0k
}
782
783
88.7k
void Tablet::CleanupIntentFiles() {
784
88.7k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
785
88.7k
  if (!scoped_read_operation.ok() || state_ != State::kOpen || !FLAGS_delete_intents_sst_files ||
786
88.7k
      !cleanup_intent_files_token_) {
787
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip";
788
0
    return;
789
0
  }
790
791
88.7k
  WARN_NOT_OK(
792
88.7k
      cleanup_intent_files_token_->SubmitFunc(std::bind(&Tablet::DoCleanupIntentFiles, this)),
793
88.7k
      "Submit cleanup intent files failed");
794
88.7k
}
795
796
88.7k
void Tablet::DoCleanupIntentFiles() {
797
88.7k
  if (metadata_->is_under_twodc_replication()) {
798
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Exit because of TwoDC replication";
799
0
    return;
800
0
  }
801
88.7k
  HybridTime best_file_max_ht = HybridTime::kMax;
802
88.7k
  std::vector<rocksdb::LiveFileMetaData> files;
803
  // Stops when there are no more files to delete.
804
88.7k
  std::string previous_name;
805
89.0k
  while (GetAtomicFlag(&FLAGS_cleanup_intents_sst_files)) {
806
88.9k
    auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
807
88.9k
    if (!scoped_read_operation.ok()) {
808
0
      VLOG_WITH_PREFIX_AND_FUNC(4) << "Failed to acquire scoped read operation";
809
0
      break;
810
0
    }
811
812
88.9k
    best_file_max_ht = HybridTime::kMax;
813
88.9k
    const rocksdb::LiveFileMetaData* best_file = nullptr;
814
88.9k
    files.clear();
815
88.9k
    intents_db_->GetLiveFilesMetaData(&files);
816
88.9k
    auto min_largest_seq_no = std::numeric_limits<rocksdb::SequenceNumber>::max();
817
818
88.9k
    VLOG_WITH_PREFIX_AND_FUNC(5) << "Files: " << AsString(files);
819
820
88.9k
    for (const auto& file : files) {
821
6.28k
      if (file.largest.seqno < min_largest_seq_no) {
822
6.28k
        min_largest_seq_no = file.largest.seqno;
823
6.28k
        if (file.largest.user_frontier) {
824
6.28k
          auto& frontier = down_cast<docdb::ConsensusFrontier&>(*file.largest.user_frontier);
825
6.28k
          best_file_max_ht = frontier.hybrid_time();
826
18.4E
        } else {
827
18.4E
          best_file_max_ht = HybridTime::kMax;
828
18.4E
        }
829
6.28k
        best_file = &file;
830
6.28k
      }
831
6.28k
    }
832
833
88.9k
    auto min_running_start_ht = transaction_participant_->MinRunningHybridTime();
834
88.9k
    if (!min_running_start_ht.is_valid() || min_running_start_ht <= best_file_max_ht) {
835
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(4)
836
18.4E
          << "Cannot delete because of running transactions: " << min_running_start_ht
837
18.4E
          << ", best file max ht: " << best_file_max_ht;
838
88.7k
      break;
839
88.7k
    }
840
260
    if (best_file->name == previous_name) {
841
0
      LOG_WITH_PREFIX_AND_FUNC(INFO)
842
0
          << "Attempt to delete same file: " << previous_name << ", stopping cleanup";
843
0
      break;
844
0
    }
845
260
    previous_name = best_file->name;
846
847
260
    LOG_WITH_PREFIX_AND_FUNC(INFO)
848
260
        << "Intents SST file will be deleted: " << best_file->ToString()
849
260
        << ", max ht: " << best_file_max_ht << ", min running transaction start ht: "
850
260
        << min_running_start_ht;
851
260
    auto flush_status = regular_db_->Flush(rocksdb::FlushOptions());
852
260
    if (!flush_status.ok()) {
853
0
      LOG_WITH_PREFIX_AND_FUNC(WARNING) << "Failed to flush regular db: " << flush_status;
854
0
      break;
855
0
    }
856
260
    auto delete_status = intents_db_->DeleteFile(best_file->name);
857
260
    if (!delete_status.ok()) {
858
0
      LOG_WITH_PREFIX_AND_FUNC(WARNING)
859
0
          << "Failed to delete " << best_file->ToString() << ", all files " << AsString(files)
860
0
          << ": " << delete_status;
861
0
      break;
862
0
    }
863
260
  }
864
865
88.7k
  if (best_file_max_ht != HybridTime::kMax) {
866
3.53k
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Wait min running hybrid time: " << best_file_max_ht;
867
3.53k
    transaction_participant_->WaitMinRunningHybridTime(best_file_max_ht);
868
3.53k
  }
869
88.7k
}
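Each pass of the loop above deletes at most one intents SST: the file with the smallest largest-seqno (the oldest one), and only when no running transaction started at or before the file's maximum hybrid time. A sketch of just the selection step, with a hypothetical FileMetaSketch instead of rocksdb::LiveFileMetaData:

#include <vector>

struct FileMetaSketch {
  unsigned long largest_seqno = 0;     // newest record in the file
  unsigned long max_hybrid_time = 0;   // from the file's largest user frontier
};

// Returns the oldest live intents file, the only deletion candidate per pass.
const FileMetaSketch* PickOldestFileSketch(const std::vector<FileMetaSketch>& files) {
  const FileMetaSketch* best = nullptr;
  unsigned long min_largest_seqno = ~0UL;
  for (const auto& file : files) {
    if (file.largest_seqno < min_largest_seqno) {
      min_largest_seqno = file.largest_seqno;
      best = &file;
    }
  }
  return best;
}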
870
871
152k
Status Tablet::EnableCompactions(ScopedRWOperationPause* non_abortable_ops_pause) {
872
152k
  if (!non_abortable_ops_pause) {
873
150k
    auto operation = CreateNonAbortableScopedRWOperation();
874
150k
    RETURN_NOT_OK(operation);
875
150k
    return DoEnableCompactions();
876
150k
  }
877
878
1.79k
  return DoEnableCompactions();
879
152k
}
880
881
323k
Status Tablet::DoEnableCompactions() {
882
323k
  Status regular_db_status;
883
323k
  std::unordered_map<std::string, std::string> new_options = {
884
323k
      { "level0_slowdown_writes_trigger"s,
885
323k
        std::to_string(max_if_negative(FLAGS_rocksdb_level0_slowdown_writes_trigger))},
886
323k
      { "level0_stop_writes_trigger"s,
887
323k
        std::to_string(max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger))},
888
323k
  };
889
323k
  if (regular_db_) {
890
275k
    WARN_WITH_PREFIX_NOT_OK(
891
275k
        regular_db_->SetOptions(new_options, /* dump_options= */ false),
892
275k
        "Failed to set options on regular DB");
893
275k
    regular_db_status =
894
275k
        regular_db_->EnableAutoCompaction({regular_db_->DefaultColumnFamily()});
895
275k
    if (!regular_db_status.ok()) {
896
0
      LOG_WITH_PREFIX(WARNING) << "Failed to enable compactions on regular DB: "
897
0
                               << regular_db_status;
898
0
    }
899
275k
  }
900
323k
  if (intents_db_) {
901
145k
    WARN_WITH_PREFIX_NOT_OK(
902
145k
        intents_db_->SetOptions(new_options, /* dump_options= */ false),
903
145k
        "Failed to set options on provisional records DB");
904
145k
    Status intents_db_status =
905
145k
        intents_db_->EnableAutoCompaction({intents_db_->DefaultColumnFamily()});
906
145k
    if (!intents_db_status.ok()) {
907
0
      LOG_WITH_PREFIX(WARNING)
908
0
          << "Failed to enable compactions on provisional records DB: " << intents_db_status;
909
0
      return intents_db_status;
910
0
    }
911
145k
  }
912
323k
  return regular_db_status;
913
323k
}
914
915
150k
void Tablet::MarkFinishedBootstrapping() {
916
150k
  CHECK_EQ(state_, kBootstrapping);
917
150k
  state_ = kOpen;
918
150k
}
919
920
227k
bool Tablet::StartShutdown(const IsDropTable is_drop_table) {
921
227k
  LOG_WITH_PREFIX(INFO) << __func__;
922
923
227k
  bool expected = false;
924
227k
  if (!shutdown_requested_.compare_exchange_strong(expected, true)) {
925
151k
    return false;
926
151k
  }
927
928
76.0k
  if (transaction_participant_) {
929
44.7k
    transaction_participant_->StartShutdown();
930
44.7k
  }
931
932
76.0k
  return true;
933
227k
}
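StartShutdown is made idempotent by a single compare-and-swap: only the first caller flips shutdown_requested_ and performs the one-time work; later callers (the destructor, CompleteShutdown) get false and skip it. The gate in isolation, as a minimal sketch:

#include <atomic>

std::atomic<bool> shutdown_requested_sketch{false};

bool StartShutdownSketch() {
  bool expected = false;
  if (!shutdown_requested_sketch.compare_exchange_strong(expected, true)) {
    return false;  // shutdown already started by another caller
  }
  // ... one-time shutdown work runs exactly once ...
  return true;
}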
934
935
76.0k
void Tablet::CompleteShutdown(IsDropTable is_drop_table) {
936
76.0k
  LOG_WITH_PREFIX(INFO) << __func__ << "(" << is_drop_table << ")";
937
938
76.0k
  StartShutdown();
939
940
76.0k
  auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(is_drop_table), Stop::kTrue);
941
76.0k
  if (!op_pauses.ok()) {
942
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to shut down: " << op_pauses.status();
943
0
    return;
944
0
  }
945
946
76.0k
  cleanup_intent_files_token_.reset();
947
948
76.0k
  if (transaction_coordinator_) {
949
432
    transaction_coordinator_->Shutdown();
950
432
  }
951
952
76.0k
  if (transaction_participant_) {
953
44.7k
    transaction_participant_->CompleteShutdown();
954
44.7k
  }
955
956
76.0k
  {
957
76.0k
    std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
958
959
76.0k
    if (completed_split_log_anchor_) {
960
44
      WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()),
961
44
                  "Unregister split anchor");
962
44
    }
963
964
76.0k
    if (completed_split_operation_filter_) {
965
44
      UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get());
966
44
    }
967
968
76.0k
    if (restoring_operation_filter_) {
969
0
      UnregisterOperationFilterUnlocked(restoring_operation_filter_.get());
970
0
    }
971
76.0k
  }
972
973
76.0k
  std::lock_guard<rw_spinlock> lock(component_lock_);
974
975
  // Shut down the RocksDB instance for this tablet, if present.
976
  // Destroy intents and regular DBs in reverse order to their creation.
977
  // This also ensures that the regular DB is alive while the intents DB flush filter runs.
978
76.0k
  WARN_NOT_OK(CompleteShutdownRocksDBs(Destroy::kFalse, &(*op_pauses)),
979
76.0k
              "Failed to reset rocksdb during shutdown");
980
981
76.0k
  if (post_split_compaction_task_pool_token_) {
982
49
    post_split_compaction_task_pool_token_->Shutdown();
983
49
  }
984
985
76.0k
  state_ = kShutdown;
986
987
151k
  for (auto* op_pause : op_pauses->AsArray()) {
988
    // Release the mutex that prevents snapshot restore / truncate operations from running. Such
989
    // operations are no longer possible because the tablet has shut down. When we started the
990
    // "read/write operation pause", we incremented the "exclusive operation" counter. Here we
991
    // deliberately never decrement that counter back, disabling read/write operations permanently.
992
151k
    op_pause->ReleaseMutexButKeepDisabled();
993
    // Ensure that op_pause stays in scope throughout this function.
994
18.4E
    LOG_IF(DFATAL, !op_pause->status().ok()) << op_pause->status();
995
151k
  }
996
76.0k
}
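The final loop above relies on the pause counters staying incremented forever: releasing only the mutex while keeping the counter raised means no new read/write operation can ever start on the shut-down tablet. A toy sketch of that counter discipline (hypothetical, much simpler than the real ScopedRWOperationPause):

#include <atomic>

std::atomic<int> exclusive_holders_sketch{0};  // >0 disables new operations

bool TryStartOperationSketch() {
  return exclusive_holders_sketch.load(std::memory_order_acquire) == 0;
}

void PauseForShutdownSketch() {
  // Incremented while pausing; deliberately never decremented after shutdown,
  // which keeps TryStartOperationSketch() returning false permanently.
  exclusive_holders_sketch.fetch_add(1, std::memory_order_acq_rel);
}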
997
998
CHECKED_STATUS ResetRocksDB(
999
498k
    bool destroy, const rocksdb::Options& options, std::unique_ptr<rocksdb::DB>* db) {
1000
498k
  if (!*db) {
1001
116k
    return Status::OK();
1002
116k
  }
1003
1004
381k
  auto dir = (**db).GetName();
1005
381k
  db->reset();
1006
381k
  if (!destroy) {
1007
120k
    return Status::OK();
1008
120k
  }
1009
1010
261k
  return rocksdb::DestroyDB(dir, options);
1011
381k
}
1012
1013
Result<TabletScopedRWOperationPauses> Tablet::StartShutdownRocksDBs(
1014
248k
    DisableFlushOnShutdown disable_flush_on_shutdown, Stop stop) {
1015
248k
  TabletScopedRWOperationPauses op_pauses;
1016
1017
498k
  auto pause = [this, stop](const Abortable abortable) -> Result<ScopedRWOperationPause> {
1018
498k
    auto op_pause = PauseReadWriteOperations(abortable, stop);
1019
498k
    if (!op_pause.ok()) {
1020
0
      return op_pause.status().CloneAndPrepend("Failed to stop read/write operations: ");
1021
0
    }
1022
498k
    return std::move(op_pause);
1023
498k
  };
1024
1025
248k
  op_pauses.non_abortable = VERIFY_RESULT(pause(Abortable::kFalse));
1026
1027
0
  bool expected = false;
1028
  // If shutdown has already been requested, we still might need to wait for all pending read/write
1029
  // operations to complete here, because the caller is not holding a ScopedRWOperationPause.
1030
249k
  if (rocksdb_shutdown_requested_.compare_exchange_strong(expected, true)) {
1031
498k
    for (auto* db : {regular_db_.get(), intents_db_.get()}) {
1032
498k
      if (db) {
1033
382k
        db->SetDisableFlushOnShutdown(disable_flush_on_shutdown);
1034
382k
        db->StartShutdown();
1035
382k
      }
1036
498k
    }
1037
249k
  }
1038
1039
248k
  op_pauses.abortable = VERIFY_RESULT(pause(Abortable::kTrue));
1040
1041
0
  return op_pauses;
1042
248k
}
1043
1044
Status Tablet::CompleteShutdownRocksDBs(
1045
249k
    Destroy destroy, TabletScopedRWOperationPauses* ops_pauses) {
1046
  // We need non-null ops_pauses just to guarantee that PauseReadWriteOperations has been called.
1047
249k
  RSTATUS_DCHECK(
1048
249k
      ops_pauses != nullptr, InvalidArgument,
1049
249k
      "ops_pauses could not be null, StartRocksDbShutdown should be called before "
1050
249k
      "ShutdownRocksDBs.");
1051
1052
249k
  if (intents_db_) {
1053
133k
    intents_db_->ListenFilesChanged(nullptr);
1054
133k
  }
1055
1056
249k
  rocksdb::Options rocksdb_options;
1057
249k
  if (destroy) {
1058
173k
    InitRocksDBOptions(&rocksdb_options, LogPrefix());
1059
173k
  }
1060
1061
249k
  Status intents_status = ResetRocksDB(destroy, rocksdb_options, &intents_db_);
1062
249k
  Status regular_status = ResetRocksDB(destroy, rocksdb_options, &regular_db_);
1063
249k
  key_bounds_ = docdb::KeyBounds();
1064
  // Reset rocksdb_shutdown_requested_ to the initial state like RocksDBs were never opened,
1065
  // so we don't have to reset it on RocksDB open (we potentially can have several places in the
1066
  // code doing opening RocksDB while RocksDB shutdown is always going through
1067
  // Tablet::ShutdownRocksDBs).
1068
249k
  rocksdb_shutdown_requested_ = false;
1069
1070
18.4E
  return regular_status.ok() ? intents_status : regular_status;
1071
249k
}
1072
1073
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator(
1074
    const Schema &projection,
1075
    const ReadHybridTime read_hybrid_time,
1076
    const TableId& table_id,
1077
    CoarseTimePoint deadline,
1078
    AllowBootstrappingState allow_bootstrapping_state,
1079
421k
    const Slice& sub_doc_key) const {
1080
421k
  if (state_ != kOpen && (!allow_bootstrapping_state || state_ != kBootstrapping)) {
1081
0
    return STATUS_FORMAT(IllegalState, "Tablet in wrong state: $0", state_);
1082
0
  }
1083
1084
421k
  if (table_type_ != TableType::YQL_TABLE_TYPE && table_type_ != TableType::PGSQL_TABLE_TYPE) {
1085
0
    return STATUS_FORMAT(NotSupported, "Invalid table type: $0", table_type_);
1086
0
  }
1087
1088
421k
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
1089
421k
  RETURN_NOT_OK(scoped_read_operation);
1090
1091
421k
  VLOG_WITH_PREFIX(2) << "Created new Iterator reading at " << read_hybrid_time.ToString();
1092
1093
421k
  const std::shared_ptr<tablet::TableInfo> table_info =
1094
421k
      VERIFY_RESULT(metadata_->GetTableInfo(table_id));
1095
0
  const Schema& schema = *table_info->schema;
1096
421k
  auto mapped_projection = std::make_unique<Schema>();
1097
421k
  RETURN_NOT_OK(schema.GetMappedReadProjection(projection, mapped_projection.get()));
1098
1099
421k
  auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext(
1100
421k
      /* transaction_id */ boost::none,
1101
421k
      schema.table_properties().is_ysql_catalog_table()));
1102
421k
  const auto read_time = read_hybrid_time
1103
421k
      ? read_hybrid_time
1104
421k
      : ReadHybridTime::SingleTime(VERIFY_RESULT(SafeTime(RequireLease::kFalse)));
1105
0
  auto result = std::make_unique<DocRowwiseIterator>(
1106
421k
      std::move(mapped_projection), schema, txn_op_ctx, doc_db(),
1107
421k
      deadline, read_time, &pending_non_abortable_op_counter_);
1108
421k
  RETURN_NOT_OK(result->Init(table_type_, sub_doc_key));
1109
421k
  return std::move(result);
1110
421k
}
1111
1112
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::NewRowIterator(
1113
0
    const TableId& table_id) const {
1114
0
  const std::shared_ptr<tablet::TableInfo> table_info =
1115
0
      VERIFY_RESULT(metadata_->GetTableInfo(table_id));
1116
0
  return NewRowIterator(*table_info->schema, {}, table_id);
1117
0
}
1118
1119
Status Tablet::ApplyRowOperations(
1120
8.83M
    WriteOperation* operation, AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1121
8.83M
  const auto& write_request =
1122
8.83M
      operation->consensus_round() && operation->consensus_round()->replicate_msg()
1123
          // Online case.
1124
8.83M
          ? operation->consensus_round()->replicate_msg()->write()
1125
          // Bootstrap case.
1126
8.83M
          : *operation->request();
1127
8.83M
  const KeyValueWriteBatchPB& put_batch = write_request.write_batch();
1128
8.83M
  if (metrics_) {
1129
8.66M
    VLOG(3) << "Applying write batch (write_pairs=" << put_batch.write_pairs().size() << "): "
1130
112
            << put_batch.ShortDebugString();
1131
8.66M
    metrics_->rows_inserted->IncrementBy(put_batch.write_pairs().size());
1132
8.66M
  }
1133
1134
8.83M
  return ApplyOperation(
1135
8.83M
      *operation, write_request.batch_idx(), put_batch, already_applied_to_regular_db);
1136
8.83M
}
1137
1138
Status Tablet::ApplyOperation(
1139
    const Operation& operation, int64_t batch_idx,
1140
    const docdb::KeyValueWriteBatchPB& write_batch,
1141
8.83M
    AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1142
8.83M
  auto hybrid_time = operation.WriteHybridTime();
1143
1144
8.83M
  docdb::ConsensusFrontiers frontiers;
1145
  // Even if we have an external hybrid time, use the local commit hybrid time in the consensus
1146
  // frontier.
1147
8.83M
  auto frontiers_ptr =
1148
8.83M
      InitFrontiers(operation.op_id(), operation.hybrid_time(), &frontiers);
1149
8.83M
  if (frontiers_ptr) {
1150
8.83M
    auto ttl = write_batch.has_ttl()
1151
8.83M
        ? MonoDelta::FromNanoseconds(write_batch.ttl())
1152
8.83M
        : docdb::ValueControlFields::kMaxTtl;
1153
8.83M
    frontiers_ptr->Largest().set_max_value_level_ttl_expiration_time(
1154
8.83M
        docdb::FileExpirationFromValueTTL(operation.hybrid_time(), ttl));
1155
8.83M
  }
1156
8.83M
  return ApplyKeyValueRowOperations(
1157
8.83M
      batch_idx, write_batch, frontiers_ptr, hybrid_time, already_applied_to_regular_db);
1158
8.83M
}
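The frontier stamp above lets TTL-aware compaction drop an entire SST once every value in it has expired: the largest frontier carries the latest possible expiration time of any value in the batch. A rough sketch of the arithmetic (hypothetical microsecond math; the real code uses HybridTime and docdb::FileExpirationFromValueTTL):

#include <cstdint>
#include <limits>

uint64_t FileExpirationSketch(uint64_t commit_time_micros, uint64_t ttl_nanos,
                              bool has_ttl) {
  if (!has_ttl) {
    // Mirrors kMaxTtl: the file can never be dropped on TTL grounds alone.
    return std::numeric_limits<uint64_t>::max();
  }
  return commit_time_micros + ttl_nanos / 1000;  // expiration of this batch
}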
Status Tablet::WriteTransactionalBatch(
    int64_t batch_idx,
    const KeyValueWriteBatchPB& put_batch,
    HybridTime hybrid_time,
    const rocksdb::UserFrontiers* frontiers) {
  auto transaction_id = CHECK_RESULT(
      FullyDecodeTransactionId(put_batch.transaction().transaction_id()));

  bool store_metadata = false;
  if (put_batch.transaction().has_isolation()) {
    // Store transaction metadata (status tablet, isolation level, etc.).
    auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(put_batch.transaction()));
    auto add_result = transaction_participant()->Add(metadata);
    if (!add_result.ok()) {
      return add_result.status();
    }
    store_metadata = add_result.get();
  }
  boost::container::small_vector<uint8_t, 16> encoded_replicated_batch_idx_set;
  auto prepare_batch_data = transaction_participant()->PrepareBatchData(
      transaction_id, batch_idx, &encoded_replicated_batch_idx_set);
  if (!prepare_batch_data) {
    // If metadata is missing, it could be caused by an aborted and removed transaction.
    // In this case we should not add new intents for it.
    return STATUS(TryAgain,
                  Format("Transaction metadata missing: $0, looks like it was just aborted",
                         transaction_id), Slice(),
                         PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE));
  }

  auto isolation_level = prepare_batch_data->first;
  auto& last_batch_data = prepare_batch_data->second;

  docdb::TransactionalWriter writer(
      put_batch, hybrid_time, transaction_id, isolation_level,
      docdb::PartialRangeKeyIntents(metadata_->UsePartialRangeKeyIntents()),
      Slice(encoded_replicated_batch_idx_set.data(), encoded_replicated_batch_idx_set.size()),
      last_batch_data.next_write_id);
  if (store_metadata) {
    writer.SetMetadataToStore(&put_batch.transaction());
  }
  rocksdb::WriteBatch write_batch;
  write_batch.SetDirectWriter(&writer);
  RequestScope request_scope(transaction_participant_.get());

  WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents);

  last_batch_data.hybrid_time = hybrid_time;
  last_batch_data.next_write_id = writer.intra_txn_write_id();
  transaction_participant()->BatchReplicated(transaction_id, last_batch_data);

  return Status::OK();
}
Status Tablet::ApplyKeyValueRowOperations(
    int64_t batch_idx,
    const KeyValueWriteBatchPB& put_batch,
    const rocksdb::UserFrontiers* frontiers,
    const HybridTime hybrid_time,
    AlreadyAppliedToRegularDB already_applied_to_regular_db) {
  if (put_batch.write_pairs().empty() && put_batch.read_pairs().empty() &&
      put_batch.apply_external_transactions().empty()) {
    return Status::OK();
  }

  // We may return failure only in cases where it is safe to skip applying operations to the DB,
  // for instance when aborted transaction intents are written.
  // In all other cases we should crash instead of skipping the apply.

  if (put_batch.has_transaction()) {
    RETURN_NOT_OK(WriteTransactionalBatch(batch_idx, put_batch, hybrid_time, frontiers));
  } else {
    rocksdb::WriteBatch regular_write_batch;
    auto* regular_write_batch_ptr = !already_applied_to_regular_db ? &regular_write_batch : nullptr;

    // See comments for PrepareExternalWriteBatch.
    rocksdb::WriteBatch intents_write_batch;
    bool has_non_external_records = PrepareExternalWriteBatch(
        put_batch, hybrid_time, intents_db_.get(), regular_write_batch_ptr, &intents_write_batch);

    if (intents_write_batch.Count() != 0) {
      if (!metadata_->is_under_twodc_replication()) {
        RETURN_NOT_OK(metadata_->SetIsUnderTwodcReplicationAndFlush(true));
      }
      WriteToRocksDB(frontiers, &intents_write_batch, StorageDbType::kIntents);
    }

    docdb::NonTransactionalWriter writer(put_batch, hybrid_time);
    if (!already_applied_to_regular_db && has_non_external_records) {
      regular_write_batch.SetDirectWriter(&writer);
    }
    if (regular_write_batch.Count() != 0 || regular_write_batch.HasDirectWriter()) {
      WriteToRocksDB(frontiers, &regular_write_batch, StorageDbType::kRegular);
    }

    if (snapshot_coordinator_) {
      for (const auto& pair : put_batch.write_pairs()) {
        WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()),
                    "ApplyWritePair failed");
      }
    }
  }

  return Status::OK();
}
void Tablet::WriteToRocksDB(
    const rocksdb::UserFrontiers* frontiers,
    rocksdb::WriteBatch* write_batch,
    docdb::StorageDbType storage_db_type) {
  rocksdb::DB* dest_db = nullptr;
  switch (storage_db_type) {
    case StorageDbType::kRegular: dest_db = regular_db_.get(); break;
    case StorageDbType::kIntents: dest_db = intents_db_.get(); break;
  }

  // Frontiers can be null for deferred apply operations.
  if (frontiers) {
    write_batch->SetFrontiers(frontiers);
  }

  // We use the Raft replication index as the RocksDB sequence number for
  // all members of this write batch.
  rocksdb::WriteOptions write_options;
  InitRocksDBWriteOptions(&write_options);

  auto rocksdb_write_status = dest_db->Write(write_options, write_batch);
  if (!rocksdb_write_status.ok()) {
    LOG_WITH_PREFIX(FATAL) << "Failed to write a batch with " << write_batch->Count()
                           << " operations into RocksDB: " << rocksdb_write_status;
  }

  if (FLAGS_TEST_docdb_log_write_batches) {
    LOG_WITH_PREFIX(INFO)
        << "Wrote " << write_batch->Count() << " key/value pairs to " << storage_db_type
        << " RocksDB:\n" << docdb::WriteBatchToString(
            *write_batch,
            storage_db_type,
            BinaryOutputFormat::kEscapedAndHex,
            WriteBatchOutputFormat::kArrow,
            "  " + LogPrefix(storage_db_type));
  }
}
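
// Editor's note: a minimal sketch (not part of tablet.cc) of the direct-writer pattern that
// WriteTransactionalBatch() and ApplyKeyValueRowOperations() rely on above: instead of
// accumulating individual Put() calls, the batch delegates record production to a writer
// object, and WriteToRocksDB() commits the whole result in a single atomic Write().
// Disabled with #if 0; it assumes a put_batch, hybrid_time, and frontiers already in scope.
#if 0
docdb::NonTransactionalWriter writer(put_batch, hybrid_time);
rocksdb::WriteBatch batch;
batch.SetDirectWriter(&writer);
WriteToRocksDB(frontiers, &batch, StorageDbType::kRegular);
#endif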
//--------------------------------------------------------------------------------------------------
// Redis Request Processing.
Status Tablet::HandleRedisReadRequest(CoarseTimePoint deadline,
                                      const ReadHybridTime& read_time,
                                      const RedisReadRequestPB& redis_read_request,
                                      RedisResponsePB* response) {
  // TODO: move this locking to the top-level read request handler in TabletService.
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(scoped_read_operation);

  ScopedTabletMetricsTracker metrics_tracker(metrics_->redis_read_latency);

  docdb::RedisReadOperation doc_op(redis_read_request, doc_db(), deadline, read_time);
  RETURN_NOT_OK(doc_op.Execute());
  *response = std::move(doc_op.response());
  return Status::OK();
}
bool IsSchemaVersionCompatible(
    uint32_t current_version, uint32_t request_version, bool compatible_with_previous_version) {
  if (request_version == current_version) {
    return true;
  }

  if (compatible_with_previous_version && request_version == current_version + 1) {
    DVLOG(1) << (FLAGS_yql_allow_compatible_schema_versions ? "A" : "Not a")
             << "ccepting request that is ahead of us by 1 version";
    return FLAGS_yql_allow_compatible_schema_versions;
  }

  return false;
}
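
// Editor's note: an illustrative summary (not part of tablet.cc) of the rule above, written
// as assertions. The second case assumes FLAGS_yql_allow_compatible_schema_versions is true.
#if 0
CHECK(IsSchemaVersionCompatible(5, 5, false));   // exact match is always compatible
CHECK(IsSchemaVersionCompatible(5, 6, true));    // one version ahead, marked compatible
CHECK(!IsSchemaVersionCompatible(5, 6, false));  // one version ahead, not marked compatible
CHECK(!IsSchemaVersionCompatible(5, 7, true));   // more than one version ahead is never allowed
#endif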
//--------------------------------------------------------------------------------------------------
// CQL Request Processing.
Status Tablet::HandleQLReadRequest(
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    const QLReadRequestPB& ql_read_request,
    const TransactionMetadataPB& transaction_metadata,
    QLReadRequestResult* result) {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(scoped_read_operation);
  ScopedTabletMetricsTracker metrics_tracker(metrics_->ql_read_latency);

  if (!IsSchemaVersionCompatible(
          metadata()->schema_version(), ql_read_request.schema_version(),
          ql_read_request.is_compatible_with_previous_version())) {
    DVLOG(1) << "Setting status for read as YQL_STATUS_SCHEMA_VERSION_MISMATCH";
    result->response.set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH);
    result->response.set_error_message(Format(
        "schema version mismatch for table $0: expected $1, got $2 (compatible with previous: $3)",
        metadata()->table_id(),
        metadata()->schema_version(),
        ql_read_request.schema_version(),
        ql_read_request.is_compatible_with_previous_version()));
    return Status::OK();
  }

  Result<TransactionOperationContext> txn_op_ctx =
      CreateTransactionOperationContext(transaction_metadata, /* is_ysql_catalog_table */ false);
  RETURN_NOT_OK(txn_op_ctx);
  return AbstractTablet::HandleQLReadRequest(
      deadline, read_time, ql_read_request, *txn_op_ctx, result);
}
CHECKED_STATUS Tablet::CreatePagingStateForRead(const QLReadRequestPB& ql_read_request,
                                                const size_t row_count,
                                                QLResponsePB* response) const {

  // If the response does not have a next partition key, it means we are done reading the current
  // tablet. But if the request does not have the hash columns set, this must be a table scan,
  // so we need to decide whether we are done or need to move on to the next tablet.
  // If we did not reach:
  //   1. the max number of results (LIMIT clause -- if set)
  //   2. the end of the table (this was the last tablet)
  //   3. the max partition key (upper bound condition using 'token' -- if set)
  // we set the paging state to point to the exclusive end partition key of this tablet, which is
  // the start key of the next tablet.
  if (ql_read_request.hashed_column_values().empty() &&
      !response->paging_state().has_next_partition_key()) {
    // Check we did not reach the results limit.
    // If return_paging_state is set, it means the request limit is actually just the page size.
    if (!ql_read_request.has_limit() ||
        row_count < ql_read_request.limit() ||
        ql_read_request.return_paging_state()) {

      // Check we did not reach the last tablet.
      const string& next_partition_key = metadata_->partition()->partition_key_end();
      if (!next_partition_key.empty()) {
        uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(next_partition_key);

        // Check we did not reach the max partition key.
        if (!ql_read_request.has_max_hash_code() ||
            next_hash_code <= ql_read_request.max_hash_code()) {
          response->mutable_paging_state()->set_next_partition_key(next_partition_key);
        }
      }
    }
  }

  // If there is a paging state, update the total number of rows read so far.
  if (response->has_paging_state()) {
    response->mutable_paging_state()->set_total_num_rows_read(
        ql_read_request.paging_state().total_num_rows_read() + row_count);
  }
  return Status::OK();
}
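
// Editor's note: a worked example (not part of tablet.cc) of the paging decision above.
// Suppose a full-table CQL scan (no hashed column values) with LIMIT 100 finishes a tablet
// whose exclusive end partition key decodes to hash 0xAAAA, having returned 40 rows:
//   - 40 < 100, so the limit was not reached;
//   - partition_key_end() is non-empty, so this was not the last tablet;
//   - with no max_hash_code set, next_partition_key is set to that end key, and the client
//     resumes the scan at the next tablet.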
//--------------------------------------------------------------------------------------------------
// PGSQL Request Processing.
//--------------------------------------------------------------------------------------------------
Status Tablet::HandlePgsqlReadRequest(
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    bool is_explicit_request_read_time,
    const PgsqlReadRequestPB& pgsql_read_request,
    const TransactionMetadataPB& transaction_metadata,
    const SubTransactionMetadataPB& subtransaction_metadata,
    PgsqlReadRequestResult* result,
    size_t* num_rows_read) {
  TRACE(LogPrefix());
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation(deadline);
  RETURN_NOT_OK(scoped_read_operation);
  // TODO(neil) Work on metrics for PGSQL.
  // ScopedTabletMetricsTracker metrics_tracker(metrics_->pgsql_read_latency);

  const shared_ptr<tablet::TableInfo> table_info =
      VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read_request.table_id()));
  // Assert that the table is a Postgres table.
  DCHECK_EQ(table_info->table_type, TableType::PGSQL_TABLE_TYPE);
  if (table_info->schema_version != pgsql_read_request.schema_version()) {
    result->response.set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH);
    result->response.set_error_message(
        Format("schema version mismatch for table $0: expected $1, got $2",
               table_info->table_id,
               table_info->schema_version,
               pgsql_read_request.schema_version()));
    return Status::OK();
  }

  Result<TransactionOperationContext> txn_op_ctx =
      CreateTransactionOperationContext(
          transaction_metadata,
          table_info->schema->table_properties().is_ysql_catalog_table(),
          &subtransaction_metadata);
  RETURN_NOT_OK(txn_op_ctx);
  return AbstractTablet::HandlePgsqlReadRequest(
      deadline, read_time, is_explicit_request_read_time,
      pgsql_read_request, *txn_op_ctx, result, num_rows_read);
}
// Returns true if the query can be satisfied by rows present in the current tablet.
// Returns false if the query requires other tablets to also be scanned. Examples include:
//   (1) full table scan queries
//   (2) queries whose key conditions are such that the query will require a multi-tablet
//       scan.
//
// Requests that are of the form batched index lookups of ybctids are sent only to a single tablet.
// However, situations can arise where tablet splitting occurs after such requests have been
// prepared by the pggate layer (specifically pg_doc_op.cc). Under such circumstances, if a tablet
// is split into two sub-tablets, then such batched index lookups of ybctid requests should be sent
// to multiple tablets (the two sub-tablets). Hence, the request ends up not being a single-tablet
// request.
Result<bool> Tablet::IsQueryOnlyForTablet(
    const PgsqlReadRequestPB& pgsql_read_request, size_t row_count) const {
  if ((!pgsql_read_request.ybctid_column_value().value().binary_value().empty() &&
       (implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) == row_count ||
        pgsql_read_request.batch_arguments_size() == 0)) ||
       !pgsql_read_request.partition_column_values().empty()) {
    return true;
  }

  std::shared_ptr<const Schema> schema = metadata_->schema();
  if (schema->has_cotable_id() || schema->has_colocation_id()) {
    // This is a colocated table.
    return true;
  }

  if (schema->num_hash_key_columns() == 0 &&
      schema->num_range_key_columns() ==
          implicit_cast<size_t>(pgsql_read_request.range_column_values_size())) {
    // The PK is contained within this tablet.
    return true;
  }
  return false;
}
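
// Editor's note: an illustrative case analysis (not part of tablet.cc) for the predicate
// above. For a range-partitioned table with primary key (k1, k2) and no hash columns:
//   - SELECT ... WHERE k1 = 1 AND k2 = 2 binds both range columns, so
//     num_range_key_columns() == range_column_values_size() and the query stays on one tablet;
//   - SELECT ... WHERE k1 = 1 binds only one of the two range columns, so matching rows may
//     span the tablet boundary and the scan must be able to continue on the next tablet.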
Result<bool> Tablet::HasScanReachedMaxPartitionKey(
    const PgsqlReadRequestPB& pgsql_read_request,
    const string& partition_key,
    size_t row_count) const {
  auto schema = metadata_->schema();

  if (schema->num_hash_key_columns() > 0) {
    uint16_t next_hash_code = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
    // For batched index lookups of ybctids, check whether the current partition hash is less
    // than the upper bound. If it is, we can avoid paging. Paging of batched index lookups of
    // ybctids occurs when tablets split after the request is prepared.
    if (pgsql_read_request.has_ybctid_column_value() &&
        implicit_cast<size_t>(pgsql_read_request.batch_arguments_size()) > row_count) {
      if (!pgsql_read_request.upper_bound().has_key()) {
          return false;
      }
      uint16_t upper_bound_hash =
          PartitionSchema::DecodeMultiColumnHashValue(pgsql_read_request.upper_bound().key());
      uint16_t partition_hash =
          PartitionSchema::DecodeMultiColumnHashValue(partition_key);
      return pgsql_read_request.upper_bound().is_inclusive() ?
          partition_hash > upper_bound_hash :
          partition_hash >= upper_bound_hash;
    }
    if (pgsql_read_request.has_max_hash_code() &&
        next_hash_code > pgsql_read_request.max_hash_code()) {
      return true;
    }
  } else if (pgsql_read_request.has_upper_bound()) {
    docdb::DocKey partition_doc_key(*schema);
    VERIFY_RESULT(partition_doc_key.DecodeFrom(
        partition_key, docdb::DocKeyPart::kWholeDocKey, docdb::AllowSpecial::kTrue));
    docdb::DocKey max_partition_doc_key(*schema);
    VERIFY_RESULT(max_partition_doc_key.DecodeFrom(
        pgsql_read_request.upper_bound().key(), docdb::DocKeyPart::kWholeDocKey,
        docdb::AllowSpecial::kTrue));

    auto cmp = partition_doc_key.CompareTo(max_partition_doc_key);
    return pgsql_read_request.upper_bound().is_inclusive() ? cmp > 0 : cmp >= 0;
  }

  return false;
}
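
// Editor's note: a worked example (not part of tablet.cc) of the inclusive/exclusive bound
// check above. Suppose the next partition key and the request's upper bound both decode to
// hash 0x8000:
//   - with an inclusive bound, partition_hash > upper_bound_hash is false, so the scan
//     still visits the tablet starting at 0x8000;
//   - with an exclusive bound, partition_hash >= upper_bound_hash is true, so the scan stops.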
namespace {

void SetBackfillSpecForYsqlBackfill(
    const PgsqlReadRequestPB& pgsql_read_request,
    const size_t& row_count,
    PgsqlResponsePB* response) {
  PgsqlBackfillSpecPB in_spec;
  in_spec.ParseFromString(a2b_hex(pgsql_read_request.backfill_spec()));

  auto limit = in_spec.limit();
  PgsqlBackfillSpecPB out_spec;
  out_spec.set_limit(limit);
  out_spec.set_count(in_spec.count() + row_count);
  response->set_is_backfill_batch_done(!response->has_paging_state());
  VLOG(2) << " limit is " << limit << " set_count to " << out_spec.count();
  if (limit >= 0 && out_spec.count() >= limit) {
    // Hint to postgres to stop scanning now, and set up the
    // next_row_key based on the paging state.
    if (response->has_paging_state()) {
      out_spec.set_next_row_key(response->paging_state().next_row_key());
    }
    response->set_is_backfill_batch_done(true);
  }

  VLOG(2) << "Got input spec " << yb::ToString(in_spec)
          << " set output spec " << yb::ToString(out_spec)
          << " batch_done=" << response->is_backfill_batch_done();
  string serialized_pb;
  out_spec.SerializeToString(&serialized_pb);
  response->set_backfill_spec(b2a_hex(serialized_pb));
}

}  // namespace
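
// Editor's note: an illustrative sketch (not part of tablet.cc) of the hex-encoded
// PgsqlBackfillSpecPB round trip used above. Disabled with #if 0; next_row_key is assumed
// to be a std::string already in scope.
#if 0
PgsqlBackfillSpecPB spec;
spec.set_limit(1024);
spec.set_next_row_key(next_row_key);
std::string bytes;
spec.SerializeToString(&bytes);
std::string wire = b2a_hex(bytes);       // what travels in backfill_spec()

PgsqlBackfillSpecPB decoded;
decoded.ParseFromString(a2b_hex(wire));  // the inverse on the receiving side
#endif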
CHECKED_STATUS Tablet::CreatePagingStateForRead(const PgsqlReadRequestPB& pgsql_read_request,
                                                const size_t row_count,
                                                PgsqlResponsePB* response) const {
  // If there is no hash column in the read request, this is a full-table query. And if there is no
  // paging state in the response, we are done reading from the current tablet. In this case, we
  // should return the exclusive end partition key of this tablet, if not empty, which is the start
  // key of the next tablet. Do so only if the request has no row count limit, or there is one and
  // we haven't hit it, or we are asked to return paging state even when we have hit the limit.
  // Otherwise, leave the paging state empty, which means we are completely done reading for the
  // whole SELECT statement.
  const bool single_tablet_query =
      VERIFY_RESULT(IsQueryOnlyForTablet(pgsql_read_request, row_count));
  if (!single_tablet_query &&
      !response->has_paging_state() &&
      (!pgsql_read_request.has_limit() || row_count < pgsql_read_request.limit() ||
       pgsql_read_request.return_paging_state())) {
    // For backward scans, partition_key_start must be used as next_partition_key.
    // Client-level logic will check it and route the next request to the preceding tablet.
    const auto& next_partition_key =
        pgsql_read_request.has_hash_code() ||
        pgsql_read_request.is_forward_scan()
            ? metadata_->partition()->partition_key_end()
            : metadata_->partition()->partition_key_start();
    // Check we did not reach the last tablet.
    const bool end_scan = next_partition_key.empty() ||
        VERIFY_RESULT(HasScanReachedMaxPartitionKey(
            pgsql_read_request, next_partition_key, row_count));
    if (!end_scan) {
      response->mutable_paging_state()->set_next_partition_key(next_partition_key);
    }
  }

  // If there is a paging state, update the total number of rows read so far.
  if (response->has_paging_state()) {
    response->mutable_paging_state()->set_total_num_rows_read(
        pgsql_read_request.paging_state().total_num_rows_read() + row_count);
  }

  if (pgsql_read_request.is_for_backfill()) {
    // BackfillSpec is used to implement "paging" across multiple BackfillIndex
    // RPCs from the master.
    SetBackfillSpecForYsqlBackfill(pgsql_read_request, row_count, response);
  }
  return Status::OK();
}
//--------------------------------------------------------------------------------------------------

void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteQuery> query) {
  TRACE(__func__);
  if (table_type_ == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    query->Cancel(
        STATUS(NotSupported, "Transaction status table does not support write"));
    return;
  }

  if (!GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) {
    auto write_permit = GetPermitToWrite(query->deadline());
    if (!write_permit.ok()) {
      TRACE("Could not get the write permit.");
      WriteQuery::StartSynchronization(std::move(query), MoveStatus(write_permit));
      return;
    }
    // Save the write permit to be released after the operation is submitted
    // to the Raft queue.
    query->UseSubmitToken(std::move(write_permit));
  }

  WriteQuery::Execute(std::move(query));
}
Status Tablet::Flush(FlushMode mode, FlushFlags flags, int64_t ignore_if_flushed_after_tick) {
  TRACE_EVENT0("tablet", "Tablet::Flush");

  ScopedRWOperation pending_op;
  if (!HasFlags(flags, FlushFlags::kNoScopedOperation)) {
    pending_op = CreateNonAbortableScopedRWOperation();
    LOG_IF(DFATAL, !pending_op.ok()) << "CreateNonAbortableScopedRWOperation failed";
    RETURN_NOT_OK(pending_op);
  }

  rocksdb::FlushOptions options;
  options.ignore_if_flushed_after_tick = ignore_if_flushed_after_tick;
  bool flush_intents = intents_db_ && HasFlags(flags, FlushFlags::kIntents);
  if (flush_intents) {
    options.wait = false;
    WARN_NOT_OK(intents_db_->Flush(options), "Flush intents DB");
  }

  if (HasFlags(flags, FlushFlags::kRegular) && regular_db_) {
    options.wait = mode == FlushMode::kSync;
    WARN_NOT_OK(regular_db_->Flush(options), "Flush regular DB");
  }

  if (flush_intents && mode == FlushMode::kSync) {
    RETURN_NOT_OK(intents_db_->WaitForFlush());
  }

  return Status::OK();
}
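
// Editor's note: the ordering above is deliberate: the intents-DB flush is started without
// waiting (options.wait = false) before the regular-DB flush, so the two overlap, and kSync
// only waits on the intents DB at the end. A hypothetical caller sketch (not part of
// tablet.cc; the 0 sentinel for ignore_if_flushed_after_tick is an assumption), disabled
// with #if 0:
#if 0
RETURN_NOT_OK(tablet->Flush(FlushMode::kSync, FlushFlags::kRegular, 0));
RETURN_NOT_OK(tablet->WaitForFlush());
#endif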
Status Tablet::WaitForFlush() {
  TRACE_EVENT0("tablet", "Tablet::WaitForFlush");

  if (regular_db_) {
    RETURN_NOT_OK(regular_db_->WaitForFlush());
  }
  if (intents_db_) {
    RETURN_NOT_OK(intents_db_->WaitForFlush());
  }

  return Status::OK();
}
Status Tablet::ImportData(const std::string& source_dir) {
  // We import only regular records, so we don't have to deal with intents here.
  return regular_db_->Import(source_dir);
}
// We apply intents by iterating over the whole transaction reverse index.
// Using the value of a reverse index record, we find the original intent record and apply it.
// After that we delete both the intent record and the reverse index record.
Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) {
  VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id;

  // This flag enables tests to induce a situation where a transaction has committed but its
  // intents haven't yet moved to the regular db for a sufficiently long period. For example, it
  // can help a test reliably assert that conflict resolution / concurrency control with a
  // conflicting committed transaction is done properly in the rare situation where the committed
  // transaction's intents are still in the intents db and not yet in the regular db.
  AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms);
  docdb::ApplyIntentsContext context(
      data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht,
      &key_bounds_, intents_db_.get());
  docdb::IntentsWriter intents_writer(
      data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context);
  rocksdb::WriteBatch regular_write_batch;
  regular_write_batch.SetDirectWriter(&intents_writer);
  // data.hybrid_time contains the transaction commit time.
  // We don't set the transaction field of put_batch, otherwise we would write another bunch of
  // intents.
  docdb::ConsensusFrontiers frontiers;
  auto frontiers_ptr = data.op_id.empty() ? nullptr : InitFrontiers(data, &frontiers);
  WriteToRocksDB(frontiers_ptr, &regular_write_batch, StorageDbType::kRegular);
  return context.apply_state();
}
template <class Ids>
CHECKED_STATUS Tablet::RemoveIntentsImpl(const RemoveIntentsData& data, const Ids& ids) {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  rocksdb::WriteBatch intents_write_batch;
  for (const auto& id : ids) {
    boost::optional<docdb::ApplyTransactionState> apply_state;
    for (;;) {
      docdb::RemoveIntentsContext context(id);
      docdb::IntentsWriter writer(
          apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
      intents_write_batch.SetDirectWriter(&writer);
      docdb::ConsensusFrontiers frontiers;
      auto frontiers_ptr = InitFrontiers(data, &frontiers);
      WriteToRocksDB(frontiers_ptr, &intents_write_batch, StorageDbType::kIntents);

      if (!context.apply_state().active()) {
        break;
      }

      apply_state = std::move(context.apply_state());
      intents_write_batch.Clear();

      AtomicFlagSleepMs(&FLAGS_apply_intents_task_injected_delay_ms);
    }
  }

  return Status::OK();
}
Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionId& id) {
  return RemoveIntentsImpl(data, std::initializer_list<TransactionId>{id});
}

Status Tablet::RemoveIntents(const RemoveIntentsData& data, const TransactionIdSet& transactions) {
  return RemoveIntentsImpl(data, transactions);
}
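
// Editor's note: a summary (not part of tablet.cc) of the resumable loop in
// RemoveIntentsImpl above. Each pass writes as many intent deletions as fit into one
// RocksDB write batch; if the context still reports an active apply_state, the next pass
// resumes from apply_state->key, so arbitrarily large transactions are cleaned up in
// bounded-size batches rather than one giant write.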
// We batch this because some transactions can be very large and may not fit in one batch.
CHECKED_STATUS Tablet::GetIntents(
    const TransactionId& id,
    std::vector<docdb::IntentKeyValueForCDC>* key_value_intents,
    docdb::ApplyTransactionState* stream_state) {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  docdb::ApplyTransactionState new_stream_state;

  new_stream_state = VERIFY_RESULT(
      docdb::GetIntentsBatch(id, &key_bounds_, stream_state, intents_db_.get(), key_value_intents));
  stream_state->key = new_stream_state.key;
  stream_state->write_id = new_stream_state.write_id;

  return Status::OK();
}
Result<HybridTime> Tablet::ApplierSafeTime(HybridTime min_allowed, CoarseTimePoint deadline) {
  // We cannot use mvcc_ directly, because the correct lease must be passed to it.
  return SafeTime(RequireLease::kFalse, min_allowed, deadline);
}
Result<std::unique_ptr<docdb::YQLRowwiseIteratorIf>> Tablet::CreateCDCSnapshotIterator(
    const Schema& projection, const ReadHybridTime& time, const string& next_key) {
  VLOG_WITH_PREFIX(2) << "The nextKey is " << next_key;

  Slice next_slice;
  if (!next_key.empty()) {
    SubDocKey start_sub_doc_key;
    docdb::KeyBytes start_key_bytes(next_key);
    RETURN_NOT_OK(start_sub_doc_key.FullyDecodeFrom(start_key_bytes.AsSlice()));
    next_slice = start_sub_doc_key.doc_key().Encode().AsSlice();
    VLOG_WITH_PREFIX(2) << "The nextKey doc is " << next_key;
  }
  return NewRowIterator(
      projection, time, "", CoarseTimePoint::max(), AllowBootstrappingState::kFalse, next_slice);
}
Status Tablet::CreatePreparedChangeMetadata(
    ChangeMetadataOperation *operation, const Schema* schema) {
  if (schema) {
    auto key_schema = GetKeySchema(operation->has_table_id() ? operation->table_id() : "");
    if (!key_schema.KeyEquals(*schema)) {
      return STATUS_FORMAT(
          InvalidArgument,
          "Schema keys cannot be altered. New schema key: $0. Existing schema key: $1",
          schema->CreateKeyProjection(),
          key_schema);
    }

    if (!schema->has_column_ids()) {
      // This probably means that the request is not from the Master.
      return STATUS(InvalidArgument, "Missing Column IDs");
    }
  }

  operation->set_schema(schema);
  return Status::OK();
}
Status Tablet::AddTable(const TableInfoPB& table_info) {
  Schema schema;
  RETURN_NOT_OK(SchemaFromPB(table_info.schema(), &schema));

  PartitionSchema partition_schema;
  RETURN_NOT_OK(PartitionSchema::FromPB(table_info.partition_schema(), schema, &partition_schema));

  metadata_->AddTable(
      table_info.table_id(), table_info.namespace_name(), table_info.table_name(),
      table_info.table_type(), schema, IndexMap(), partition_schema, boost::none,
      table_info.schema_version());

  RETURN_NOT_OK(metadata_->Flush());

  return Status::OK();
}
Status Tablet::RemoveTable(const std::string& table_id) {
  metadata_->RemoveTable(table_id);
  RETURN_NOT_OK(metadata_->Flush());
  return Status::OK();
}
Status Tablet::MarkBackfillDone(const TableId& table_id) {
  auto table_info = table_id.empty() ?
    metadata_->primary_table_info() : VERIFY_RESULT(metadata_->GetTableInfo(table_id));
  LOG_WITH_PREFIX(INFO) << "Setting backfill as done. Current schema "
                        << table_info->schema->ToString();
  const vector<DeletedColumn> empty_deleted_cols;
  Schema new_schema = *table_info->schema;
  new_schema.SetRetainDeleteMarkers(false);
  metadata_->SetSchema(
      new_schema, *table_info->index_map, empty_deleted_cols, table_info->schema_version, table_id);
  return metadata_->Flush();
}
Status Tablet::AlterSchema(ChangeMetadataOperation *operation) {
  auto current_table_info = VERIFY_RESULT(metadata_->GetTableInfo(
        operation->request()->has_alter_table_id() ?
        operation->request()->alter_table_id() : ""));
  auto key_schema = current_table_info->schema->CreateKeyProjection();

  RSTATUS_DCHECK_NE(operation->schema(), static_cast<void*>(nullptr), InvalidArgument,
                    "Schema cannot be null");
  RSTATUS_DCHECK(key_schema.KeyEquals(*DCHECK_NOTNULL(operation->schema())), InvalidArgument,
                 "Schema keys cannot be altered");

  // Abortable read/write operations could be long-running, and they shouldn't access metadata_
  // without locks, so there is no need to wait for them here.
  auto op_pause = PauseReadWriteOperations(Abortable::kFalse);
  RETURN_NOT_OK(op_pause);

  // If the current version >= the new version, there is nothing to do.
  if (current_table_info->schema_version >= operation->schema_version()) {
    LOG_WITH_PREFIX(INFO)
        << "Already running schema version " << current_table_info->schema_version
        << ", got alter request for version " << operation->schema_version();
    return Status::OK();
  }

  LOG_WITH_PREFIX(INFO) << "Alter schema from " << current_table_info->schema->ToString()
                        << " version " << current_table_info->schema_version
                        << " to " << operation->schema()->ToString()
                        << " version " << operation->schema_version();

  // Find out which columns have been deleted in this schema change, and add them to metadata.
  vector<DeletedColumn> deleted_cols;
  for (const auto& col : current_table_info->schema->column_ids()) {
    if (operation->schema()->find_column_by_id(col) == Schema::kColumnNotFound) {
      deleted_cols.emplace_back(col, clock_->Now());
      LOG_WITH_PREFIX(INFO) << "Column " << col << " recorded as deleted.";
    }
  }

  metadata_->SetSchema(*operation->schema(), operation->index_map(), deleted_cols,
                      operation->schema_version(), current_table_info->table_id);
  if (operation->has_new_table_name()) {
    metadata_->SetTableName(current_table_info->namespace_name, operation->new_table_name());
    if (table_metrics_entity_) {
      table_metrics_entity_->SetAttribute("table_name", operation->new_table_name());
      table_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name);
    }
    if (tablet_metrics_entity_) {
      tablet_metrics_entity_->SetAttribute("table_name", operation->new_table_name());
      tablet_metrics_entity_->SetAttribute("namespace_name", current_table_info->namespace_name);
    }
  }

  // Clear the old index table metadata cache.
  ResetYBMetaDataCache();

  // Create a transaction manager and an index table metadata cache for secondary index updates.
  if (!operation->index_map().empty()) {
    if (current_table_info->schema->table_properties().is_transactional() &&
        !transaction_manager_) {
      transaction_manager_ = std::make_unique<client::TransactionManager>(
          client_future_.get(), scoped_refptr<server::Clock>(clock_), local_tablet_filter_);
    }
    CreateNewYBMetaDataCache();
  }

  // Flush the updated schema metadata to disk.
  return metadata_->Flush();
}
Status Tablet::AlterWalRetentionSecs(ChangeMetadataOperation* operation) {
  if (operation->has_wal_retention_secs()) {
    LOG_WITH_PREFIX(INFO) << "Altering metadata wal_retention_secs from "
                          << metadata_->wal_retention_secs()
                          << " to " << operation->wal_retention_secs();
    metadata_->set_wal_retention_secs(operation->wal_retention_secs());
    // Flush the updated schema metadata to disk.
    return metadata_->Flush();
  }
  return STATUS_SUBSTITUTE(InvalidArgument, "Invalid ChangeMetadataOperation: $0",
                           operation->ToString());
}
namespace {

Result<pgwrapper::PGConnPtr> ConnectToPostgres(
    const HostPort& pgsql_proxy_bind_address,
    const std::string& database_name,
    const uint64_t postgres_auth_key) {
  // Construct the connection string. Note that the plain password in the connection string will
  // be sent over the wire, but since it only goes over a unix-domain socket, there should be no
  // eavesdropping/tampering issues.
  std::string conn_str = Format(
      "user=$0 password=$1 host=$2 port=$3 dbname=$4",
      "postgres",
      postgres_auth_key,
      PgDeriveSocketDir(pgsql_proxy_bind_address.host()),
      pgsql_proxy_bind_address.port(),
      pgwrapper::PqEscapeLiteral(database_name));
  std::string conn_str_for_log = Format(
      "user=$0 password=$1 host=$2 port=$3 dbname=$4",
      "postgres",
      "<REDACTED>",
      PgDeriveSocketDir(pgsql_proxy_bind_address.host()),
      pgsql_proxy_bind_address.port(),
      pgwrapper::PqEscapeLiteral(database_name));
  VLOG(1) << __func__ << ": libpq connection string: " << conn_str_for_log;

  // Connect.
  pgwrapper::PGConnPtr conn(PQconnectdb(conn_str.c_str()));
  if (!conn) {
    return STATUS(IllegalState, "backfill failed to connect to DB");
  }
  if (PQstatus(conn.get()) == CONNECTION_BAD) {
    std::string msg(PQerrorMessage(conn.get()));

    // Avoid a double newline (postgres adds a newline after the error message).
    if (msg.back() == '\n') {
      msg.resize(msg.size() - 1);
    }
    LOG(WARNING) << "libpq connection \"" << conn_str_for_log << "\" failed: " << msg;
    return STATUS_FORMAT(IllegalState, "backfill connection to DB failed: $0", msg);
  }
  return conn;
}
string GenerateSerializedBackfillSpec(size_t batch_size, const string& next_row_to_backfill) {
  PgsqlBackfillSpecPB backfill_spec;
  std::string serialized_backfill_spec;
  // Note that although we set the desired batch_size as the limit, postgres
  // has its own internal paging size of 1024 (controlled by --ysql_prefetch_limit). So the actual
  // number of rows processed could be larger than the limit set here, unless the limit happens
  // to be a multiple of FLAGS_ysql_prefetch_limit.
  backfill_spec.set_limit(batch_size);
  backfill_spec.set_next_row_key(next_row_to_backfill);
  backfill_spec.SerializeToString(&serialized_backfill_spec);
  VLOG(2) << "Generating backfill_spec " << yb::ToString(backfill_spec) << " encoded as "
          << b2a_hex(serialized_backfill_spec) << ", a string of length "
          << serialized_backfill_spec.length();
  return serialized_backfill_spec;
}
Result<PgsqlBackfillSpecPB> QueryPostgresToDoBackfill(
    const pgwrapper::PGConnPtr& conn, const string& query_str) {
  // Execute.
  pgwrapper::PGResultPtr res(PQexec(conn.get(), query_str.c_str()));
  if (!res) {
    std::string msg(PQerrorMessage(conn.get()));

    // Avoid a double newline (postgres adds a newline after the error message).
    if (msg.back() == '\n') {
      msg.resize(msg.size() - 1);
    }
    LOG(WARNING) << "libpq query \"" << query_str << "\" was not sent: " << msg;
    return STATUS_FORMAT(IllegalState, "backfill query couldn't be sent: $0", msg);
  }

  ExecStatusType status = PQresultStatus(res.get());
  // TODO(jason): more properly handle bad statuses.
  if (status != PGRES_TUPLES_OK) {
    std::string msg(PQresultErrorMessage(res.get()));

    // Avoid a double newline (postgres adds a newline after the error message).
    if (msg.back() == '\n') {
      msg.resize(msg.size() - 1);
    }
    LOG(WARNING) << "libpq query \"" << query_str << "\" returned " << PQresStatus(status) << ": "
                 << msg;
    return STATUS(IllegalState, msg);
  }

  CHECK_EQ(PQntuples(res.get()), 1);
  CHECK_EQ(PQnfields(res.get()), 1);
  const std::string returned_spec = CHECK_RESULT(pgwrapper::GetString(res.get(), 0, 0));
  VLOG(3) << "Got back " << returned_spec << " of length " << returned_spec.length();

  PgsqlBackfillSpecPB spec;
  spec.ParseFromString(a2b_hex(returned_spec));
  return spec;
}
struct BackfillParams {
  explicit BackfillParams(const CoarseTimePoint deadline)
      : start_time(CoarseMonoClock::Now()),
        deadline(deadline),
        rate_per_sec(GetAtomicFlag(&FLAGS_backfill_index_rate_rows_per_sec)),
        batch_size(GetAtomicFlag(&FLAGS_backfill_index_write_batch_size)) {
    auto grace_margin_ms = GetAtomicFlag(&FLAGS_backfill_index_timeout_grace_margin_ms);
    if (grace_margin_ms < 0) {
      // We need: grace_margin_ms >= 1000 * batch_size / rate_per_sec.
      // By default, we will set it to twice the minimum value + 1s.
      grace_margin_ms = (rate_per_sec > 0 ? 1000 * (1 + 2.0 * batch_size / rate_per_sec) : 1000);
      YB_LOG_EVERY_N_SECS(INFO, 10)
          << "Using grace margin of " << grace_margin_ms << "ms, original deadline: "
          << MonoDelta(deadline - start_time);
    }
    modified_deadline = deadline - grace_margin_ms * 1ms;
  }

  CoarseTimePoint start_time;
  CoarseTimePoint deadline;
  size_t rate_per_sec;
  size_t batch_size;
  CoarseTimePoint modified_deadline;
};
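
// Editor's note: a worked example (not part of tablet.cc) of the default grace margin above.
// With batch_size = 128 rows and rate_per_sec = 256 rows/sec, one batch needs at least
// 1000 * 128 / 256 = 500 ms, so the default margin is 1000 * (1 + 2.0 * 128 / 256) = 2000 ms
// (twice the minimum plus 1 s), and modified_deadline ends up 2 seconds before the deadline
// passed in by the caller.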
// Slow down before the next batch to throttle the rate of processing.
void MaybeSleepToThrottleBackfill(
    const CoarseTimePoint& start_time,
    size_t number_of_rows_processed) {
  if (FLAGS_backfill_index_rate_rows_per_sec <= 0) {
    return;
  }

  auto now = CoarseMonoClock::Now();
  auto duration_for_rows_processed = MonoDelta(now - start_time);
  auto expected_time_for_processing_rows = MonoDelta::FromMilliseconds(
      number_of_rows_processed * 1000 / FLAGS_backfill_index_rate_rows_per_sec);
  DVLOG(3) << "Duration since last batch " << duration_for_rows_processed << " expected duration "
           << expected_time_for_processing_rows << " extra time to sleep: "
           << expected_time_for_processing_rows - duration_for_rows_processed;
  if (duration_for_rows_processed < expected_time_for_processing_rows) {
    SleepFor(expected_time_for_processing_rows - duration_for_rows_processed);
  }
}
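
// Editor's note: a worked example (not part of tablet.cc) of the throttling math above.
// With FLAGS_backfill_index_rate_rows_per_sec = 1000 and 5000 rows processed since
// start_time, the expected elapsed time is 5000 * 1000 / 1000 = 5000 ms; if only 3 s have
// actually elapsed, the backfill sleeps for the remaining 2 s before starting the next batch.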
bool CanProceedToBackfillMoreRows(
    const BackfillParams& backfill_params,
    size_t number_of_rows_processed) {
  auto now = CoarseMonoClock::Now();
  if (now > backfill_params.modified_deadline ||
      (FLAGS_TEST_backfill_paging_size > 0 &&
       number_of_rows_processed >= FLAGS_TEST_backfill_paging_size)) {
    // We are done if we are out of time,
    // or if for testing purposes we have a bound on the size of batches to process.
    return false;
  }
  return true;
}

bool CanProceedToBackfillMoreRows(
    const BackfillParams& backfill_params,
    const string& backfilled_until,
    size_t number_of_rows_processed) {
  if (backfilled_until.empty()) {
    // The backfill is done for this tablet. No need to do another batch.
    return false;
  }

  return CanProceedToBackfillMoreRows(backfill_params, number_of_rows_processed);
}

}  // namespace
// Assume that we are already in Backfilling mode.
Status Tablet::BackfillIndexesForYsql(
    const std::vector<IndexInfo>& indexes,
    const std::string& backfill_from,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    const HostPort& pgsql_proxy_bind_address,
    const std::string& database_name,
    const uint64_t postgres_auth_key,
    size_t* number_of_rows_processed,
    std::string* backfilled_until) {
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) {
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms));
  }
  LOG(INFO) << "Begin " << __func__ << " at " << read_time << " from "
            << (backfill_from.empty() ? "<start-of-the-tablet>" : strings::b2a_hex(backfill_from))
            << " for " << AsString(indexes);
  *backfilled_until = backfill_from;
  pgwrapper::PGConnPtr conn =
      VERIFY_RESULT(ConnectToPostgres(pgsql_proxy_bind_address, database_name, postgres_auth_key));

  // Construct the query string.
  std::string index_oids;
  {
    std::stringstream ss;
    for (auto& index : indexes) {
      // Cannot use the Oid type because a large OID such as 2147500041 overflows the Postgres
      // lexer <ival> type. Use int to output it as -2147467255, which is accepted by <ival>.
      int index_oid = VERIFY_RESULT(GetPgsqlTableOid(index.table_id()));
      ss << index_oid << ",";
    }
    index_oids = ss.str();
    index_oids.pop_back();
  }
  std::string partition_key = metadata_->partition()->partition_key_start();

  BackfillParams backfill_params(deadline);
  *number_of_rows_processed = 0;
  do {
    std::string serialized_backfill_spec =
        GenerateSerializedBackfillSpec(backfill_params.batch_size, *backfilled_until);

    // This should be safe from injection attacks because the parameters only consist of
    // characters [-,0-9a-f].
    std::string query_str = Format(
        "BACKFILL INDEX $0 WITH x'$1' READ TIME $2 PARTITION x'$3';",
        index_oids,
        b2a_hex(serialized_backfill_spec),
        read_time.ToUint64(),
        b2a_hex(partition_key));
    VLOG(1) << __func__ << ": libpq query string: " << query_str;

    PgsqlBackfillSpecPB spec = VERIFY_RESULT(QueryPostgresToDoBackfill(conn, query_str));
    *number_of_rows_processed += spec.count();
    *backfilled_until = spec.next_row_key();

    VLOG(2) << "Backfilled " << *number_of_rows_processed << " rows. "
            << "Setting backfilled_until to " << b2a_hex(*backfilled_until) << " of length "
            << backfilled_until->length();

    MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
  } while (CanProceedToBackfillMoreRows(
      backfill_params, *backfilled_until, *number_of_rows_processed));

  VLOG(1) << "Backfilled " << *number_of_rows_processed << " rows. "
          << "Set backfilled_until to "
          << (backfilled_until->empty() ? "(empty)" : b2a_hex(*backfilled_until));
  return Status::OK();
}
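
// Editor's note: an illustrative rendering (not part of tablet.cc) of the statement built
// above; the OID, read time, and hex payloads are made-up placeholders:
//
//   BACKFILL INDEX 16418 WITH x'0a00' READ TIME 6737196018950537216 PARTITION x'aaaa';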
std::vector<yb::ColumnSchema> Tablet::GetColumnSchemasForIndex(
    const std::vector<IndexInfo>& indexes) {
  std::unordered_set<yb::ColumnId> col_ids_set;
  std::vector<yb::ColumnSchema> columns;

  for (auto idx : schema()->column_ids()) {
    if (schema()->is_key_column(idx)) {
      col_ids_set.insert(idx);
      auto res = schema()->column_by_id(idx);
      if (res) {
        columns.push_back(*res);
      } else {
        LOG(DFATAL) << "Unexpected: cannot find the column in the main table for "
                    << idx;
      }
    }
  }
  for (const IndexInfo& idx : indexes) {
    for (const auto& idx_col : idx.columns()) {
      if (col_ids_set.find(idx_col.indexed_column_id) == col_ids_set.end()) {
        col_ids_set.insert(idx_col.indexed_column_id);
        auto res = schema()->column_by_id(idx_col.indexed_column_id);
        if (res) {
          columns.push_back(*res);
        } else {
          LOG(DFATAL) << "Unexpected: cannot find the column in the main table for "
                      << idx_col.indexed_column_id;
        }
      }
    }
    if (idx.where_predicate_spec()) {
      for (const auto col_in_pred : idx.where_predicate_spec()->column_ids()) {
        ColumnId col_id_in_pred(col_in_pred);
        if (col_ids_set.find(col_id_in_pred) == col_ids_set.end()) {
          col_ids_set.insert(col_id_in_pred);
          auto res = schema()->column_by_id(col_id_in_pred);
          if (res) {
            columns.push_back(*res);
          } else {
            LOG(DFATAL) << "Unexpected: cannot find the column in the main table for "
                        << col_id_in_pred;
          }
        }
      }
    }
  }
  return columns;
}
2225
2226
namespace {
2227
2228
2.43k
std::vector<TableId> GetIndexIds(const std::vector<IndexInfo>& indexes) {
2229
2.43k
  std::vector<TableId> index_ids;
2230
2.45k
  for (const IndexInfo& idx : indexes) {
2231
2.45k
    index_ids.push_back(idx.table_id());
2232
2.45k
  }
2233
2.43k
  return index_ids;
2234
2.43k
}
2235
2236
template <typename SomeVector>
2237
void SleepToThrottleRate(
2238
0
    SomeVector* index_requests, int32 row_access_rate_per_sec, CoarseTimePoint* last_flushed_at) {
2239
0
  auto now = CoarseMonoClock::Now();
2240
0
  if (row_access_rate_per_sec > 0) {
2241
0
    auto duration_since_last_batch = MonoDelta(now - *last_flushed_at);
2242
0
    auto expected_duration_ms =
2243
0
        MonoDelta::FromMilliseconds(index_requests->size() * 1000 / row_access_rate_per_sec);
2244
0
    DVLOG(3) << "Duration since last batch " << duration_since_last_batch << " expected duration "
2245
0
             << expected_duration_ms
2246
0
             << " extra time so sleep: " << expected_duration_ms - duration_since_last_batch;
2247
0
    if (duration_since_last_batch < expected_duration_ms) {
2248
0
      SleepFor(expected_duration_ms - duration_since_last_batch);
2249
0
    }
2250
0
  }
2251
0
}

Result<client::YBTablePtr> GetTable(
    const TableId& table_id, const std::shared_ptr<client::YBMetaDataCache>& metadata_cache) {
  // TODO: create an async version of GetTable.
  // A synchronous call is acceptable here because the metadata cache makes the
  // common case fast.
  client::YBTablePtr index_table;
  bool cache_used_ignored = false;
  RETURN_NOT_OK(metadata_cache->GetTable(table_id, &index_table, &cache_used_ignored));
  return index_table;
}

}  // namespace

// Backfills the indexes with the information contained in this tablet.
// Assumes that we are already in Backfilling mode.
Status Tablet::BackfillIndexes(
    const std::vector<IndexInfo>& indexes,
    const std::string& backfill_from,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    size_t* number_of_rows_processed,
    std::string* backfilled_until,
    std::unordered_set<TableId>* failed_indexes) {
  TRACE(__func__);
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_by_ms > 0)) {
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_by_ms);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_by_ms));
  }
  VLOG(2) << "Begin BackfillIndexes at " << read_time << " for " << AsString(indexes);

  std::vector<TableId> index_ids = GetIndexIds(indexes);
  std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes);

  Schema projection(columns, {}, schema()->num_key_columns());
  auto iter = VERIFY_RESULT(NewRowIterator(
      projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline));
  QLTableRow row;
  std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>> index_requests;

  BackfillParams backfill_params{deadline};
  constexpr auto kProgressInterval = 1000;

  if (!backfill_from.empty()) {
    VLOG(1) << "Resuming backfill from " << b2a_hex(backfill_from);
    *backfilled_until = backfill_from;
    RETURN_NOT_OK(iter->SeekTuple(Slice(backfill_from)));
  }

  string resume_backfill_from;
  *number_of_rows_processed = 0;
  int TEST_number_rows_corrupted = 0;
  int TEST_number_rows_dropped = 0;

  while (VERIFY_RESULT(iter->HasNext())) {
    if (index_requests.empty()) {
      *backfilled_until = VERIFY_RESULT(iter->GetTupleId()).ToBuffer();
      MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
    }

    if (!CanProceedToBackfillMoreRows(backfill_params, *number_of_rows_processed)) {
      resume_backfill_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer();
      break;
    }

    RETURN_NOT_OK(iter->NextRow(&row));
    if (FLAGS_TEST_backfill_sabotage_frequency > 0 &&
        *number_of_rows_processed % FLAGS_TEST_backfill_sabotage_frequency == 0) {
      VLOG(1) << "Corrupting fetched row: " << row.ToString();
      // Corrupt the first key column, since an index should not be built on the primary key.
      row.MarkTombstoned(schema()->column_id(0));
      TEST_number_rows_corrupted++;
    }

    if (FLAGS_TEST_backfill_drop_frequency > 0 &&
        *number_of_rows_processed % FLAGS_TEST_backfill_drop_frequency == 0) {
      (*number_of_rows_processed)++;
      VLOG(1) << "Dropping fetched row: " << row.ToString();
      TEST_number_rows_dropped++;
      continue;
    }

    DVLOG(2) << "Building index for fetched row: " << row.ToString();
    RETURN_NOT_OK(UpdateIndexInBatches(
        row, indexes, read_time, backfill_params.deadline, &index_requests,
        failed_indexes));

    if (++(*number_of_rows_processed) % kProgressInterval == 0) {
      VLOG(1) << "Processed " << *number_of_rows_processed << " rows";
    }
  }

  if (FLAGS_TEST_backfill_sabotage_frequency > 0) {
    LOG(INFO) << "In total, " << TEST_number_rows_corrupted
              << " rows were corrupted in index backfill.";
  }

  if (FLAGS_TEST_backfill_drop_frequency > 0) {
    LOG(INFO) << "In total, " << TEST_number_rows_dropped
              << " rows were dropped in index backfill.";
  }

  VLOG(1) << "Processed " << *number_of_rows_processed << " rows";
  RETURN_NOT_OK(FlushWriteIndexBatch(
      read_time, backfill_params.deadline, &index_requests, failed_indexes));
  MaybeSleepToThrottleBackfill(backfill_params.start_time, *number_of_rows_processed);
  *backfilled_until = resume_backfill_from;
  LOG(INFO) << "Done BackfillIndexes at " << read_time << " for " << AsString(index_ids)
            << " until "
            << (backfilled_until->empty() ? "<end of the tablet>" : b2a_hex(*backfilled_until));
  return Status::OK();
}
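
// Usage sketch (hypothetical caller; the variable names are invented for
// illustration, while the signature is taken from the function above):
//
//   size_t rows_processed = 0;
//   std::string backfilled_until;
//   std::unordered_set<TableId> failed_indexes;
//   RETURN_NOT_OK(tablet.BackfillIndexes(
//       indexes, /* backfill_from */ "", deadline, read_time,
//       &rows_processed, &backfilled_until, &failed_indexes));
//   // An empty backfilled_until means the whole tablet was processed; otherwise the
//   // caller can resume by passing it back as backfill_from on the next call.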

Status Tablet::UpdateIndexInBatches(
    const QLTableRow& row,
    const std::vector<IndexInfo>& indexes,
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  const QLTableRow& kEmptyRow = QLTableRow::empty_row();
  QLExprExecutor expr_executor;

  for (const IndexInfo& index : indexes) {
    QLWriteRequestPB* const index_request = VERIFY_RESULT(
        docdb::CreateAndSetupIndexInsertRequest(
            &expr_executor, /* index_has_write_permission */ true,
            kEmptyRow, row, &index, index_requests));
    if (index_request)
      index_request->set_is_backfill(true);
  }

  // Flush the accumulated index write ops if the batch has grown large enough.
  return FlushWriteIndexBatchIfRequired(write_time, deadline, index_requests, failed_indexes);
}

Result<std::shared_ptr<YBSession>> Tablet::GetSessionForVerifyOrBackfill(
    const CoarseTimePoint deadline) {
  if (!client_future_.valid()) {
    return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id());
  }

  auto client = client_future_.get();
  auto session = std::make_shared<YBSession>(client);
  session->SetDeadline(deadline);
  return session;
}

Status Tablet::FlushWriteIndexBatchIfRequired(
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  if (index_requests->size() < FLAGS_backfill_index_write_batch_size) {
    return Status::OK();
  }
  return FlushWriteIndexBatch(write_time, deadline, index_requests, failed_indexes);
}
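
// Illustrative note: with FLAGS_backfill_index_write_batch_size = 128, the first 127
// accumulated requests return early above; the flush is triggered by the first call
// that sees at least 128 accumulated requests.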

Status Tablet::FlushWriteIndexBatch(
    const HybridTime write_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const IndexInfo*, QLWriteRequestPB>>* index_requests,
    std::unordered_set<TableId>* failed_indexes) {
  if (!client_future_.valid()) {
    return STATUS_FORMAT(IllegalState, "Client future is not set up for $0", tablet_id());
  } else if (!YBMetaDataCache()) {
    return STATUS(IllegalState, "Table metadata cache is not present for index update");
  }
  std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline));

  std::unordered_set<
      client::YBqlWriteOpPtr, client::YBqlWritePrimaryKeyComparator,
      client::YBqlWritePrimaryKeyComparator>
      ops_by_primary_key;
  std::vector<shared_ptr<client::YBqlWriteOp>> write_ops;

  constexpr int kMaxNumRetries = 10;
  auto metadata_cache = YBMetaDataCache();

  for (auto& pair : *index_requests) {
    client::YBTablePtr index_table =
        VERIFY_RESULT(GetTable(pair.first->table_id(), metadata_cache));

    shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite());
    index_op->set_write_time_for_backfill(write_time);
    index_op->mutable_request()->Swap(&pair.second);
    if (index_table->IsUniqueIndex()) {
      if (ops_by_primary_key.count(index_op) > 0) {
        VLOG(2) << "Splitting the batch of writes because " << index_op->ToString()
                << " collides with an existing update in this batch.";
        VLOG(1) << "Flushing " << ops_by_primary_key.size() << " ops to the index";
        RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes));
        VLOG(3) << "Done flushing ops to the index";
        ops_by_primary_key.clear();
      }
      ops_by_primary_key.insert(index_op);
    }
    session->Apply(index_op);
    write_ops.push_back(index_op);
  }

  VLOG(1) << Format("Flushing $0 ops to the index",
                    (!ops_by_primary_key.empty() ? ops_by_primary_key.size()
                                                 : write_ops.size()));
  RETURN_NOT_OK(FlushWithRetries(session, write_ops, kMaxNumRetries, failed_indexes));
  index_requests->clear();

  return Status::OK();
}

template <typename SomeYBqlOp>
Status Tablet::FlushWithRetries(
    shared_ptr<YBSession> session,
    const std::vector<shared_ptr<SomeYBqlOp>>& index_ops,
    int num_retries,
    std::unordered_set<TableId>* failed_indexes) {
  auto retries_left = num_retries;
  std::vector<std::shared_ptr<SomeYBqlOp>> pending_ops = index_ops;
  std::unordered_map<string, int32_t> error_msg_cnts;
  do {
    std::vector<std::shared_ptr<SomeYBqlOp>> failed_ops;
    RETURN_NOT_OK_PREPEND(session->Flush(), "Flush failed.");
    VLOG(3) << "Done flushing ops to the index";
    for (auto index_op : pending_ops) {
      if (index_op->response().status() == QLResponsePB::YQL_STATUS_OK) {
        continue;
      }

      VLOG(2) << "Got response " << AsString(index_op->response()) << " for "
              << AsString(index_op->request());
      if (index_op->response().status() != QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
        failed_indexes->insert(index_op->table()->id());
        const string& error_message = index_op->response().error_message();
        error_msg_cnts[error_message]++;
        VLOG_WITH_PREFIX(3) << "Failing index " << index_op->table()->id()
                            << " due to non-retryable errors " << error_message;
        continue;
      }

      failed_ops.push_back(index_op);
      session->Apply(index_op);
    }

    if (!failed_ops.empty()) {
      VLOG(1) << Format("Flushing $0 failed ops again to the index", failed_ops.size());
    }
    pending_ops = std::move(failed_ops);
  } while (!pending_ops.empty() && --retries_left > 0);

  if (!failed_indexes->empty()) {
    VLOG_WITH_PREFIX(1) << "Failed due to non-retryable errors " << AsString(*failed_indexes);
  }
  if (!pending_ops.empty()) {
    for (auto index_op : pending_ops) {
      failed_indexes->insert(index_op->table()->id());
      const string& error_message = index_op->response().error_message();
      error_msg_cnts[error_message]++;
    }
    VLOG_WITH_PREFIX(1) << "Failed indexes including retryable and non-retryable errors are "
                        << AsString(*failed_indexes);
  }
  return (
      failed_indexes->empty()
          ? Status::OK()
          : STATUS_SUBSTITUTE(
                IllegalState, "Index op failed for $0 requests after $1 retries with errors: $2",
                pending_ops.size(), num_retries, AsString(error_msg_cnts)));
}
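
// Illustrative note: only responses with YQL_STATUS_RESTART_REQUIRED_ERROR are
// re-applied and flushed again; any other non-OK response immediately marks the
// target index as failed. With num_retries = 10, an op that keeps returning
// RESTART_REQUIRED goes through at most 10 flush rounds before the remaining
// pending ops are also added to failed_indexes.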

// Coverage note: the report lists a separate, textually identical instantiation of
// FlushWithRetries for yb::client::YBqlWriteOp, plus an unexecuted instantiation for
// yb::client::YBqlReadOp; the duplicated bodies are omitted here.

Status Tablet::VerifyIndexTableConsistencyForCQL(
    const std::vector<IndexInfo>& indexes,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  std::vector<TableId> index_ids = GetIndexIds(indexes);
  std::vector<yb::ColumnSchema> columns = GetColumnSchemasForIndex(indexes);
  return VerifyTableConsistencyForCQL(
      index_ids, columns, start_key, num_rows, deadline, read_time, false, consistency_stats,
      verified_until);
}

Status Tablet::VerifyMainTableConsistencyForCQL(
    const TableId& main_table_id,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  const std::vector<yb::ColumnSchema>& columns = schema()->columns();
  const std::vector<TableId>& table_ids = {main_table_id};
  return VerifyTableConsistencyForCQL(
      table_ids, columns, start_key, num_rows, deadline, read_time, true, consistency_stats,
      verified_until);
}

Status Tablet::VerifyTableConsistencyForCQL(
    const std::vector<TableId>& table_ids,
    const std::vector<yb::ColumnSchema>& columns,
    const std::string& start_key,
    const int num_rows,
    const CoarseTimePoint deadline,
    const HybridTime read_time,
    const bool is_main_table,
    std::unordered_map<TableId, uint64>* consistency_stats,
    std::string* verified_until) {
  Schema projection(columns, {}, schema()->num_key_columns());
  auto iter = VERIFY_RESULT(NewRowIterator(
      projection, ReadHybridTime::SingleTime(read_time), "" /* table_id */, deadline));

  if (!start_key.empty()) {
    VLOG(2) << "Starting verify index from " << b2a_hex(start_key);
    RETURN_NOT_OK(iter->SeekTuple(Slice(start_key)));
  }

  constexpr int kProgressInterval = 1000;
  CoarseTimePoint last_flushed_at;

  QLTableRow row;
  std::vector<std::pair<const TableId, QLReadRequestPB>> requests;
  std::unordered_set<TableId> failed_indexes;
  std::string resume_verified_from;

  int rows_verified = 0;
  while (VERIFY_RESULT(iter->HasNext()) && rows_verified < num_rows &&
         CoarseMonoClock::Now() < deadline) {
    resume_verified_from = VERIFY_RESULT(iter->GetTupleId()).ToBuffer();
    RETURN_NOT_OK(iter->NextRow(&row));
    VLOG(1) << "Verifying index for main table row: " << row.ToString();

    RETURN_NOT_OK(VerifyTableInBatches(
        row, table_ids, read_time, deadline, is_main_table, &requests, &last_flushed_at,
        &failed_indexes, consistency_stats));
    if (++rows_verified % kProgressInterval == 0) {
      VLOG(1) << "Verified " << rows_verified << " rows";
    }
    *verified_until = resume_verified_from;
  }
  return FlushVerifyBatch(
      read_time, deadline, &requests, &last_flushed_at, &failed_indexes, consistency_stats);
}
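
// Usage sketch (hypothetical caller; variable names invented for illustration, the
// signature is taken from VerifyIndexTableConsistencyForCQL above):
//
//   std::unordered_map<TableId, uint64> stats;
//   std::string verified_until;
//   RETURN_NOT_OK(tablet.VerifyIndexTableConsistencyForCQL(
//       indexes, /* start_key */ "", /* num_rows */ 1000, deadline, read_time,
//       &stats, &verified_until));
//   // stats maps each index table id to the number of rows for which the index
//   // lookup did not return exactly one matching row.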

namespace {

QLConditionPB* InitWhereOp(QLReadRequestPB* req) {
  // The hash column values must not have been set yet.
  DCHECK(req->hashed_column_values().empty());

  // Add the range column values to the where clause.
  QLConditionPB* where_pb = req->mutable_where_expr()->mutable_condition();
  if (!where_pb->has_op()) {
    where_pb->set_op(QL_OP_AND);
  }
  DCHECK_EQ(where_pb->op(), QL_OP_AND);
  return where_pb;
}

void SetSelectedExprToTrue(QLReadRequestPB* req) {
  // Selecting the constant TRUE avoids retrieving actual row data
  // in the index read request.
  req->add_selected_exprs()->mutable_value()->set_bool_value(true);
  QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc();
  QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs();
  rscol_desc->set_name("1");
  rscol_desc->mutable_ql_type()->set_main(yb::DataType::BOOL);
}
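
// Illustrative note: in CQL terms the generated read behaves like
// "SELECT true FROM <table> WHERE <key conditions>" -- the verifier only needs to
// know whether a matching row exists, so a constant TRUE projection avoids fetching
// any column data.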

Status WhereMainTableToPB(
    const QLTableRow& key,
    const IndexInfo& index_info,
    const Schema& main_table_schema,
    QLReadRequestPB* req) {
  std::unordered_map<ColumnId, ColumnId> column_id_map;
  for (const auto& col : index_info.columns()) {
    column_id_map.insert({col.indexed_column_id, col.column_id});
  }

  auto column_refs = req->mutable_column_refs();
  QLConditionPB* where_pb = InitWhereOp(req);

  for (const auto& col_id : main_table_schema.column_ids()) {
    if (main_table_schema.is_hash_key_column(col_id)) {
      *req->add_hashed_column_values()->mutable_value() = *key.GetValue(column_id_map[col_id]);
      column_refs->add_ids(col_id);
    } else {
      auto it = column_id_map.find(col_id);
      if (it != column_id_map.end()) {
        QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition();
        col_cond_pb->set_op(QL_OP_EQUAL);
        col_cond_pb->add_operands()->set_column_id(col_id);
        *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(it->second);
        column_refs->add_ids(col_id);
      }
    }
  }

  SetSelectedExprToTrue(req);
  return Status::OK();
}

// `schema` is the index schema, while `key` is a row from the main table.
Status WhereIndexToPB(
    const QLTableRow& key,
    const IndexInfo& index_info,
    const Schema& schema,
    QLReadRequestPB* req) {
  QLConditionPB* where_pb = InitWhereOp(req);
  auto column_refs = req->mutable_column_refs();

  for (size_t idx = 0; idx < index_info.columns().size(); idx++) {
    const ColumnId& column_id = index_info.column(idx).column_id;
    const ColumnId& indexed_column_id = index_info.column(idx).indexed_column_id;
    if (schema.is_hash_key_column(column_id)) {
      *req->add_hashed_column_values()->mutable_value() = *key.GetValue(indexed_column_id);
    } else {
      QLConditionPB* col_cond_pb = where_pb->add_operands()->mutable_condition();
      col_cond_pb->set_op(QL_OP_EQUAL);
      col_cond_pb->add_operands()->set_column_id(column_id);
      *col_cond_pb->add_operands()->mutable_value() = *key.GetValue(indexed_column_id);
    }
    column_refs->add_ids(column_id);
  }

  SetSelectedExprToTrue(req);
  return Status::OK();
}

}  // namespace

Status Tablet::VerifyTableInBatches(
    const QLTableRow& row,
    const std::vector<TableId>& table_ids,
    const HybridTime read_time,
    const CoarseTimePoint deadline,
    const bool is_main_table,
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
    CoarseTimePoint* last_flushed_at,
    std::unordered_set<TableId>* failed_indexes,
    std::unordered_map<TableId, uint64>* consistency_stats) {
  auto client = client_future_.get();
  auto local_index_info = metadata_->primary_table_info()->index_info.get();
  for (const TableId& table_id : table_ids) {
    std::shared_ptr<client::YBTable> table;
    RETURN_NOT_OK(client->OpenTable(table_id, &table));
    std::shared_ptr<client::YBqlReadOp> read_op(table->NewQLSelect());

    QLReadRequestPB* req = read_op->mutable_request();
    if (is_main_table) {
      RETURN_NOT_OK(WhereMainTableToPB(row, *local_index_info, table->InternalSchema(), req));
    } else {
      RETURN_NOT_OK(WhereIndexToPB(row, table->index_info(), table->InternalSchema(), req));
    }

    requests->emplace_back(table_id, *req);
  }

  return FlushVerifyBatchIfRequired(
      read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats);
}

Status Tablet::FlushVerifyBatchIfRequired(
    const HybridTime read_time,
    const CoarseTimePoint deadline,
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
    CoarseTimePoint* last_flushed_at,
    std::unordered_set<TableId>* failed_indexes,
    std::unordered_map<TableId, uint64>* consistency_stats) {
  if (requests->size() < FLAGS_verify_index_read_batch_size) {
    return Status::OK();
  }
  return FlushVerifyBatch(
      read_time, deadline, requests, last_flushed_at, failed_indexes, consistency_stats);
}
2728
2729
Status Tablet::FlushVerifyBatch(
2730
    const HybridTime read_time,
2731
    const CoarseTimePoint deadline,
2732
    std::vector<std::pair<const TableId, QLReadRequestPB>>* requests,
2733
    CoarseTimePoint* last_flushed_at,
2734
    std::unordered_set<TableId>* failed_indexes,
2735
0
    std::unordered_map<TableId, uint64>* consistency_stats) {
2736
0
  std::vector<client::YBqlReadOpPtr> read_ops;
2737
0
  std::shared_ptr<YBSession> session = VERIFY_RESULT(GetSessionForVerifyOrBackfill(deadline));
2738
2739
0
  auto client = client_future_.get();
2740
0
  for (auto& pair : *requests) {
2741
0
    client::YBTablePtr table;
2742
0
    RETURN_NOT_OK(client->OpenTable(pair.first, &table));
2743
2744
0
    client::YBqlReadOpPtr read_op(table->NewQLRead());
2745
0
    read_op->mutable_request()->Swap(&pair.second);
2746
0
    read_op->SetReadTime(ReadHybridTime::SingleTime(read_time));
2747
2748
0
    session->Apply(read_op);
2749
2750
    // Note: always emplace at tail because row keys must
2751
    // correspond sequentially with the read_ops in the vector
2752
0
    read_ops.push_back(read_op);
2753
0
  }
2754
2755
0
  RETURN_NOT_OK(FlushWithRetries(session, read_ops, 0, failed_indexes));
2756
2757
0
  for (size_t idx = 0; idx < requests->size(); idx++) {
2758
0
    const client::YBqlReadOpPtr& read_op = read_ops[idx];
2759
0
    auto row_block = read_op->MakeRowBlock();
2760
0
    if (row_block && row_block->row_count() == 1) continue;
2761
0
    (*consistency_stats)[read_op->table()->id()]++;
2762
0
  }
2763
2764
0
  SleepToThrottleRate(requests, FLAGS_verify_index_rate_rows_per_sec, last_flushed_at);
2765
0
  *last_flushed_at = CoarseMonoClock::Now();
2766
0
  requests->clear();
2767
2768
0
  return Status::OK();
2769
0
}

ScopedRWOperationPause Tablet::PauseReadWriteOperations(
    const Abortable abortable, const Stop stop) {
  VTRACE(1, LogPrefix());
  LOG_SLOW_EXECUTION(WARNING, 1000,
                     Substitute("$0Waiting for pending ops to complete", LogPrefix())) {
    return ScopedRWOperationPause(
        abortable ? &pending_abortable_op_counter_ : &pending_non_abortable_op_counter_,
        CoarseMonoClock::Now() +
            MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms),
        stop);
  }
  FATAL_ERROR("Unreachable code -- the previous block must always return");
}

ScopedRWOperation Tablet::CreateAbortableScopedRWOperation(const CoarseTimePoint deadline) const {
  return ScopedRWOperation(&pending_abortable_op_counter_, deadline);
}

ScopedRWOperation Tablet::CreateNonAbortableScopedRWOperation(
    const CoarseTimePoint deadline) const {
  return ScopedRWOperation(&pending_non_abortable_op_counter_, deadline);
}
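
// Illustrative sketch of the counter-based pause pattern above (hypothetical caller,
// simplified; the enum values and status() accessor follow the usage visible in
// Truncate() below):
//
//   auto pause = tablet->PauseReadWriteOperations(Abortable::kFalse, Stop::kTrue);
//   RETURN_NOT_OK(pause.status());  // in-flight ops have drained (or we timed out)
//   // ... safely swap or destroy the RocksDB instances ...
//   // pause is released on scope exit, letting new operations proceed again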

Status Tablet::ModifyFlushedFrontier(
    const docdb::ConsensusFrontier& frontier,
    rocksdb::FrontierModificationMode mode,
    FlushFlags flags) {
  const Status s = regular_db_->ModifyFlushedFrontier(frontier.Clone(), mode);
  if (PREDICT_FALSE(!s.ok())) {
    auto status = STATUS(IllegalState, "Failed to set flushed frontier", s.ToString());
    LOG_WITH_PREFIX(WARNING) << status;
    return status;
  }
  {
    auto flushed_frontier = regular_db_->GetFlushedFrontier();
    const auto& consensus_flushed_frontier = *down_cast<docdb::ConsensusFrontier*>(
        flushed_frontier.get());
    DCHECK_EQ(frontier.op_id(), consensus_flushed_frontier.op_id());
    DCHECK_EQ(frontier.hybrid_time(), consensus_flushed_frontier.hybrid_time());
  }

  if (FLAGS_TEST_tablet_verify_flushed_frontier_after_modifying &&
      mode == rocksdb::FrontierModificationMode::kForce) {
    LOG(INFO) << "Verifying that flushed frontier was force-set successfully";
    string test_data_dir = VERIFY_RESULT(Env::Default()->GetTestDirectory());
    const string checkpoint_dir_for_test = Format(
        "$0/test_checkpoint_$1_$2", test_data_dir, tablet_id(), MonoTime::Now().ToUint64());
    RETURN_NOT_OK(
        rocksdb::checkpoint::CreateCheckpoint(regular_db_.get(), checkpoint_dir_for_test));
    auto se = ScopeExit([checkpoint_dir_for_test] {
      CHECK_OK(Env::Default()->DeleteRecursively(checkpoint_dir_for_test));
    });
    rocksdb::Options rocksdb_options;
    docdb::InitRocksDBOptions(
        &rocksdb_options, LogPrefix(), /* statistics */ nullptr, tablet_options_);
    rocksdb_options.create_if_missing = false;
    LOG_WITH_PREFIX(INFO) << "Opening the test RocksDB at " << checkpoint_dir_for_test
        << ", expecting to see flushed frontier of " << frontier.ToString();
    std::unique_ptr<rocksdb::DB> test_db = VERIFY_RESULT(
        rocksdb::DB::Open(rocksdb_options, checkpoint_dir_for_test));
    LOG_WITH_PREFIX(INFO) << "Getting flushed frontier from test RocksDB at "
                          << checkpoint_dir_for_test;
    auto restored_flushed_frontier = test_db->GetFlushedFrontier();
    if (!restored_flushed_frontier) {
      LOG_WITH_PREFIX(FATAL) << LogPrefix() << "Restored flushed frontier not present";
    }
    CHECK_EQ(
        frontier,
        down_cast<docdb::ConsensusFrontier&>(*restored_flushed_frontier));
    LOG_WITH_PREFIX(INFO) << "Successfully verified persistently stored flushed frontier: "
        << frontier.ToString();
  }

  if (intents_db_) {
    // It is OK to flush intents even if the regular DB is not yet flushed, because the
    // flush waits for the regular DB when there are unflushed intents. Otherwise it
    // does not matter which flushed op id is stored.
    RETURN_NOT_OK(intents_db_->ModifyFlushedFrontier(frontier.Clone(), mode));
  }

  return Flush(FlushMode::kAsync, flags);
}

Status Tablet::Truncate(TruncateOperation* operation) {
  if (metadata_->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
    // We use only the Raft log for the transaction status table.
    return Status::OK();
  }

  auto op_pauses = VERIFY_RESULT(StartShutdownRocksDBs(DisableFlushOnShutdown::kTrue));

  // Check if the tablet is in shutdown mode.
  if (IsShutdownRequested()) {
    return STATUS(IllegalState, "Tablet was shut down");
  }

  const rocksdb::SequenceNumber sequence_number = regular_db_->GetLatestSequenceNumber();
  const string db_dir = regular_db_->GetName();

  auto s = CompleteShutdownRocksDBs(Destroy::kTrue, &op_pauses);
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Failed to clean up db dir " << db_dir << ": " << s;
    return STATUS(IllegalState, "Failed to clean up db dir", s.ToString());
  }

  // Create a new database.
  // Note: db_dir == metadata()->rocksdb_dir() is still a valid db dir.
  s = OpenKeyValueTablet();
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Failed to create a new db: " << s;
    return s;
  }

  docdb::ConsensusFrontier frontier;
  frontier.set_op_id(operation->op_id());
  frontier.set_hybrid_time(operation->hybrid_time());
  // We use the kUpdate mode here, because unlike the case of restoring a snapshot to a completely
  // different tablet in an arbitrary Raft group, here there is no possibility of the flushed
  // frontier needing to go backwards.
  RETURN_NOT_OK(ModifyFlushedFrontier(frontier, rocksdb::FrontierModificationMode::kUpdate,
                                      FlushFlags::kAllDbs | FlushFlags::kNoScopedOperation));

  LOG_WITH_PREFIX(INFO) << "Created new db for truncated tablet";
  LOG_WITH_PREFIX(INFO) << "Sequence numbers: old=" << sequence_number
                        << ", new=" << regular_db_->GetLatestSequenceNumber();
  // Ensure that op_pauses stays in scope throughout this function.
  for (auto* op_pause : op_pauses.AsArray()) {
    DFATAL_OR_RETURN_NOT_OK(op_pause->status());
  }
  return DoEnableCompactions();
}

void Tablet::UpdateMonotonicCounter(int64_t value) {
  int64_t counter = monotonic_counter_;
  while (true) {
    if (counter >= value) {
      break;
    }
    if (monotonic_counter_.compare_exchange_weak(counter, value)) {
      break;
    }
  }
}
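
// The loop above is the standard lock-free "monotonic max" idiom:
// compare_exchange_weak reloads `counter` on failure, so a concurrent larger update
// simply terminates the loop on the next iteration. A minimal standalone sketch of
// the same idiom (illustrative only, assuming <atomic>):
//
//   void UpdateMax(std::atomic<int64_t>& a, int64_t value) {
//     int64_t cur = a.load();
//     while (cur < value && !a.compare_exchange_weak(cur, value)) {}
//   }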

////////////////////////////////////////////////////////////
// Tablet
////////////////////////////////////////////////////////////

Result<bool> Tablet::HasSSTables() const {
  if (!regular_db_) {
    return false;
  }

  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  std::vector<rocksdb::LiveFileMetaData> live_files_metadata;
  regular_db_->GetLiveFilesMetaData(&live_files_metadata);
  return !live_files_metadata.empty();
}

yb::OpId MaxPersistentOpIdForDb(rocksdb::DB* db, bool invalid_if_no_new_data) {
  // A race condition is possible when data is written between this query and the
  // actual log GC. But that is not a problem as long as we read the committed op id
  // before MaxPersistentOpId, since we always keep the last committed entry in the
  // log during garbage collection.
  // See TabletPeer::GetEarliestNeededLogIndex.
  if (db == nullptr ||
      (invalid_if_no_new_data &&
       db->GetFlushAbility() == rocksdb::FlushAbility::kNoNewData)) {
    return yb::OpId::Invalid();
  }

  rocksdb::UserFrontierPtr frontier = db->GetFlushedFrontier();
  if (!frontier) {
    return yb::OpId();
  }

  return down_cast<docdb::ConsensusFrontier*>(frontier.get())->op_id();
}

Result<DocDbOpIds> Tablet::MaxPersistentOpId(bool invalid_if_no_new_data) const {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  return DocDbOpIds{
      MaxPersistentOpIdForDb(regular_db_.get(), invalid_if_no_new_data),
      MaxPersistentOpIdForDb(intents_db_.get(), invalid_if_no_new_data)
  };
}

void Tablet::FlushIntentsDbIfNecessary(const yb::OpId& latest_log_entry_op_id) {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  if (!scoped_read_operation.ok()) {
    return;
  }

  auto intents_frontier = intents_db_
      ? MemTableFrontierFromDb(intents_db_.get(), rocksdb::UpdateUserValueType::kLargest)
      : nullptr;
  if (intents_frontier) {
    auto index_delta =
        latest_log_entry_op_id.index -
        down_cast<docdb::ConsensusFrontier*>(intents_frontier.get())->op_id().index;
    if (index_delta > FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush) {
      auto intents_flush_ability = intents_db_->GetFlushAbility();
      if (intents_flush_ability == rocksdb::FlushAbility::kHasNewData) {
        LOG_WITH_PREFIX(INFO)
            << "Force flushing intents DB since it was not flushed for " << index_delta
            << " operations, while only "
            << FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush << " are allowed";
        rocksdb::FlushOptions options;
        options.wait = false;
        WARN_NOT_OK(intents_db_->Flush(options), "Flush intents db failed");
      }
    }
  }
}
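
// Worked example (illustrative): if the intents memtable frontier covers Raft index
// 1000, the latest log entry is at index 2010, and
// FLAGS_num_raft_ops_to_force_idle_intents_db_to_flush = 1000, then
// index_delta = 1010 > 1000, so an asynchronous flush (options.wait = false) is
// triggered, which lets the flushed frontier advance so older log segments can
// eventually be garbage-collected (see MaxPersistentOpIdForDb above).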

bool Tablet::IsTransactionalRequest(bool is_ysql_request) const {
  // We consider all YSQL tables within the sys catalog transactional.
  return txns_enabled_ && (
      schema()->table_properties().is_transactional() ||
          (is_sys_catalog_ && is_ysql_request));
}

Result<HybridTime> Tablet::MaxPersistentHybridTime() const {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  if (!regular_db_) {
    return HybridTime::kMin;
  }

  HybridTime result = HybridTime::kMin;
  auto temp = regular_db_->GetFlushedFrontier();
  if (temp) {
    result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time());
  }
  if (intents_db_) {
    temp = intents_db_->GetFlushedFrontier();
    if (temp) {
      result.MakeAtLeast(down_cast<docdb::ConsensusFrontier*>(temp.get())->hybrid_time());
    }
  }
  return result;
}

Result<HybridTime> Tablet::OldestMutableMemtableWriteHybridTime() const {
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_read_operation);

  HybridTime result = HybridTime::kMax;
  for (auto* db : { regular_db_.get(), intents_db_.get() }) {
    if (db) {
      auto mem_frontier = MemTableFrontierFromDb(db, rocksdb::UpdateUserValueType::kSmallest);
      if (mem_frontier) {
        const auto hybrid_time =
            static_cast<const docdb::ConsensusFrontier&>(*mem_frontier).hybrid_time();
        result = std::min(result, hybrid_time);
      }
    }
  }
  return result;
}

const yb::SchemaPtr Tablet::schema() const {
  return metadata_->schema();
}

Status Tablet::DebugDump(vector<string> *lines) {
  switch (table_type_) {
    case TableType::PGSQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::YQL_TABLE_TYPE: FALLTHROUGH_INTENDED;
    case TableType::REDIS_TABLE_TYPE:
      DocDBDebugDump(lines);
      return Status::OK();
    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
      return Status::OK();
  }
  FATAL_INVALID_ENUM_VALUE(TableType, table_type_);
}

void Tablet::DocDBDebugDump(vector<string> *lines) {
  LOG_STRING(INFO, lines) << "Dumping tablet:";
  LOG_STRING(INFO, lines) << "---------------------------";
  docdb::DocDBDebugDump(regular_db_.get(), LOG_STRING(INFO, lines), docdb::StorageDbType::kRegular);
}

Status Tablet::TEST_SwitchMemtable() {
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_operation);

  if (regular_db_) {
    regular_db_->TEST_SwitchMemtable();
  } else {
    LOG_WITH_PREFIX(INFO) << "Ignoring TEST_SwitchMemtable: no regular RocksDB";
  }
  return Status::OK();
}

Result<HybridTime> Tablet::DoGetSafeTime(
    RequireLease require_lease, HybridTime min_allowed, CoarseTimePoint deadline) const {
  if (require_lease == RequireLease::kFalse) {
    return mvcc_.SafeTimeForFollower(min_allowed, deadline);
  }
  FixedHybridTimeLease ht_lease;
  if (ht_lease_provider_) {
    // This will block until a leader lease reaches the given value or a timeout occurs.
    auto ht_lease_result = ht_lease_provider_(min_allowed, deadline);
    if (!ht_lease_result.ok()) {
      if (require_lease == RequireLease::kFallbackToFollower &&
          ht_lease_result.status().IsIllegalState()) {
        return mvcc_.SafeTimeForFollower(min_allowed, deadline);
      }
      return ht_lease_result.status();
    }
    ht_lease = *ht_lease_result;
    if (min_allowed > ht_lease.time) {
      return STATUS_FORMAT(
          InternalError, "Read request hybrid time after current time: $0, lease: $1",
          min_allowed, ht_lease);
    }
  } else if (min_allowed) {
    RETURN_NOT_OK(WaitUntil(clock_.get(), min_allowed, deadline));
  }
  if (min_allowed > ht_lease.lease) {
    return STATUS_FORMAT(
        InternalError, "Read request hybrid time after leader lease: $0, lease: $1",
        min_allowed, ht_lease);
  }
  return mvcc_.SafeTime(min_allowed, deadline, ht_lease);
}
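
// Illustrative summary of the branches above: RequireLease::kFalse answers from the
// follower-safe MVCC time without touching the leader lease; the default leader path
// blocks on ht_lease_provider_ until the lease covers min_allowed; and
// RequireLease::kFallbackToFollower behaves like the leader path but degrades to the
// follower path when the lease provider reports IllegalState.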

ScopedRWOperationPause Tablet::PauseWritePermits(CoarseTimePoint deadline) {
  TRACE("Blocking write permit(s)");
  auto se = ScopeExit([] { TRACE("Blocking write permit(s) done"); });
  // Prevent new write ops from being submitted.
  return ScopedRWOperationPause(&write_ops_being_submitted_counter_, deadline, Stop::kFalse);
}

ScopedRWOperation Tablet::GetPermitToWrite(CoarseTimePoint deadline) {
  TRACE("Acquiring write permit");
  auto se = ScopeExit([] { TRACE("Acquiring write permit done"); });
  return ScopedRWOperation(&write_ops_being_submitted_counter_);
}

Result<bool> Tablet::StillHasOrphanedPostSplitData() {
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_operation);
  return doc_db().key_bounds->IsInitialized() && !metadata()->has_been_fully_compacted();
}

bool Tablet::MayHaveOrphanedPostSplitData() {
  auto res = StillHasOrphanedPostSplitData();
  if (!res.ok()) {
    LOG(WARNING) << "Failed to call StillHasOrphanedPostSplitData: " << res.ToString();
    return true;
  }
  return res.get();
}

bool Tablet::ShouldDisableLbMove() {
  auto still_has_parent_data_result = StillHasOrphanedPostSplitData();
  if (still_has_parent_data_result.ok()) {
    return still_has_parent_data_result.get();
  }
  // If this call failed, one of three things may be true:
  // 1. We are in the middle of a tablet shutdown.
  //
  // In this case, what we report is of little consequence, since the load balancer
  // shouldn't try to move us anyway. We choose to return false.
  //
  // 2. We are in the middle of a TRUNCATE.
  //
  // In this case, any concurrent attempted LB move should fail before trying to move data,
  // since the RocksDB instances are destroyed. On top of that, we do want to allow the LB to move
  // this tablet after the TRUNCATE completes, so we should return false.
  //
  // 3. We are in the middle of an AlterSchema operation. This is only true for tablets
  //    belonging to colocated tables.
  //
  // In this case, we want to disable tablet moves. We conservatively return true for any failure
  // if the tablet is part of a colocated table.
  return metadata_->schema()->has_colocation_id();
}

void Tablet::ForceRocksDBCompactInTest() {
  CHECK_OK(ForceFullRocksDBCompact());
}

Status Tablet::ForceFullRocksDBCompact() {
  auto scoped_operation = CreateAbortableScopedRWOperation();
  RETURN_NOT_OK(scoped_operation);

  if (regular_db_) {
    RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get()));
  }
  if (intents_db_) {
    RETURN_NOT_OK_PREPEND(
        intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed");
    RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get()));
  }
  return Status::OK();
}

std::string Tablet::TEST_DocDBDumpStr(IncludeIntents include_intents) {
  if (!regular_db_) return "";

  if (!include_intents) {
    return docdb::DocDBDebugDumpToStr(doc_db().WithoutIntents());
  }

  return docdb::DocDBDebugDumpToStr(doc_db());
}

void Tablet::TEST_DocDBDumpToContainer(
    IncludeIntents include_intents, std::unordered_set<std::string>* out) {
  if (!regular_db_) return;

  if (!include_intents) {
    return docdb::DocDBDebugDumpToContainer(doc_db().WithoutIntents(), out);
  }

  return docdb::DocDBDebugDumpToContainer(doc_db(), out);
}

void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) {
  if (!regular_db_) {
    LOG_WITH_PREFIX(INFO) << "No RocksDB to dump";
    return;
  }

  docdb::DumpRocksDBToLog(regular_db_.get(), StorageDbType::kRegular, LogPrefix());

  if (include_intents && intents_db_) {
    docdb::DumpRocksDBToLog(intents_db_.get(), StorageDbType::kIntents, LogPrefix());
  }
}

size_t Tablet::TEST_CountRegularDBRecords() {
  if (!regular_db_) return 0;
  rocksdb::ReadOptions read_opts;
  read_opts.query_id = rocksdb::kDefaultQueryId;
  docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_);

  size_t result = 0;
  for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
    ++result;
  }
  return result;
}

template <class F>
auto Tablet::GetRegularDbStat(const F& func, const decltype(func())& default_value) const {
  auto scoped_operation = CreateNonAbortableScopedRWOperation();
  std::lock_guard<rw_spinlock> lock(component_lock_);

  // To get exact stats we would have to wait here. That would give us correct stats,
  // but would make this request slower.
  if (!scoped_operation.ok() || !regular_db_) {
    return default_value;
  }
  return func();
}

// Coverage note: the report repeats textually identical listings of GetRegularDbStat
// for the lambda instantiations in GetCurrentVersionSstFilesSize,
// GetCurrentVersionSstFilesUncompressedSize, GetCurrentVersionSstFilesAllSizes, and
// GetCurrentVersionNumSSTFiles; the duplicated bodies are omitted here.

uint64_t Tablet::GetCurrentVersionSstFilesSize() const {
  return GetRegularDbStat([this] {
    return regular_db_->GetCurrentVersionSstFilesSize();
  }, 0);
}

uint64_t Tablet::GetCurrentVersionSstFilesUncompressedSize() const {
  return GetRegularDbStat([this] {
    return regular_db_->GetCurrentVersionSstFilesUncompressedSize();
  }, 0);
}

std::pair<uint64_t, uint64_t> Tablet::GetCurrentVersionSstFilesAllSizes() const {
  return GetRegularDbStat([this] {
    return regular_db_->GetCurrentVersionSstFilesAllSizes();
  }, std::pair<uint64_t, uint64_t>(0, 0));
}

uint64_t Tablet::GetCurrentVersionNumSSTFiles() const {
  return GetRegularDbStat([this] {
    return regular_db_->GetCurrentVersionNumSSTFiles();
  }, 0);
}
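
// Illustrative note: each getter above passes a lambda plus a default value to
// GetRegularDbStat, so when the tablet is shutting down or the regular RocksDB is
// absent the caller gets the default (0, or a (0, 0) pair) instead of blocking --
// the returned stats are best-effort by design.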

std::pair<int, int> Tablet::GetNumMemtables() const {
  int intents_num_memtables = 0;
  int regular_num_memtables = 0;

  {
    auto scoped_operation = CreateNonAbortableScopedRWOperation();
    if (!scoped_operation.ok()) {
      return std::make_pair(0, 0);
    }
    std::lock_guard<rw_spinlock> lock(component_lock_);
    if (intents_db_) {
      // NOTE: 1 is added on behalf of cfd->mem().
      intents_num_memtables = 1 + intents_db_->GetCfdImmNumNotFlushed();
    }
    if (regular_db_) {
      // NOTE: 1 is added on behalf of cfd->mem().
      regular_num_memtables = 1 + regular_db_->GetCfdImmNumNotFlushed();
    }
  }

  return std::make_pair(intents_num_memtables, regular_num_memtables);
}

// ------------------------------------------------------------------------------------------------

Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext(
    const TransactionMetadataPB& transaction_metadata,
    bool is_ysql_catalog_table,
    const SubTransactionMetadataPB* subtransaction_metadata) const {
  if (!txns_enabled_)
    return TransactionOperationContext();

  if (transaction_metadata.has_transaction_id()) {
    Result<TransactionId> txn_id = FullyDecodeTransactionId(
        transaction_metadata.transaction_id());
    RETURN_NOT_OK(txn_id);
    return CreateTransactionOperationContext(
        boost::make_optional(*txn_id), is_ysql_catalog_table, subtransaction_metadata);
  } else {
    return CreateTransactionOperationContext(
        /* transaction_id */ boost::none, is_ysql_catalog_table, subtransaction_metadata);
  }
}

Result<TransactionOperationContext> Tablet::CreateTransactionOperationContext(
    const boost::optional<TransactionId>& transaction_id,
    bool is_ysql_catalog_table,
    const SubTransactionMetadataPB* subtransaction_metadata) const {
  if (!txns_enabled_) {
    return TransactionOperationContext();
  }

  const TransactionId* txn_id = nullptr;

  if (transaction_id.is_initialized()) {
    txn_id = transaction_id.get_ptr();
  } else if (metadata_->schema()->table_properties().is_transactional() || is_ysql_catalog_table) {
    // deadbeef-dead-beef-dead-beef00000075
    static const TransactionId kArbitraryTxnIdForNonTxnReads(
        17275436393656397278ULL, 8430738506459819486ULL);
    // We still need a context with the transaction participant in order to resolve
    // intents during possible reads.
    txn_id = &kArbitraryTxnIdForNonTxnReads;
  } else {
    return TransactionOperationContext();
  }

  if (!transaction_participant_) {
    return STATUS(IllegalState, "Transactional operation for non transactional tablet");
  }

  if (!subtransaction_metadata) {
    return TransactionOperationContext(*txn_id, transaction_participant());
  }

  auto subtxn = VERIFY_RESULT(SubTransactionMetadata::FromPB(*subtransaction_metadata));
  return TransactionOperationContext(*txn_id, std::move(subtxn), transaction_participant());
}
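
// Decision summary for the overload above (illustrative):
//   - transactions disabled                  -> empty context
//   - explicit transaction id supplied       -> context with that id
//   - transactional table or YSQL catalog,
//     but no id supplied                     -> context with
//                                               kArbitraryTxnIdForNonTxnReads, so
//                                               intents can still be resolved on reads
//   - otherwise                              -> empty context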
3338
3339
Status Tablet::CreateReadIntents(
3340
    const TransactionMetadataPB& transaction_metadata,
3341
    const SubTransactionMetadataPB& subtransaction_metadata,
3342
    const google::protobuf::RepeatedPtrField<QLReadRequestPB>& ql_batch,
3343
    const google::protobuf::RepeatedPtrField<PgsqlReadRequestPB>& pgsql_batch,
3344
314k
    docdb::KeyValueWriteBatchPB* write_batch) {
3345
314k
  auto txn_op_ctx = VERIFY_RESULT(CreateTransactionOperationContext(
3346
314k
      transaction_metadata,
3347
314k
      /* is_ysql_catalog_table */ pgsql_batch.size() > 0 && is_sys_catalog_,
3348
314k
      &subtransaction_metadata));
3349
3350
0
  for (const auto& ql_read : ql_batch) {
3351
0
    docdb::QLReadOperation doc_op(ql_read, txn_op_ctx);
3352
0
    RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(), write_batch));
3353
0
  }
3354
3355
2.76M
  for (const auto& pgsql_read : pgsql_batch) {
3356
2.76M
    docdb::PgsqlReadOperation doc_op(pgsql_read, txn_op_ctx);
3357
2.76M
    RETURN_NOT_OK(doc_op.GetIntents(*GetSchema(pgsql_read.table_id()), write_batch));
3358
2.76M
  }
3359
3360
314k
  return Status::OK();
3361
314k
}
3362
3363
5.53M
bool Tablet::ShouldApplyWrite() {
3364
5.53M
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
3365
5.53M
  if (!scoped_read_operation.ok()) {
3366
0
    return false;
3367
0
  }
3368
3369
5.53M
  return !regular_db_->NeedsDelay();
3370
5.53M
}
3371
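ShouldApplyWrite() declines new writes while the regular RocksDB is signalling backpressure. NeedsDelay() is a YugabyteDB-fork shortcut; as a rough stand-in, stock RocksDB exposes write-stall state through the real properties "rocksdb.actual-delayed-write-rate" and "rocksdb.is-write-stopped". The sketch below only approximates the fork's check.

#include <cstdint>

#include "rocksdb/db.h"

// Sketch: treat any delayed-write rate or full write stop as "do not apply".
bool ShouldApplyWriteSketch(rocksdb::DB* db) {
  uint64_t delayed_rate = 0;
  uint64_t stopped = 0;
  db->GetIntProperty("rocksdb.actual-delayed-write-rate", &delayed_rate);
  db->GetIntProperty("rocksdb.is-write-stopped", &stopped);
  return delayed_rate == 0 && stopped == 0;
}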
3372
1.35M
Result<IsolationLevel> Tablet::GetIsolationLevel(const TransactionMetadataPB& transaction) {
3373
1.35M
  if (transaction.has_isolation()) {
3374
738k
    return transaction.isolation();
3375
738k
  }
3376
619k
  return VERIFY_RESULT(transaction_participant_->PrepareMetadata(transaction)).isolation;
3377
619k
}
3378
3379
Result<RaftGroupMetadataPtr> Tablet::CreateSubtablet(
3380
    const TabletId& tablet_id, const Partition& partition, const docdb::KeyBounds& key_bounds,
3381
141
    const yb::OpId& split_op_id, const HybridTime& split_op_hybrid_time) {
3382
141
  auto scoped_read_operation = CreateNonAbortableScopedRWOperation();
3383
141
  RETURN_NOT_OK(scoped_read_operation);
3384
3385
141
  RETURN_NOT_OK(Flush(FlushMode::kSync));
3386
3387
141
  auto metadata = VERIFY_RESULT(metadata_->CreateSubtabletMetadata(
3388
141
      tablet_id, partition, key_bounds.lower.ToStringBuffer(), key_bounds.upper.ToStringBuffer()));
3389
3390
141
  RETURN_NOT_OK(snapshots_->CreateCheckpoint(
3391
141
      metadata->rocksdb_dir(), CreateIntentsCheckpointIn::kSubDir));
3392
3393
  // We want the flushed frontier to cover split_op_id, so that during bootstrap of the
3394
  // after-split tablets we don't replay the split operation.
3395
141
  docdb::ConsensusFrontier frontier;
3396
141
  frontier.set_op_id(split_op_id);
3397
141
  frontier.set_hybrid_time(split_op_hybrid_time);
3398
3399
141
  struct RocksDbDirWithType {
3400
141
    std::string db_dir;
3401
141
    docdb::StorageDbType db_type;
3402
141
  };
3403
141
  boost::container::static_vector<RocksDbDirWithType, 2> subtablet_rocksdbs(
3404
141
      {{ metadata->rocksdb_dir(), docdb::StorageDbType::kRegular }});
3405
141
  if (intents_db_) {
3406
95
    subtablet_rocksdbs.push_back(
3407
95
        { metadata->intents_rocksdb_dir(), docdb::StorageDbType::kIntents });
3408
95
  }
3409
236
  for (auto rocksdb : subtablet_rocksdbs) {
3410
236
    rocksdb::Options rocksdb_options;
3411
236
    docdb::InitRocksDBOptions(
3412
236
        &rocksdb_options, MakeTabletLogPrefix(tablet_id, log_prefix_suffix_, rocksdb.db_type),
3413
236
        /* statistics */ nullptr, tablet_options_);
3414
236
    rocksdb_options.create_if_missing = false;
3415
    // Disable background compactions; we only need to update the flushed frontier.
3416
236
    rocksdb_options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
3417
236
    std::unique_ptr<rocksdb::DB> db =
3418
236
        VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, rocksdb.db_dir));
3419
236
    RETURN_NOT_OK(
3420
236
        db->ModifyFlushedFrontier(frontier.Clone(), rocksdb::FrontierModificationMode::kUpdate));
3421
236
  }
3422
141
  return metadata;
3423
141
}
3424
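The snapshot step in CreateSubtablet() relies on RocksDB checkpoints: a new DB directory whose SST files are hard links to the parent's, so both child tablets can serve the same data until compaction diverges them. A minimal sketch with the stock rocksdb::Checkpoint utility; the wrapper name is illustrative.

#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"

// Sketch: create a hard-link checkpoint of an open DB in checkpoint_dir.
rocksdb::Status CreateCheckpointSketch(rocksdb::DB* db, const std::string& checkpoint_dir) {
  rocksdb::Checkpoint* raw = nullptr;
  rocksdb::Status s = rocksdb::Checkpoint::Create(db, &raw);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<rocksdb::Checkpoint> checkpoint(raw);
  return checkpoint->CreateCheckpoint(checkpoint_dir);
}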
3425
0
Result<int64_t> Tablet::CountIntents() {
3426
0
  auto pending_op = CreateNonAbortableScopedRWOperation();
3427
0
  RETURN_NOT_OK(pending_op);
3428
3429
0
  if (!intents_db_) {
3430
0
    return 0;
3431
0
  }
3432
0
  rocksdb::ReadOptions read_options;
3433
0
  auto intent_iter = std::unique_ptr<rocksdb::Iterator>(
3434
0
      intents_db_->NewIterator(read_options));
3435
0
  int64_t num_intents = 0;
3436
0
  intent_iter->SeekToFirst();
3437
0
  while (intent_iter->Valid()) {
3438
0
    num_intents++;
3439
0
    intent_iter->Next();
3440
0
  }
3441
0
  return num_intents;
3442
0
}
3443
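CountIntents() above is a plain full scan of the intents DB. For reference, the same iterator pattern bounded to a key range, using the standard ReadOptions::iterate_upper_bound option (an exclusive upper bound); CountRange is a hypothetical helper, not part of this file.

#include <cstdint>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

// Sketch: count keys in [from, to) instead of scanning the whole DB.
int64_t CountRange(rocksdb::DB* db, const rocksdb::Slice& from, const rocksdb::Slice& to) {
  rocksdb::ReadOptions read_options;
  read_options.iterate_upper_bound = &to;  // iterator stops before "to"
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options));
  int64_t num_keys = 0;
  for (iter->Seek(from); iter->Valid(); iter->Next()) {
    ++num_keys;
  }
  return num_keys;
}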
3444
225k
void Tablet::ListenNumSSTFilesChanged(std::function<void()> listener) {
3445
225k
  std::lock_guard<std::mutex> lock(num_sst_files_changed_listener_mutex_);
3446
225k
  bool has_new_listener = listener != nullptr;
3447
225k
  bool has_old_listener = num_sst_files_changed_listener_ != nullptr;
3448
225k
  LOG_IF_WITH_PREFIX(DFATAL, has_new_listener == has_old_listener)
3449
63
      << __func__ << " in wrong state, has_old_listener: " << has_old_listener;
3450
225k
  num_sst_files_changed_listener_ = std::move(listener);
3451
225k
}
3452
3453
void Tablet::InitRocksDBOptions(
3454
    rocksdb::Options* options, const std::string& log_prefix,
3455
451k
    rocksdb::BlockBasedTableOptions table_options) {
3456
451k
  docdb::InitRocksDBOptions(
3457
451k
      options, log_prefix, regulardb_statistics_, tablet_options_, std::move(table_options));
3458
451k
}
3459
3460
3.59k
rocksdb::Env& Tablet::rocksdb_env() const {
3461
3.59k
  return *tablet_options_.rocksdb_env;
3462
3.59k
}
3463
3464
3.93M
const std::string& Tablet::tablet_id() const {
3465
3.93M
  return metadata_->raft_group_id();
3466
3.93M
}
3467
3468
144
Result<std::string> Tablet::GetEncodedMiddleSplitKey() const {
3469
144
  auto error_prefix = [this]() {
3470
0
    return Format(
3471
0
        "Failed to detect middle key for tablet $0 (key_bounds: $1 - $2)",
3472
0
        tablet_id(),
3473
0
        Slice(key_bounds_.lower).ToDebugHexString(),
3474
0
        Slice(key_bounds_.upper).ToDebugHexString());
3475
0
  };
3476
3477
  // TODO(tsplit): should take key_bounds_ into account.
3478
144
  auto middle_key = VERIFY_RESULT(regular_db_->GetMiddleKey());
3479
3480
  // In rare cases the middle key can point to a special internal record that is not visible
3481
  // to the user; tablet splitting routines expect partition keys to have a specific structure
3482
  // that such internal records do not match. Moreover, splitting is expected to produce two
3483
  // child tablets that both contain live user records, whereas splitting at an internal record
3484
  // would leave one tablet with internal records only, and once those are compacted away the
3485
  // tablet would be empty.
3486
143
  if (PREDICT_FALSE(docdb::IsInternalRecordKeyType(docdb::DecodeValueType(middle_key[0])))) {
3487
0
    return STATUS_FORMAT(
3488
0
        IllegalState, "$0: got internal record \"$1\"",
3489
0
        error_prefix(), Slice(middle_key).ToDebugHexString());
3490
0
  }
3491
3492
143
  const auto key_part = metadata()->partition_schema()->IsHashPartitioning()
3493
143
                            ? docdb::DocKeyPart::kUpToHashCode
3494
143
                            : docdb::DocKeyPart::kWholeDocKey;
3495
143
  const auto split_key_size = VERIFY_RESULT(DocKey::EncodedSize(middle_key, key_part));
3496
143
  if (PREDICT_FALSE(split_key_size == 0)) {
3497
    // This check exists only to produce a more sensible message: the bounds verification below
3498
    // would also fail with split_key_size == 0, but its message is not accurate enough. This
3499
    // failure can happen when a key cannot be decoded with key_part inside DocKey::EncodedSize
3500
    // yet is still valid for some reason (e.g. getting a non-hash key under hash partitioning).
3501
0
    return STATUS_FORMAT(
3502
0
        IllegalState, "$0: got unexpected key \"$1\"",
3503
0
        error_prefix(), Slice(middle_key).ToDebugHexString());
3504
0
  }
3505
3506
143
  middle_key.resize(split_key_size);
3507
143
  const Slice middle_key_slice(middle_key);
3508
143
  if (middle_key_slice.compare(key_bounds_.lower) <= 0 ||
3509
143
      (!key_bounds_.upper.empty() && middle_key_slice.compare(key_bounds_.upper) >= 0)) {
3510
0
    return STATUS_FORMAT(
3511
0
        IllegalState,
3512
0
        "$0: got \"$1\". This can happen if post-split tablet wasn't fully compacted after split",
3513
0
        error_prefix(), middle_key_slice.ToDebugHexString());
3514
0
  }
3515
143
  return middle_key;
3516
143
}
3517
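The final check in GetEncodedMiddleSplitKey() guarantees the chosen key lies strictly inside the tablet's key bounds; a key at or outside a bound would leave one child tablet empty. The rule in isolation, as a hedged sketch: IsValidSplitKey is a hypothetical name, and rocksdb::Slice stands in for yb::Slice.

#include "rocksdb/slice.h"

// Sketch: a split key must satisfy lower < key < upper, where an empty
// upper bound means "unbounded above".
bool IsValidSplitKey(const rocksdb::Slice& key,
                     const rocksdb::Slice& lower,
                     const rocksdb::Slice& upper) {
  if (key.compare(lower) <= 0) {
    return false;  // left child would be empty
  }
  if (!upper.empty() && key.compare(upper) >= 0) {
    return false;  // right child would be empty
  }
  return true;
}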
3518
Status Tablet::TriggerPostSplitCompactionIfNeeded(
3519
142k
    std::function<std::unique_ptr<ThreadPoolToken>()> get_token_for_compaction) {
3520
142k
  if (post_split_compaction_task_pool_token_) {
3521
0
    return STATUS(
3522
0
        IllegalState, "Already triggered post split compaction for this tablet instance.");
3523
0
  }
3524
142k
  if (VERIFY_RESULT(StillHasOrphanedPostSplitData())) {
3525
132
    post_split_compaction_task_pool_token_ = get_token_for_compaction();
3526
132
    return post_split_compaction_task_pool_token_->SubmitFunc(
3527
132
        std::bind(&Tablet::TriggerPostSplitCompactionSync, this));
3528
132
  }
3529
142k
  return Status::OK();
3530
142k
}
3531
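TriggerPostSplitCompactionIfNeeded() uses the pool token both as an "already triggered" flag and as a handle that lets the pending task be shut down together with the tablet. The one-shot guard in isolation, sketched with only the standard library; OneShotCompaction is hypothetical and deliberately ignores the shutdown aspect.

#include <future>
#include <stdexcept>

// Sketch: allow the background task to be submitted at most once.
class OneShotCompaction {
 public:
  void Trigger() {
    if (task_.valid()) {
      throw std::logic_error("Already triggered post split compaction.");
    }
    task_ = std::async(std::launch::async, [] {
      // Run the full compaction here.
    });
  }

 private:
  std::future<void> task_;  // valid() doubles as the "triggered" flag
};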
3532
133
void Tablet::TriggerPostSplitCompactionSync() {
3533
133
  TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compaction);
3534
133
  WARN_WITH_PREFIX_NOT_OK(
3535
133
      ForceFullRocksDBCompact(), LogPrefix() + "Failed to compact post-split tablet.");
3536
133
}
3537
3538
3
Status Tablet::VerifyDataIntegrity() {
3539
3
  LOG_WITH_PREFIX(INFO) << "Beginning data integrity checks on this tablet";
3540
3541
  // Verify regular db.
3542
3
  if (regular_db_) {
3543
3
    const auto& db_dir = metadata()->rocksdb_dir();
3544
3
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir));
3545
3
  }
3546
3547
  // Verify intents db.
3548
1
  if (intents_db_) {
3549
0
    const auto& db_dir = metadata()->intents_rocksdb_dir();
3550
0
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(db_dir));
3551
0
  }
3552
3553
1
  return Status::OK();
3554
1
}
3555
3556
3
Status Tablet::OpenDbAndCheckIntegrity(const std::string& db_dir) {
3557
  // Similar to ldb's CheckConsistency, we open the DB as read-only with paranoid checks on.
3558
  // If any corruption is detected then the open will fail with a Corruption status.
3559
3
  rocksdb::Options db_opts;
3560
3
  InitRocksDBOptions(&db_opts, LogPrefix());
3561
3
  db_opts.paranoid_checks = true;
3562
3563
3
  std::unique_ptr<rocksdb::DB> db;
3564
3
  rocksdb::DB* db_raw = nullptr;
3565
3
  rocksdb::Status st = rocksdb::DB::OpenForReadOnly(db_opts, db_dir, &db_raw);
3566
3
  if (db_raw != nullptr) {
3567
1
    db.reset(db_raw);
3568
1
  }
3569
3
  if (!st.ok()) {
3570
2
    if (st.IsCorruption()) {
3571
2
      LOG_WITH_PREFIX(WARNING) << "Detected rocksdb data corruption: " << st;
3572
      // TODO: should we bump metric here or in top-level validation or both?
3573
2
      metrics()->tablet_data_corruptions->Increment();
3574
2
      return st;
3575
2
    }
3576
3577
0
    LOG_WITH_PREFIX(WARNING) << "Failed to open read-only RocksDB in directory " << db_dir
3578
0
                             << ": " << st;
3579
0
    return Status::OK();
3580
2
  }
3581
3582
  // TODO: we can add more checks here to verify block contents/checksums
3583
3584
1
  return Status::OK();
3585
3
}
3586
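Note that OpenDbAndCheckIntegrity() treats only Corruption as an error: a read-only, paranoid-checks open that fails for any other reason is merely logged. The same probe expressed against stock RocksDB, whose DB::OpenForReadOnly and Options::paranoid_checks are real APIs; the wrapper name is illustrative.

#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch: open read-only with paranoid checks; report corruption, tolerate
// any other open failure, as the function above does.
rocksdb::Status CheckDbIntegritySketch(const std::string& db_dir) {
  rocksdb::Options options;
  options.paranoid_checks = true;  // verify checksums and metadata on open
  rocksdb::DB* raw = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenForReadOnly(options, db_dir, &raw);
  std::unique_ptr<rocksdb::DB> db(raw);  // may be null when the open failed
  return s.IsCorruption() ? s : rocksdb::Status::OK();
}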
3587
68
void Tablet::SplitDone() {
3588
68
  {
3589
68
    std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3590
68
    if (completed_split_operation_filter_) {
3591
0
      LOG_WITH_PREFIX(DFATAL) << "Already have split operation filter";
3592
0
      return;
3593
0
    }
3594
3595
68
    completed_split_operation_filter_ = MakeFunctorOperationFilter(
3596
68
        [this](const OpId& op_id, consensus::OperationType op_type) -> Status {
3597
25
          if (SplitOperation::ShouldAllowOpAfterSplitTablet(op_type)) {
3598
4
            return Status::OK();
3599
4
          }
3600
3601
21
          auto children = metadata_->split_child_tablet_ids();
3602
21
          return SplitOperation::RejectionStatus(OpId(), op_id, op_type, children[0], children[1]);
3603
25
        });
3604
68
    operation_filters_.push_back(*completed_split_operation_filter_);
3605
3606
68
    completed_split_log_anchor_ = std::make_unique<log::LogAnchor>();
3607
3608
68
    log_anchor_registry_->Register(
3609
68
        metadata_->split_op_id().index, "Splitted tablet", completed_split_log_anchor_.get());
3610
68
  }
3611
68
}
3612
3613
150k
void Tablet::SyncRestoringOperationFilter(ResetSplit reset_split) {
3614
150k
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3615
3616
150k
  if (reset_split) {
3617
30
    if (completed_split_log_anchor_) {
3618
0
      WARN_NOT_OK(log_anchor_registry_->Unregister(completed_split_log_anchor_.get()),
3619
0
                  "Unregister split anchor");
3620
0
      completed_split_log_anchor_ = nullptr;
3621
0
    }
3622
3623
30
    if (completed_split_operation_filter_) {
3624
0
      UnregisterOperationFilterUnlocked(completed_split_operation_filter_.get());
3625
0
      completed_split_operation_filter_ = nullptr;
3626
0
    }
3627
30
  }
3628
3629
150k
  if (metadata_->has_active_restoration()) {
3630
30
    if (restoring_operation_filter_) {
3631
0
      return;
3632
0
    }
3633
30
    restoring_operation_filter_ = MakeFunctorOperationFilter(
3634
30
        [](const OpId& op_id, consensus::OperationType op_type) -> Status {
3635
15
      if (SnapshotOperation::ShouldAllowOpDuringRestore(op_type)) {
3636
15
        return Status::OK();
3637
15
      }
3638
3639
0
      return SnapshotOperation::RejectionStatus(op_id, op_type);
3640
15
    });
3641
30
    operation_filters_.push_back(*restoring_operation_filter_);
3642
150k
  } else {
3643
150k
    if (!restoring_operation_filter_) {
3644
150k
      return;
3645
150k
    }
3646
3647
18.4E
    UnregisterOperationFilterUnlocked(restoring_operation_filter_.get());
3648
18.4E
    restoring_operation_filter_ = nullptr;
3649
18.4E
  }
3650
150k
}
3651
3652
30
Status Tablet::RestoreStarted(const TxnSnapshotRestorationId& restoration_id) {
3653
30
  metadata_->RegisterRestoration(restoration_id);
3654
30
  RETURN_NOT_OK(metadata_->Flush());
3655
3656
30
  SyncRestoringOperationFilter(ResetSplit::kTrue);
3657
3658
30
  return Status::OK();
3659
30
}
3660
3661
Status Tablet::RestoreFinished(
3662
30
    const TxnSnapshotRestorationId& restoration_id, HybridTime restoration_hybrid_time) {
3663
30
  metadata_->UnregisterRestoration(restoration_id);
3664
30
  if (restoration_hybrid_time) {
3665
30
    metadata_->SetRestorationHybridTime(restoration_hybrid_time);
3666
30
    if (transaction_participant_ && FLAGS_consistent_restore) {
3667
0
      transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
3668
0
    }
3669
30
  }
3670
30
  RETURN_NOT_OK(metadata_->Flush());
3671
3672
30
  SyncRestoringOperationFilter(ResetSplit::kFalse);
3673
3674
30
  return Status::OK();
3675
30
}
3676
3677
2.71k
Status Tablet::CheckRestorations(const RestorationCompleteTimeMap& restoration_complete_time) {
3678
2.71k
  auto restoration_hybrid_time = metadata_->CheckCompleteRestorations(restoration_complete_time);
3679
2.71k
  if (restoration_hybrid_time != HybridTime::kMin
3680
2.71k
      && transaction_participant_
3681
2.71k
      && FLAGS_consistent_restore) {
3682
0
    transaction_participant_->IgnoreAllTransactionsStartedBefore(restoration_hybrid_time);
3683
0
  }
3684
3685
  // We cannot do this in a single step, because we need to update the transaction participant
3686
  // before removing active transactions.
3687
2.71k
  if (!metadata_->CleanupRestorations(restoration_complete_time)) {
3688
2.71k
    return Status::OK();
3689
2.71k
  }
3690
3691
3
  RETURN_NOT_OK(metadata_->Flush());
3692
3
  SyncRestoringOperationFilter(ResetSplit::kFalse);
3693
3694
3
  return Status::OK();
3695
3
}
3696
3697
5.11M
Status Tablet::CheckOperationAllowed(const OpId& op_id, consensus::OperationType op_type) {
3698
5.11M
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3699
5.11M
  for (const auto& filter : operation_filters_) {
3700
853
    RETURN_NOT_OK(filter.CheckOperationAllowed(op_id, op_type));
3701
853
  }
3702
3703
5.11M
  return Status::OK();
3704
5.11M
}
3705
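CheckOperationAllowed() walks the chain of registered filters and lets the first rejection win; SplitDone() and SyncRestoringOperationFilter() above install such filters to fence off operations after a split or during a restore. The chain reduced to its essentials, with simplified stand-in types (SimpleStatus and OperationFilterFn are not YugabyteDB types).

#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct SimpleStatus {
  bool ok = true;
  std::string message;
};

using OperationFilterFn = std::function<SimpleStatus(int64_t op_index, int op_type)>;

// Sketch: every filter gets a veto; the first rejection is returned.
SimpleStatus CheckAllowedSketch(const std::vector<OperationFilterFn>& filters,
                                int64_t op_index, int op_type) {
  for (const auto& filter : filters) {
    SimpleStatus status = filter(op_index, op_type);
    if (!status.ok) {
      return status;
    }
  }
  return SimpleStatus{};
}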
3706
1.87k
void Tablet::RegisterOperationFilter(OperationFilter* filter) {
3707
1.87k
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3708
1.87k
  operation_filters_.push_back(*filter);
3709
1.87k
}
3710
3711
1.87k
void Tablet::UnregisterOperationFilter(OperationFilter* filter) {
3712
1.87k
  std::lock_guard<simple_spinlock> lock(operation_filters_mutex_);
3713
1.87k
  UnregisterOperationFilterUnlocked(filter);
3714
1.87k
}
3715
3716
1.94k
void Tablet::UnregisterOperationFilterUnlocked(OperationFilter* filter) {
3717
1.94k
  operation_filters_.erase(operation_filters_.iterator_to(*filter));
3718
1.94k
}
3719
3720
13.9M
SchemaPtr Tablet::GetSchema(const std::string& table_id) const {
3721
13.9M
  if (table_id.empty()) {
3722
7.25M
    return metadata_->schema();
3723
7.25M
  }
3724
6.70M
  auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id));
3725
6.70M
  return SchemaPtr(table_info, table_info->schema.get());
3726
13.9M
}
3727
3728
72.9k
Schema Tablet::GetKeySchema(const std::string& table_id) const {
3729
72.9k
  if (table_id.empty()) {
3730
2
    return *key_schema_;
3731
2
  }
3732
72.9k
  auto table_info = CHECK_RESULT(metadata_->GetTableInfo(table_id));
3733
72.9k
  return table_info->schema->CreateKeyProjection();
3734
72.9k
}
3735
3736
// ------------------------------------------------------------------------------------------------
3737
3738
Result<ScopedReadOperation> ScopedReadOperation::Create(
3739
    AbstractTablet* tablet,
3740
    RequireLease require_lease,
3741
10.0M
    ReadHybridTime read_time) {
3742
10.0M
  if (!read_time) {
3743
6.32k
    read_time = ReadHybridTime::SingleTime(VERIFY_RESULT(tablet->SafeTime(require_lease)));
3744
6.32k
  }
3745
10.0M
  auto* retention_policy = tablet->RetentionPolicy();
3746
10.0M
  if (retention_policy) {
3747
9.78M
    RETURN_NOT_OK(retention_policy->RegisterReaderTimestamp(read_time.read));
3748
9.78M
  }
3749
10.0M
  return ScopedReadOperation(tablet, read_time);
3750
10.0M
}
3751
3752
ScopedReadOperation::ScopedReadOperation(
3753
    AbstractTablet* tablet, const ReadHybridTime& read_time)
3754
10.0M
    : tablet_(tablet), read_time_(read_time) {
3755
10.0M
}
3756
3757
42.0M
ScopedReadOperation::~ScopedReadOperation() {
3758
42.0M
  Reset();
3759
42.0M
}
3760
3761
9.44M
void ScopedReadOperation::operator=(ScopedReadOperation&& rhs) {
3762
9.44M
  Reset();
3763
9.44M
  tablet_ = rhs.tablet_;
3764
9.44M
  read_time_ = rhs.read_time_;
3765
9.44M
  rhs.tablet_ = nullptr;
3766
9.44M
}
3767
3768
51.5M
void ScopedReadOperation::Reset() {
3769
51.5M
  if (tablet_) {
3770
10.0M
    auto* retention_policy = tablet_->RetentionPolicy();
3771
10.0M
    if (retention_policy) {
3772
9.72M
      retention_policy->UnregisterReaderTimestamp(read_time_.read);
3773
9.72M
    }
3774
10.0M
    tablet_ = nullptr;
3775
10.0M
  }
3776
51.5M
}
3777
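ScopedReadOperation is the RAII guard that keeps history readable at read_time_: Create() registers the timestamp with the tablet's retention policy, and Reset() or the destructor unregisters it exactly once, with moved-from objects nulled out so they never unregister twice. The pattern in miniature, with hypothetical stand-in types.

#include <cstdint>

class RetentionPolicySketch {
 public:
  void RegisterReader(uint64_t ts) { /* track the oldest active read point */ }
  void UnregisterReader(uint64_t ts) { /* release it so history can be GCed */ }
};

class ScopedReadSketch {
 public:
  ScopedReadSketch(RetentionPolicySketch* policy, uint64_t ts)
      : policy_(policy), ts_(ts) {
    if (policy_) policy_->RegisterReader(ts_);
  }
  ScopedReadSketch(ScopedReadSketch&& rhs) noexcept
      : policy_(rhs.policy_), ts_(rhs.ts_) {
    rhs.policy_ = nullptr;  // moved-from object must not unregister
  }
  ScopedReadSketch& operator=(ScopedReadSketch&& rhs) noexcept {
    if (this != &rhs) {
      Reset();
      policy_ = rhs.policy_;
      ts_ = rhs.ts_;
      rhs.policy_ = nullptr;
    }
    return *this;
  }
  ~ScopedReadSketch() { Reset(); }

 private:
  void Reset() {
    if (policy_) {
      policy_->UnregisterReader(ts_);
      policy_ = nullptr;
    }
  }
  RetentionPolicySketch* policy_ = nullptr;
  uint64_t ts_ = 0;
};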
3778
}  // namespace tablet
3779
}  // namespace yb