YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/block_based_table_builder.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/table/block_based_table_builder.h"
25
26
#include <assert.h>
27
#include <inttypes.h>
28
#include <stdio.h>
29
30
#include <map>
31
#include <memory>
32
#include <string>
33
#include <unordered_map>
34
#include <utility>
35
36
#include <glog/logging.h>
37
38
#include "yb/gutil/macros.h"
39
40
#include "yb/rocksdb/cache.h"
41
#include "yb/rocksdb/comparator.h"
42
#include "yb/rocksdb/db/dbformat.h"
43
#include "yb/rocksdb/env.h"
44
#include "yb/rocksdb/filter_policy.h"
45
#include "yb/rocksdb/flush_block_policy.h"
46
#include "yb/rocksdb/table.h"
47
#include "yb/rocksdb/table/block.h"
48
#include "yb/rocksdb/table/block_based_filter_block.h"
49
#include "yb/rocksdb/table/block_based_table_factory.h"
50
#include "yb/rocksdb/table/block_based_table_internal.h"
51
#include "yb/rocksdb/table/block_builder.h"
52
#include "yb/rocksdb/table/filter_block.h"
53
#include "yb/rocksdb/table/fixed_size_filter_block.h"
54
#include "yb/rocksdb/table/format.h"
55
#include "yb/rocksdb/table/full_filter_block.h"
56
#include "yb/rocksdb/table/index_builder.h"
57
#include "yb/rocksdb/table/meta_blocks.h"
58
#include "yb/rocksdb/table/table_builder.h"
59
#include "yb/rocksdb/util/coding.h"
60
#include "yb/rocksdb/util/compression.h"
61
#include "yb/rocksdb/util/crc32c.h"
62
#include "yb/rocksdb/util/file_reader_writer.h"
63
#include "yb/rocksdb/util/stop_watch.h"
64
#include "yb/rocksdb/util/xxhash.h"
65
66
#include "yb/util/mem_tracker.h"
67
#include "yb/util/status_log.h"
68
69
namespace rocksdb {
70
71
extern const char kHashIndexPrefixesBlock[];
72
extern const char kHashIndexPrefixesMetadataBlock[];
73
74
typedef FilterPolicy::FilterType FilterType;
75
76
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
77
namespace {
78
79
65.6k
// Returns the filter type reported by the filter policy configured in table_opt,
// or FilterType::kNoFilter when no filter policy is set.
FilterType GetFilterType(const BlockBasedTableOptions& table_opt) {
  const std::shared_ptr<const FilterPolicy> filter_policy(table_opt.filter_policy);
  if (filter_policy == nullptr) {
    return FilterType::kNoFilter;
  }
  return filter_policy->GetFilterType();
}
83
84
// Create a filter builder based on its type.
85
FilterBlockBuilder* CreateFilterBlockBuilder(const ImmutableCFOptions& opt,
86
65.3k
    const BlockBasedTableOptions& table_opt, FilterType filter_type) {
87
65.3k
  switch (filter_type) {
88
962
    case FilterType::kBlockBasedFilter:
89
962
      return new BlockBasedFilterBlockBuilder(opt.prefix_extractor, table_opt);
90
7.10k
    case FilterType::kFixedSizeFilter:
91
7.10k
      return new FixedSizeFilterBlockBuilder(opt.prefix_extractor, table_opt);
92
3.45k
    case FilterType::kFullFilter:
93
3.45k
      return new FullFilterBlockBuilder(opt.prefix_extractor,
94
3.45k
                                        table_opt.whole_key_filtering,
95
3.45k
                                        table_opt.filter_policy->GetFilterBitsBuilder());
96
53.8k
    case FilterType::kNoFilter:
97
53.8k
      return nullptr;
98
65.3k
  }
99
0
  RLOG(InfoLogLevel::FATAL_LEVEL, opt.info_log, "Corrupted filter_type: %d", filter_type);
100
0
  assert(false);
101
0
  return nullptr;
102
65.3k
}
103
104
2.03M
// Returns true when the compressed size is strictly smaller than raw_size minus one eighth of
// raw_size (integer division), i.e. compression saved more than roughly 12.5%.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t required_saving = raw_size / 8u;
  const size_t size_limit = raw_size - required_saving;
  return compressed_size < size_limit;
}
108
109
// format_version is the block format as defined in include/rocksdb/table.h
110
Slice CompressBlock(const Slice& raw,
111
                    const CompressionOptions& compression_options,
112
                    CompressionType* type, uint32_t format_version,
113
2.95M
                    std::string* compressed_output) {
114
2.95M
  if (*type == kNoCompression) {
115
918k
    return raw;
116
918k
  }
117
118
  // Will return compressed block contents if (1) the compression method is
119
  // supported in this platform and (2) the compression rate is "good enough".
120
2.03M
  switch (*type) {
121
1.98M
    case kSnappyCompression:
122
1.98M
      if (Snappy_Compress(compression_options, raw.cdata(), raw.size(),
123
1.98M
                          compressed_output) &&
124
1.98M
          GoodCompressionRatio(compressed_output->size(), raw.size())) {
125
995k
        return *compressed_output;
126
995k
      }
127
985k
      break;  // fall back to no compression.
128
985k
    case kZlibCompression:
129
18.7k
      if (Zlib_Compress(
130
18.7k
              compression_options,
131
18.7k
              GetCompressFormatForVersion(kZlibCompression, format_version),
132
18.7k
              raw.cdata(), raw.size(), compressed_output) &&
133
18.7k
          
GoodCompressionRatio(compressed_output->size(), raw.size())18.6k
) {
134
15.9k
        return *compressed_output;
135
15.9k
      }
136
2.79k
      break;  // fall back to no compression.
137
2.79k
    case kBZip2Compression:
138
3
      if (BZip2_Compress(
139
3
              compression_options,
140
3
              GetCompressFormatForVersion(kBZip2Compression, format_version),
141
3
              raw.cdata(), raw.size(), compressed_output) &&
142
3
          
GoodCompressionRatio(compressed_output->size(), raw.size())0
) {
143
0
        return *compressed_output;
144
0
      }
145
3
      break;  // fall back to no compression.
146
19.5k
    case kLZ4Compression:
147
19.5k
      if (LZ4_Compress(
148
19.5k
              compression_options,
149
19.5k
              GetCompressFormatForVersion(kLZ4Compression, format_version),
150
19.5k
              raw.cdata(), raw.size(), compressed_output) &&
151
19.5k
          GoodCompressionRatio(compressed_output->size(), raw.size())) {
152
13.4k
        return *compressed_output;
153
13.4k
      }
154
6.06k
      break;  // fall back to no compression.
155
17.8k
    case kLZ4HCCompression:
156
17.8k
      if (LZ4HC_Compress(
157
17.8k
              compression_options,
158
17.8k
              GetCompressFormatForVersion(kLZ4HCCompression, format_version),
159
17.8k
              raw.cdata(), raw.size(), compressed_output) &&
160
17.8k
          GoodCompressionRatio(compressed_output->size(), raw.size())) {
161
13.4k
        return *compressed_output;
162
13.4k
      }
163
4.38k
      break;     // fall back to no compression.
164
4.38k
    case kZSTDNotFinalCompression:
165
3
      if (ZSTD_Compress(compression_options, raw.cdata(), raw.size(),
166
3
                        compressed_output) &&
167
3
          
GoodCompressionRatio(compressed_output->size(), raw.size())0
) {
168
0
        return *compressed_output;
169
0
      }
170
3
      break;     // fall back to no compression.
171
3
    
default: {}0
// Do not recognize this compression type
172
2.03M
  }
173
174
  // Compression method is not supported, or not good compression ratio, so just
175
  // fall back to uncompressed form.
176
999k
  *type = kNoCompression;
177
999k
  return raw;
178
2.03M
}
179
180
}  // namespace
181
182
// Magic number of the block-based table format. It was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Other .cc files may access this constant as well, so the header declares it extern;
// it has to be defined (non-extern) in exactly one place to get storage allocated, which is here.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;

