YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/builder.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/builder.h"
25
26
#include <stdint.h>
27
28
#include <algorithm>
29
#include <deque>
30
#include <limits>
31
#include <string>
32
#include <utility>
33
#include <vector>
34
35
#include "yb/rocksdb/db/compaction_iterator.h"
36
#include "yb/rocksdb/db/dbformat.h"
37
#include "yb/rocksdb/db/filename.h"
38
#include "yb/rocksdb/db/internal_stats.h"
39
#include "yb/rocksdb/db/merge_helper.h"
40
#include "yb/rocksdb/db/table_cache.h"
41
#include "yb/rocksdb/db/version_edit.h"
42
#include "yb/rocksdb/env.h"
43
#include "yb/rocksdb/iterator.h"
44
#include "yb/rocksdb/options.h"
45
#include "yb/rocksdb/status.h"
46
#include "yb/rocksdb/table.h"
47
#include "yb/rocksdb/table/internal_iterator.h"
48
#include "yb/rocksdb/table/table_builder.h"
49
#include "yb/rocksdb/util/file_reader_writer.h"
50
#include "yb/rocksdb/util/stop_watch.h"
51
52
#include "yb/util/result.h"
53
54
namespace rocksdb {
55
56
class TableFactory;
57
58
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
59
                              const InternalKeyComparatorPtr& internal_comparator,
60
                              const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
61
                              uint32_t column_family_id,
62
                              WritableFileWriter* file,
63
                              const CompressionType compression_type,
64
                              const CompressionOptions& compression_opts,
65
36
                              const bool skip_filters) {
66
36
  return ioptions.table_factory->NewTableBuilder(
67
36
      TableBuilderOptions(ioptions, internal_comparator,
68
36
                          int_tbl_prop_collector_factories, compression_type,
69
36
                          compression_opts, skip_filters),
70
36
      column_family_id, file);
71
36
}
72
73
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
74
                              const InternalKeyComparatorPtr& internal_comparator,
75
                              const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
76
                              uint32_t column_family_id,
77
                              WritableFileWriter* metadata_file,
78
                              WritableFileWriter* data_file,
79
                              const CompressionType compression_type,
80
                              const CompressionOptions& compression_opts,
81
50.0k
                              const bool skip_filters) {
82
50.0k
  return ioptions.table_factory->NewTableBuilder(
83
50.0k
      TableBuilderOptions(ioptions, internal_comparator,
84
50.0k
          int_tbl_prop_collector_factories, compression_type,
85
50.0k
          compression_opts, skip_filters),
86
50.0k
      column_family_id, metadata_file, data_file);
87
50.0k
}
88
89
namespace {
90
  Status CreateWritableFileWriter(const std::string& filename, const EnvOptions& env_options,
91
      const Env::IOPriority io_priority, Env* env,
92
54.7k
      std::shared_ptr<WritableFileWriter>* file_writer) {
93
54.7k
    unique_ptr<WritableFile> file;
94
54.7k
    Status s = NewWritableFile(env, filename, &file, env_options);
95
54.7k
    if (!s.ok()) {
96
13
      return s;
97
13
    }
98
54.7k
    file->SetIOPriority(io_priority);
99
54.7k
    file_writer->reset(new WritableFileWriter(std::move(file), env_options));
100
54.7k
    return Status::OK();
101
54.7k
  }
102
} // anonymous namespace
103
104
Status BuildTable(const std::string& dbname,
105
                  Env* env,
106
                  const ImmutableCFOptions& ioptions,
107
                  const EnvOptions& env_options,
108
                  TableCache* table_cache,
109
                  InternalIterator* iter,
110
                  FileMetaData* meta,
111
                  const InternalKeyComparatorPtr& internal_comparator,
112
                  const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
113
                  uint32_t column_family_id,
114
                  std::vector<SequenceNumber> snapshots,
115
                  SequenceNumber earliest_write_conflict_snapshot,
116
                  const CompressionType compression,
117
                  const CompressionOptions& compression_opts,
118
                  bool paranoid_file_checks,
119
                  InternalStats* internal_stats,
120
                  BoundaryValuesExtractor* boundary_values_extractor,
121
                  const Env::IOPriority io_priority,
122
28.7k
                  TableProperties* table_properties) {
123
  // Reports the IOStats for flush for every following bytes.
124
28.7k
  const size_t kReportFlushIOStatsEvery = 1048576;
125
28.7k
  Status s;
126
28.7k
  meta->fd.total_file_size = 0;
127
28.7k
  meta->fd.base_file_size = 0;
128
28.7k
  iter->SeekToFirst();
129
130
28.7k
  const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported();
131
132
28.7k
  const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
133
28.7k
                                             meta->fd.GetPathId());
134
27.6k
  const std::string data_fname = is_split_sst ? TableBaseToDataFileName(base_fname) : "";