// Magic number of the legacy block-based table format, which is still supported for reading and
// writing for backwards compatibility.
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
194
195
// A collector that collects properties of interest to block-based table.
196
// For now this class looks heavy-weight since we only write one additional
197
// property.
198
// But in the foreseeable future, we will add more and more properties that are
199
// specific to block-based table.
200
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
201
    : public IntTblPropCollector {
202
 public:
203
  explicit BlockBasedTablePropertiesCollector(
204
      BlockBasedTableBuilder::Rep* rep, IndexType index_type, bool whole_key_filtering,
205
      bool prefix_filtering, const KeyValueEncodingFormat key_value_encoding_format)
206
      : rep_(rep),
207
        index_type_(index_type),
208
        whole_key_filtering_(whole_key_filtering),
209
        prefix_filtering_(prefix_filtering),
210
65.5k
        key_value_encoding_format_(key_value_encoding_format) {}
211
212
  virtual Status InternalAdd(const Slice& key, const Slice& value,
213
179M
                             uint64_t file_size) override {
214
    // Intentionally left blank. Have no interest in collecting stats for
215
    // individual key/value pairs.
216
179M
    return Status::OK();
217
179M
  }
218
219
  Status Finish(UserCollectedProperties* properties) override;
220
221
  // The name of the properties collector can be used for debugging purpose.
222
0
  const char* Name() const override {
223
0
    return "BlockBasedTablePropertiesCollector";
224
0
  }
225
226
55.8k
  UserCollectedProperties GetReadableProperties() const override {
227
    // Intentionally left blank.
228
55.8k
    return UserCollectedProperties();
229
55.8k
  }
230
231
 private:
232
  BlockBasedTableBuilder::Rep* rep_;
233
  IndexType index_type_;
234
  bool whole_key_filtering_;
235
  bool prefix_filtering_;
236
  KeyValueEncodingFormat key_value_encoding_format_;
237
};
238
239
// Originally following data was stored in BlockBasedTableBuilder::Rep and related to a single SST
240
// file. Since SST file is now split into two types of files - data file and metadata file,
241
// all file-related data was moved into dedicated structure for each file.
242
struct BlockBasedTableBuilder::FileWriterWithOffsetAndCachePrefix {
243
  // Pointer to file writer. BlockBasedTableBuilder constructor accepts raw pointers to
244
  // WritableFileWriter and it is responsibility of client code to delete writer instance after
245
  // usage.
246
  WritableFileWriter* writer = nullptr;
247
248
  // Current offset.
249
  uint64_t offset = 0;
250
251
  // BlockBasedTableBuilder uses compressed block cache passed to BlockBasedTableBuilder constructor
252
  // inside BlockBasedTableOptions instance. If that cache is set in options (off by default)
253
  // all data written to files is also put into compressed block cache to reduce number of file
254
  // read requests later during read operations.
255
  // File blocks are referred in cache by keys, which are composed from following data (see
256
  // BlockBasedTableBuilder::InsertBlockInCache):
257
  // - cache key prefix (unique for each file), generated by GenerateCachePrefix
258
  // - block offset within a file.
259
  block_based_table::CacheKeyPrefixBuffer compressed_cache_key_prefix;
260
};
261
262
// Internal state of BlockBasedTableBuilder.
// NOTE: the member declaration order defines the initialization order used by the constructor's
// initializer list, so members must not be reordered.
struct BlockBasedTableBuilder::Rep {
  const ImmutableCFOptions ioptions;
  const BlockBasedTableOptions table_options;
  InternalKeyComparatorPtr internal_comparator;

  // Destination files.
  // If two file writers were passed to BlockBasedTableBuilder, data and metadata go through
  // separate writers. If the builder was created for a single file, data_writer and
  // metadata_writer point to the same structure and both kinds of content end up in one file.
  // As of 2017-03-10 both cases are supported.
  // Passing nullptr as the data file is only allowed at the Rep constructor and higher levels,
  // in order to keep one method signature (with a default nullptr value) for both cases. Inside
  // the BlockBasedTableBuilder implementation both writers are always non-null and refer either
  // to the same file or to separate files.
  std::shared_ptr<FileWriterWithOffsetAndCachePrefix> metadata_writer;
  std::shared_ptr<FileWriterWithOffsetAndCachePrefix> data_writer;
  Status status;

  FilterType filter_type;
  std::unique_ptr<FilterBlockBuilder> filter_block_builder;
  BlockBuilder data_block_builder;

  InternalKeySliceTransform internal_prefix_transform;
  const FilterPolicy::KeyTransformer* const filter_key_transformer;
  std::unique_ptr<IndexBuilder> data_index_builder;
  IndexBuilder::IndexBlocks data_index_blocks;
  BlockHandle last_index_block_handle;
  std::unique_ptr<IndexBuilder> filter_index_builder;

  std::string last_key;
  std::string last_filter_key;
  const CompressionType compression_type;
  const CompressionOptions compression_opts;
  TableProperties props;

  // Set once either Finish() or Abandon() has been called.
  bool closed{false};

  // Handle to add to the data index block.
  BlockHandle data_pending_handle;
  // Handle to add to the filter index block.
  BlockHandle filter_pending_handle;

  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  yb::MemTrackerPtr mem_tracker;

  // Test-only switch: when true, the key-value encoding format property is not written.
  bool TEST_skip_writing_key_value_encoding_format_{false};

  Rep(const ImmutableCFOptions& _ioptions,
      const BlockBasedTableOptions& table_opt,
      const InternalKeyComparatorPtr& icomparator,
      const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
      uint32_t column_family_id,
      WritableFileWriter* metadata_file,
      WritableFileWriter* data_file,
      const CompressionType _compression_type,
      const CompressionOptions& _compression_opts,
      const bool skip_filters);