135
28.7k
  if (iter->Valid()) {
136
27.9k
    shared_ptr<WritableFileWriter> base_file_writer;
137
27.9k
    shared_ptr<WritableFileWriter> data_file_writer;
138
27.9k
    s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer);
139
27.9k
    if (s.ok() && is_split_sst) {
140
26.8k
      s = CreateWritableFileWriter(data_fname, env_options, io_priority, env, &data_file_writer);
141
26.8k
    }
142
27.9k
    if (!s.ok()) {
143
13
      return s;
144
13
    }
145
27.9k
    std::unique_ptr<TableBuilder> builder(NewTableBuilder(
146
27.9k
        ioptions, internal_comparator, int_tbl_prop_collector_factories,
147
27.9k
        column_family_id, base_file_writer.get(), data_file_writer.get(), compression,
148
27.9k
        compression_opts));
149
150
27.9k
    MergeHelper merge(env, internal_comparator->user_comparator(),
151
27.9k
                      ioptions.merge_operator, nullptr, ioptions.info_log,
152
27.9k
                      ioptions.min_partial_merge_operands,
153
27.9k
                      true /* internal key corruption is not ok */,
154
27.5k
                      snapshots.empty() ? 0 : snapshots.back());
155
156
27.9k
    CompactionIterator c_iter(iter, internal_comparator->user_comparator(),
157
27.9k
                              &merge, kMaxSequenceNumber, &snapshots,
158
27.9k
                              earliest_write_conflict_snapshot,
159
27.9k
                              true /* internal key corruption is not ok */);
160
27.9k
    c_iter.SeekToFirst();
161
37.8M
    for (; c_iter.Valid(); c_iter.Next()) {
162
37.8M
      const Slice& key = c_iter.key();
163
37.8M
      const Slice& value = c_iter.value();
164
37.8M
      builder->Add(key, value);
165
37.8M
      auto boundaries = MakeFileBoundaryValues(boundary_values_extractor, key, value);
166
37.8M
      if (!boundaries) {
167
0
        builder->Abandon();
168
0
        return std::move(boundaries.status());
169
0
      }
170
37.8M
      auto& boundary_values = *boundaries;
171
37.8M
      meta->UpdateBoundaries(std::move(boundary_values.key), boundary_values);
172
37.8M
    }
173
174
    // Finish and check for builder errors
175
27.9k
    bool empty = builder->NumEntries() == 0;
176
27.9k
    s = c_iter.status();
177
27.9k
    if (!s.ok() || empty) {
178
26
      builder->Abandon();
179
27.8k
    } else {
180
27.8k
      s = builder->Finish();
181
27.8k
    }
182
183
27.9k
    if (s.ok() && !empty) {
184
27.8k
      meta->fd.total_file_size = builder->TotalFileSize();
185
27.8k
      meta->fd.base_file_size = builder->BaseFileSize();
186
27.8k
      meta->marked_for_compaction = builder->NeedCompact();
187
27.8k
      assert(meta->fd.GetTotalFileSize() > 0);
188
27.8k
      if (table_properties) {
189
27.8k
        *table_properties = builder->GetTableProperties();
190
27.8k
      }
191
27.8k
    }
192
193
    // Finish and check for file errors
194
27.9k
    if (s.ok() && !empty && !ioptions.disable_data_sync) {
195
25.0k
      if (is_split_sst) {
196
23.9k
        RETURN_NOT_OK(data_file_writer->Sync(ioptions.use_fsync));
197
23.9k
      }
198
25.0k
      RETURN_NOT_OK(base_file_writer->Sync(ioptions.use_fsync));
199
25.0k
    }
200
27.9k
    if (s.ok() && !empty && is_split_sst) {
201
26.7k
      s = data_file_writer->Close();
202
26.7k
    }
203
27.9k
    if (s.ok() && !empty) {
204
27.8k
      s = base_file_writer->Close();
205
27.8k
    }
206
207
27.9k
    if (s.ok() && !empty) {
208
      // Verify that the table is usable
209
27.8k
      std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
210
27.8k
          ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(), nullptr,
211
2
          (internal_stats == nullptr) ? nullptr
212
27.8k
                                      : internal_stats->GetFileReadHist(0),
213
27.8k
          false));
214
27.8k
      s = it->status();
215
27.8k
      if (s.ok() && paranoid_file_checks) {
216
6
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
217
4
        }
218
2
        s = it->status();
219
2
      }
220
27.8k
    }
221
27.9k
  }
222
223
  // Check for input iterator errors
224
28.7k
  if (!iter->status().ok()) {
225
0
    s = iter->status();
226
0
  }
227
228
28.7k
  if (!s.ok() || meta->fd.GetTotalFileSize() == 0) {
229
827
    env->CleanupFile(base_fname);
230
827
    if (is_split_sst) {
231
823
      env->CleanupFile(data_fname);
232
823
    }
233
827
  }
234
28.7k
  return s;
235
28.7k
}
236
237
}  // namespace rocksdb