  // True when data and metadata are written through different writer structures.
  bool is_split_sst() const {
    return metadata_writer != data_writer;
  }
};
323
324
// Emits the block-based-table specific properties into *properties:
// index type (fixed32), whole-key and prefix filtering flags, number of data index levels
// (fixed32) and, unless disabled through the test-only flag on Rep, the data block key-value
// encoding format (fixed8). Always returns OK.
Status BlockBasedTableBuilder::BlockBasedTablePropertiesCollector::Finish(
    UserCollectedProperties* properties) {
  std::string encoded_index_type;
  PutFixed32(&encoded_index_type, static_cast<uint32_t>(index_type_));
  properties->emplace(BlockBasedTablePropertyNames::kIndexType, encoded_index_type);

  properties->emplace(BlockBasedTablePropertyNames::kWholeKeyFiltering,
                      ToBlockBasedTablePropertyValue(whole_key_filtering_));
  properties->emplace(BlockBasedTablePropertyNames::kPrefixFiltering,
                      ToBlockBasedTablePropertyValue(prefix_filtering_));

  std::string encoded_num_levels;
  PutFixed32(&encoded_num_levels, rep_->data_index_builder->NumLevels());
  properties->emplace(BlockBasedTablePropertyNames::kNumIndexLevels, encoded_num_levels);

  if (!rep_->TEST_skip_writing_key_value_encoding_format_) {
    std::string encoded_format;
    PutFixed8(&encoded_format, static_cast<uint8_t>(key_value_encoding_format_));
    properties->emplace(
        BlockBasedTablePropertyNames::kDataBlockKeyValueEncodingFormat, encoded_format);
  }
  return Status::OK();
}
345
346
// Initializes the builder state.
// - metadata_file: writer for metadata blocks.
// - data_file: writer for data blocks; when nullptr, data blocks share the metadata writer, so
//   everything goes into a single file.
// - skip_filters: when true, no filter block builder is created regardless of the filter policy.
// One properties collector is created per factory in int_tbl_prop_collector_factories, followed
// by the built-in BlockBasedTablePropertiesCollector.
BlockBasedTableBuilder::Rep::Rep(
    const ImmutableCFOptions& _ioptions,
    const BlockBasedTableOptions& table_opt,
    const InternalKeyComparatorPtr& icomparator,
    const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
    uint32_t column_family_id,
    WritableFileWriter* metadata_file,
    WritableFileWriter* data_file,
    const CompressionType _compression_type,
    const CompressionOptions& _compression_opts,
    const bool skip_filters)
    : ioptions(_ioptions),
      table_options(table_opt),
      internal_comparator(icomparator),
      filter_type(GetFilterType(table_options)),
      filter_block_builder(
          skip_filters ? nullptr
                       : CreateFilterBlockBuilder(_ioptions, table_options, filter_type)),
      data_block_builder(
          table_options.block_restart_interval,
          table_options.data_block_key_value_encoding_format,
          table_options.use_delta_encoding),
      internal_prefix_transform(_ioptions.prefix_extractor),
      filter_key_transformer(
          table_opt.filter_policy ? table_opt.filter_policy->GetKeyTransformer() : nullptr),
      data_index_builder(
          IndexBuilder::CreateIndexBuilder(
              table_options.index_type, internal_comparator.get(), &internal_prefix_transform,
              table_options)),
      // The binary search index used for indexing bloom filter blocks does not use a
      // prefix extractor.
      filter_index_builder(
          IndexBuilder::CreateIndexBuilder(
              IndexType::kBinarySearch, BytewiseComparator(),
              nullptr /* prefix_extractor */, table_options)),
      compression_type(_compression_type),
      compression_opts(_compression_opts),
      flush_block_policy(
          table_options.flush_block_policy_factory->NewFlushBlockPolicy(
              table_options, data_block_builder)) {
  if (_ioptions.mem_tracker) {
    mem_tracker = yb::MemTracker::FindOrCreateTracker(
        "BlockBasedTableBuilder", _ioptions.mem_tracker);
  }

  metadata_writer = std::make_shared<FileWriterWithOffsetAndCachePrefix>();
  metadata_writer->writer = metadata_file;

  if (data_file == nullptr) {
    // Single-file table: data blocks go through the metadata writer.
    data_writer = metadata_writer;
  } else {
    data_writer = std::make_shared<FileWriterWithOffsetAndCachePrefix>();
    data_writer->writer = data_file;
  }

  for (auto& factory : int_tbl_prop_collector_factories) {
    table_properties_collectors.emplace_back(
        factory->CreateIntTblPropCollector(column_family_id));
  }

  const bool prefix_filtering = _ioptions.prefix_extractor != nullptr;
  table_properties_collectors.emplace_back(new BlockBasedTablePropertiesCollector(
      this, table_options.index_type, table_options.whole_key_filtering,
      prefix_filtering, table_options.data_block_key_value_encoding_format));
}
405
406
// Creates a builder that writes metadata through `metadata_file` and data blocks through
// `data_file` (see BlockBasedTableBuilder::Rep for how a nullptr data file is handled).
//
// - If format_version is 0 and the checksum type is not kCRC32c, format_version is converted
//   to 1 (a warning is logged).
// - If a filter block builder was created, its first block is started at offset 0.
// - If a compressed block cache is configured, a cache key prefix is generated for the metadata
//   file and, for a split SST, a separate prefix is generated for the data file.
BlockBasedTableBuilder::BlockBasedTableBuilder(
    const ImmutableCFOptions& ioptions,
    const BlockBasedTableOptions& table_options,
    const InternalKeyComparatorPtr& internal_comparator,
    const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
    uint32_t column_family_id,
    WritableFileWriter* metadata_file,
    WritableFileWriter* data_file,
    const CompressionType compression_type,
    const CompressionOptions& compression_opts,
    const bool skip_filters) {
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    RLOG(InfoLogLevel::WARN_LEVEL, ioptions.info_log,
        "Silently converting format_version to 1 because checksum is "
        "non-default");
    // silently convert format_version to 1 to keep consistent with current
    // behavior
    sanitized_table_options.format_version = 1;
  }

  rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator,
                 int_tbl_prop_collector_factories, column_family_id, metadata_file, data_file,
                 compression_type, compression_opts, skip_filters);

  if (rep_->filter_block_builder != nullptr) {
    rep_->filter_block_builder->StartBlock(0);
  }

  Cache* const block_cache_compressed = table_options.block_cache_compressed.get();
  if (block_cache_compressed != nullptr) {
    GenerateCachePrefix(
        block_cache_compressed, metadata_file->writable_file(),
        &rep_->metadata_writer->compressed_cache_key_prefix);
    if (rep_->is_split_sst()) {
      // The data file needs its own prefix, stored in the data writer: InsertBlockInCache
      // builds the cache key from the prefix of the writer the block is written through.
      // Writing it into the metadata writer's prefix would clobber the metadata prefix and
      // leave the data writer's prefix unset.
      GenerateCachePrefix(
          block_cache_compressed, data_file->writable_file(),
          &rep_->data_writer->compressed_cache_key_prefix);
    }
  }
}
446
447
65.5k
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  // Debug-only check that the builder was closed before being destroyed, which catches callers
  // that forgot to call Finish().
  assert(rep_->closed);
  delete rep_;
}
451
452
179M
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
453
179M
  Rep* const r = rep_;
454
179M
  DCHECK(!r->closed);
455
179M
  if (!ok()) 
return0
;
456
179M
  if (r->props.num_entries > 0) {
457
179M
    DCHECK_GT(r->internal_comparator->Compare(key, Slice(r->last_key)), 0)
458
0
        << "New key: " << key.ToDebugHexString()
459
0
        << ", last key: " << Slice(r->last_key).ToDebugHexString();
460
179M
  }
461
462
179M
  const auto should_flush_data = r->flush_block_policy->Update(key, value);
463
179M
  if (should_flush_data) {
464
2.79M
    DCHECK(!r->data_block_builder.empty());
465
2.79M
    FlushDataBlock(key);
466
2.79M
  }
467
468
179M
  if (r->filter_block_builder != nullptr) {
469
97.9M
    const Slice user_key = ExtractUserKey(key);
470
97.9M
    const Slice filter_key = r->filter_key_transformer ?
471
96.2M
        r->filter_key_transformer->Transform(user_key) : 
user_key1.75M
;
472
97.9M
    if (!filter_key.empty() &&
473
97.9M
        
(97.1M
r->props.num_entries == 097.1M
||
474
97.1M
         
BytewiseComparator()->Compare(r->last_filter_key, filter_key) != 097.1M
)) {
475
      // No need to insert duplicate keys into Bloom filter.
476
42.0M
      if (r->filter_block_builder->ShouldFlush()) {
477
2.84k
        FlushFilterBlock(&filter_key);
478
2.84k
      }
479
42.0M
      r->filter_block_builder->Add(filter_key);
480
42.0M
      r->last_filter_key.assign(filter_key.cdata(), filter_key.size());
481
42.0M
    }
482
97.9M
  }
483
484
179M
  r->last_key.assign(key.cdata(), key.size());
485
179M
  r->data_block_builder.Add(key, value);
486
179M
  r->props.num_entries++;
487
179M
  r->props.raw_key_size += key.size();
488
179M
  r->props.raw_value_size += value.size();
489
490
179M
  r->data_index_builder->OnKeyAdded(key);
491
492
179M
  NotifyCollectTableCollectorsOnAdd(key, value, r->data_writer->offset,
493
179M
      r->table_properties_collectors,
494
179M
      r->ioptions.info_log);
495
179M
}
496
497
2.86M
void BlockBasedTableBuilder::FlushDataBlock(const Slice& next_block_first_key) {
498
2.86M
  Rep* const r = rep_;
499
2.86M
  assert(!r->closed);
500
2.86M
  if (!ok()) 
return0
;
501
2.86M
  size_t data_block_size = 0;
502
503
2.86M
  if (
!r->data_block_builder.empty()2.86M
) {
504
2.86M
    data_block_size = WriteBlock(&r->data_block_builder, &r->data_pending_handle,
505
2.86M
        r->data_writer.get());
506
2.86M
  }
507
2.86M
  if (!ok()) 
return0
;
508
509
2.86M
  
if (2.86M
!r->table_options.skip_table_builder_flush2.86M
) {
510
2.86M
    r->status = r->data_writer->writer->Flush();
511
2.86M
  }
512
2.86M
  if (!ok()) 
return5
;
513
514
2.86M
  if (r->filter_block_builder != nullptr && 
r->filter_type == FilterType::kBlockBasedFilter578k
) {
515
    // For FilterType::kBlockBasedFilter separate block of bloom filter is written per data block.
516
51.5k
    r->filter_block_builder->StartBlock(r->data_writer->offset);
517
51.5k
  }
518
519
2.86M
  r->props.data_size += data_block_size;
520
2.86M
  ++r->props.num_data_blocks;
521
  // Add item to index block.
522
  // We do not emit the index entry for a block until we have seen the
523
  // first key for the next data block.  This allows us to use shorter
524
  // keys in the index block.  For example, consider a block boundary
525
  // between the keys "the quick brown fox" and "the who".  We can use
526
  // "the r" as the key for the index block entry since it is >= all
527
  // entries in the first block and < all entries in subsequent
528
  // blocks.
529
2.86M
  r->data_index_builder->AddIndexEntry(&r->last_key,
530
2.86M
      next_block_first_key.empty() ? 
nullptr65.4k
:
&next_block_first_key2.79M
,
531
2.86M
      r->data_pending_handle);
532
2.94M
  while (r->data_index_builder->ShouldFlush()) {
533
81.9k
    auto result = r->data_index_builder->FlushNextBlock(
534
81.9k
        &r->data_index_blocks, r->last_index_block_handle);
535
81.9k
    if (!result.ok()) {
536
0
      r->status = result.status();
537
0
      return;
538
0
    }
539
81.9k
    DCHECK(result.get());
540
81.9k
    WriteBlock(
541
81.9k
        r->data_index_blocks.index_block_contents, &r->last_index_block_handle,
542
81.9k
        r->metadata_writer.get());
543
81.9k
    if (!ok()) 
return0
;
544
81.9k
    ++r->props.num_data_index_blocks;
545
81.9k
  }
546
2.86M
}
547
548
14.3k
void BlockBasedTableBuilder::FlushFilterBlock(const Slice* const next_block_first_filter_key) {
549
14.3k
  Rep* const r = rep_;
550
14.3k
  assert(!r->closed);
551
0
  assert(r->filter_block_builder != nullptr);
552
14.3k
  if (!ok()) 
return0
;
553
554
14.3k
  const size_t filter_block_size = WriteRawBlock(r->filter_block_builder->Finish(), kNoCompression,
555
14.3k
      &r->filter_pending_handle, r->metadata_writer.get());
556
14.3k
  if (!ok()) 
return0
;
557
558
14.3k
  
if (14.3k
!r->table_options.skip_table_builder_flush14.3k
) {
559
14.3k
    r->status = r->metadata_writer->writer->Flush();
560
14.3k
  }
561
14.3k
  if (!ok()) 
return0
;
562
563
14.3k
  r->props.filter_size += filter_block_size;
564
14.3k
  ++r->props.num_filter_blocks;
565
566
14.3k
  const bool is_last_flush = next_block_first_filter_key == nullptr;
567
14.3k
  if (!is_last_flush) {
568
2.84k
    r->filter_block_builder->StartBlock(0);
569
2.84k
  }
570
571
  // See explanation in BlockBasedTableBuilder::FlushDataBlock.
572
14.3k
  r->filter_index_builder->AddIndexEntry(
573
14.3k
      &r->last_filter_key, next_block_first_filter_key, r->filter_pending_handle);
574
14.3k
}
575
576
size_t BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
577
                                          BlockHandle* handle,
578
2.86M
                                          FileWriterWithOffsetAndCachePrefix* writer_info) {
579
2.86M
  size_t block_size = WriteBlock(block->Finish(), handle, writer_info);
580
2.86M
  block->Reset();
581
2.86M
  return block_size;
582
2.86M
}
583
584
size_t BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
585
    BlockHandle* handle,
586
2.95M
    FileWriterWithOffsetAndCachePrefix* writer_info) {
587
  // File format contains a sequence of blocks where each block has:
588
  //    block_data: uint8[n]
589
  //    type: uint8
590
  //    crc: uint32
591
2.95M
  assert(ok());
592
0
  Rep* r = rep_;
593
594
2.95M
  auto type = r->compression_type;
595
2.95M
  Slice block_contents;
596
2.95M
  if (raw_block_contents.size() < kCompressionSizeLimit) {
597
2.95M
    block_contents =
598
2.95M
        CompressBlock(raw_block_contents, r->compression_opts, &type,
599
2.95M
                      r->table_options.format_version, &r->compressed_output);
600
2.95M
  } else {
601
111
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
602
111
    type = kNoCompression;
603
111
    block_contents = raw_block_contents;
604
111
  }
605
2.95M
  size_t block_size = WriteRawBlock(block_contents, type, handle, writer_info);
606
2.95M
  r->compressed_output.clear();
607
2.95M
  return block_size;
608
2.95M
}
609
610
size_t BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
611
                                             CompressionType type,
612
                                             BlockHandle* handle,
613
3.10M
                                             FileWriterWithOffsetAndCachePrefix* writer_info) {
614
3.10M
  Rep* r = rep_;
615
3.10M
  StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
616
3.10M
  const auto start_offset = writer_info->offset;
617
3.10M
  handle->set_offset(writer_info->offset);
618
3.10M
  handle->set_size(block_contents.size());
619
3.10M
  r->status = writer_info->writer->Append(block_contents);
620
3.10M
  if (
r->status.ok()3.10M
) {
621
3.10M
    char trailer[kBlockTrailerSize];
622
3.10M
    trailer[0] = type;
623
3.10M
    char* trailer_without_type = trailer + 1;
624
3.10M
    switch (r->table_options.checksum) {
625
0
      case kNoChecksum:
626
        // we don't support no checksum yet
627
0
        assert(false);
628
        // intentional fallthrough in release binary
629
        // We add a fallthrough annotation in release mode only -- otherwise we get a compile error
630
        // in debug mode ("fallthrough annotation in unreachable code").
631
#ifdef NDEBUG
632
        FALLTHROUGH_INTENDED;
633
#endif
634
3.09M
      case kCRC32c: {
635
3.09M
        auto crc = crc32c::Value(block_contents.data(), block_contents.size());
636
3.09M
        crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover block type
637
3.09M
        EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
638
3.09M
        break;
639
0
      }
640
6.99k
      case kxxHash: {
641
6.99k
        void* xxh = XXH32_init(0);
642
6.99k
        XXH32_update(xxh, block_contents.data(),
643
6.99k
                     static_cast<uint32_t>(block_contents.size()));
644
6.99k
        XXH32_update(xxh, trailer, 1);  // Extend  to cover block type
645
6.99k
        EncodeFixed32(trailer_without_type, XXH32_digest(xxh));
646
6.99k
        break;
647
0
      }
648
3.10M
    }
649
650
3.10M
    r->status = writer_info->writer->Append(Slice(trailer, kBlockTrailerSize));
651
3.10M
    if (
r->status.ok()3.10M
) {
652
3.10M
      r->status = InsertBlockInCache(block_contents, type, handle, writer_info);
653
3.10M
    }
654
3.10M
    if (
r->status.ok()3.10M
) {
655
3.10M
      writer_info->offset += block_contents.size() + kBlockTrailerSize;
656
3.10M
    }
657
3.10M
  }
658
3.10M
  return writer_info->offset - start_offset;
659
3.10M
}
660
661
191M
// Returns a copy of the status currently stored in the builder state.
Status BlockBasedTableBuilder::status() const {
  const Rep* const rep = rep_;
  return rep->status;
}
664
665
4.82k
static void DeleteCachedBlock(const Slice& key, void* value) {
666
4.82k
  Block* block = reinterpret_cast<Block*>(value);
667
4.82k
  delete block;
668
4.82k
}
669
670
//
671
// Make a copy of the block contents and insert into compressed block cache
672
//
673
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
674
                                                  const CompressionType type,
675
    const BlockHandle* handle,
676
3.10M
    FileWriterWithOffsetAndCachePrefix* writer_info) {
677
3.10M
  Rep* r = rep_;
678
3.10M
  Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
679
680
3.10M
  if (type != kNoCompression && 
block_cache_compressed != nullptr1.03M
) {
681
682
4.82k
    size_t size = block_contents.size();
683
684
4.82k
    std::unique_ptr<char[]> ubuf(new char[size + 1]);
685
4.82k
    memcpy(ubuf.get(), block_contents.data(), size);
686
4.82k
    ubuf[size] = type;
687
688
4.82k
    BlockContents results(std::move(ubuf), size, true, type, r->mem_tracker);
689
690
4.82k
    Block* block = new Block(std::move(results));
691
692
    // make cache key by appending the file offset to the cache prefix id
693
4.82k
    char* end = EncodeVarint64(
694
4.82k
                  writer_info->compressed_cache_key_prefix.data +
695
4.82k
                  writer_info->compressed_cache_key_prefix.size,
696
4.82k
                  handle->offset());
697
4.82k
    Slice key(writer_info->compressed_cache_key_prefix.data,
698
4.82k
        static_cast<size_t> (end - writer_info->compressed_cache_key_prefix.data));
699
700
    // Insert into compressed block cache.
701
4.82k
    RETURN_NOT_OK(block_cache_compressed->Insert(
702
4.82k
        key, kDefaultQueryId, block, block->usable_size(), &DeleteCachedBlock));
703
704
    // Invalidate OS cache.
705
4.82k
    auto status = writer_info->writer->InvalidateCache(
706
4.82k
        static_cast<size_t>(writer_info->offset), size);
707
4.82k
    if (!status.ok() && 
!status.IsNotSupported()4.42k
) {
708
0
      return status;
709
0
    }
710
4.82k
  }
711
3.10M
  return Status::OK();
712
3.10M
}
713
714
65.5k
// Completes the table: flushes the pending data block and filter block (if any), marks the
// builder as closed, flushes the next data index block and then writes, through the metadata
// writer, the meta blocks, the properties block, the metaindex block, the still-pending index
// block and finally the footer.
//
// Returns early with the error if flushing the data index or finishing the fixed-size filter
// index fails; otherwise returns the status recorded in the Rep. Every writing stage is skipped
// once that status is no longer OK.
Status BlockBasedTableBuilder::Finish() {
  Rep* const rep = rep_;

  Slice empty_slice;
  if (!rep->data_block_builder.empty()) {
    FlushDataBlock(empty_slice);  // no more data block
  }
  if (rep->filter_block_builder != nullptr) {
    FlushFilterBlock(nullptr);  // no more filter block
  }
  assert(!rep->closed);
  rep->closed = true;

  auto index_flush_result = rep->data_index_builder->FlushNextBlock(
      &rep->data_index_blocks, rep->last_index_block_handle);
  RETURN_NOT_OK(index_flush_result);
  // True when FlushNextBlock produced an index block that still has to be written below.
  const bool index_block_pending = index_flush_result.get();
  if (index_block_pending) {
    ++rep->props.num_data_index_blocks;
  }

  // Meta blocks and the metaindex block are written in the following order:
  //    1. [meta block: filter]
  //    2. [other meta blocks]
  //    3. [meta block: properties]
  //    4. [metaindex block]
  MetaIndexBuilder meta_index_builder;
  for (const auto& meta_block : rep->data_index_blocks.meta_blocks) {
    BlockHandle meta_block_handle;
    WriteBlock(meta_block.second, &meta_block_handle, rep->metadata_writer.get());
    meta_index_builder.Add(meta_block.first, meta_block_handle);
  }

  if (ok()) {
    if (rep->filter_block_builder != nullptr) {
      // Map "<filter_block_prefix>.Name" to the location of either the filter block or the
      // filter index. Only the fixed-size bloom filter needs a filter index, because it is
      // stored as separate blocks in the SST file; the other filter types are stored as a single
      // block, so the offset of that block is enough.
      std::string filter_key;
      switch (rep->filter_type) {
        case FilterType::kFullFilter:
          filter_key = block_based_table::kFullFilterBlockPrefix;
          break;
        case FilterType::kBlockBasedFilter:
          filter_key = block_based_table::kFilterBlockPrefix;
          break;
        case FilterType::kFixedSizeFilter:
          filter_key = block_based_table::kFixedSizeFilterBlockPrefix;
          break;
        case FilterType::kNoFilter:
          RLOG(InfoLogLevel::FATAL_LEVEL, rep->ioptions.info_log,
              "r->filter_block_builder should be null for FilterType::kNoFilter");
          assert(false);
      }
      filter_key.append(rep->table_options.filter_policy->Name());

      if (rep->filter_type != FilterType::kFixedSizeFilter) {
        meta_index_builder.Add(filter_key, rep->filter_pending_handle);
      } else {
        // Flush the fixed-size bloom filter index and register its handle in the meta index
        // under the key built above.
        IndexBuilder::IndexBlocks filter_index_blocks;
        RETURN_NOT_OK(rep->filter_index_builder->Finish(&filter_index_blocks));
        BlockHandle filter_index_handle;
        WriteBlock(
            filter_index_blocks.index_block_contents, &filter_index_handle,
            rep->metadata_writer.get());
        meta_index_builder.Add(filter_key, filter_index_handle);
        rep->props.filter_index_size =
            rep->filter_index_builder->EstimatedSize() + kBlockTrailerSize;
      }
    }

    // Write properties block.
    {
      PropertyBlockBuilder property_block_builder;
      if (rep->table_options.filter_policy != nullptr) {
        rep->props.filter_policy_name = rep->table_options.filter_policy->Name();
      } else {
        rep->props.filter_policy_name = "";
      }
      rep->props.data_index_size = rep->data_index_builder->EstimatedSize() + kBlockTrailerSize;

      // Basic properties first, then the ones gathered by the table properties collectors.
      property_block_builder.AddTableProperty(rep->props);
      NotifyCollectTableCollectorsOnFinish(
          rep->table_properties_collectors, rep->ioptions.info_log, &property_block_builder);

      BlockHandle properties_block_handle;
      WriteRawBlock(
          property_block_builder.Finish(), kNoCompression, &properties_block_handle,
          rep->metadata_writer.get());

      meta_index_builder.Add(kPropertiesBlock, properties_block_handle);
    }  // end of properties block writing
  }    // meta blocks

  // Write meta index and index block.
  BlockHandle meta_index_block_handle;
  if (ok()) {
    // Flush the meta index block.
    WriteRawBlock(
        meta_index_builder.Finish(), kNoCompression, &meta_index_block_handle,
        rep->metadata_writer.get());
    // Flush index block if not already flushed.
    if (index_block_pending) {
      WriteBlock(
          rep->data_index_blocks.index_block_contents, &rep->last_index_block_handle,
          rep->metadata_writer.get());
    }
  }

  // Write footer.
  if (ok()) {
    // With format_version 0 the legacy magic number is written, so that old versions of RocksDB
    // are still able to read files generated by a newer release (in case somebody rolls back
    // after an upgrade).
    // TODO(icanadi) at some point in the future, when we're absolutely sure
    // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
    // number and always write new table files with new magic number
    const bool legacy = (rep->table_options.format_version == 0);
    // this is guaranteed by BlockBasedTableBuilder's constructor
    assert(rep->table_options.checksum == kCRC32c ||
           rep->table_options.format_version != 0);

    Footer footer(
        legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
        rep->table_options.format_version);
    footer.set_metaindex_handle(meta_index_block_handle);
    footer.set_index_handle(rep->last_index_block_handle);
    footer.set_checksum(rep->table_options.checksum);

    std::string encoded_footer;
    footer.AppendEncodedTo(&encoded_footer);
    rep->status = rep->metadata_writer->writer->Append(encoded_footer);
    if (rep->status.ok()) {
      rep->metadata_writer->offset += encoded_footer.size();
    }
  }

  return rep->status;
}
856
857
55
void BlockBasedTableBuilder::Abandon() {
858
55
  Rep* r = rep_;
859
55
  assert(!r->closed);
860
0
  r->closed = true;
861
55
}
862
863
52.7k
uint64_t BlockBasedTableBuilder::NumEntries() const {
864
52.7k
  return rep_->props.num_entries;
865
52.7k
}
866
867
47.7M
uint64_t BlockBasedTableBuilder::TotalFileSize() const {
868
47.7M
  return rep_->is_split_sst() ? 
rep_->metadata_writer->offset + rep_->data_writer->offset47.5M
:
869
47.7M
      
rep_->metadata_writer->offset140k
;
870
47.7M
}
871
872
62.0k
uint64_t BlockBasedTableBuilder::BaseFileSize() const {
873
62.0k
  return rep_->metadata_writer->offset;
874
62.0k
}
875
876
52.7k
bool BlockBasedTableBuilder::NeedCompact() const {
877
105k
  for (const auto& collector : rep_->table_properties_collectors) {
878
105k
    if (collector->NeedCompact()) {
879
5
      return true;
880
5
    }
881
105k
  }
882
52.7k
  return false;
883
52.7k
}
884
885
55.9k
// Returns a copy of the builder's table properties, extended with what every table properties
// collector provides: its readable properties are inserted into readable_properties, and its
// Finish() is called on user_collected_properties. A failing Finish() trips CHECK_OK.
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  TableProperties result = rep_->props;
  for (const auto& props_collector : rep_->table_properties_collectors) {
    const auto& readable = props_collector->GetReadableProperties();
    result.readable_properties.insert(readable.begin(), readable.end());
    CHECK_OK(props_collector->Finish(&result.user_collected_properties));
  }
  return result;
}
895
896
6
void BlockBasedTableBuilder::TEST_skip_writing_key_value_encoding_format() {
897
6
  rep_->TEST_skip_writing_key_value_encoding_format_ = true;
898
6
}
899
900
}  // namespace rocksdb