YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/builder.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/builder.h"
25
26
#include <stdint.h>
27
28
#include <algorithm>
29
#include <deque>
30
#include <limits>
31
#include <string>
32
#include <utility>
33
#include <vector>
34
35
#include "yb/rocksdb/db/compaction_iterator.h"
36
#include "yb/rocksdb/db/dbformat.h"
37
#include "yb/rocksdb/db/filename.h"
38
#include "yb/rocksdb/db/internal_stats.h"
39
#include "yb/rocksdb/db/merge_helper.h"
40
#include "yb/rocksdb/db/table_cache.h"
41
#include "yb/rocksdb/db/version_edit.h"
42
#include "yb/rocksdb/env.h"
43
#include "yb/rocksdb/iterator.h"
44
#include "yb/rocksdb/options.h"
45
#include "yb/rocksdb/status.h"
46
#include "yb/rocksdb/table.h"
47
#include "yb/rocksdb/table/internal_iterator.h"
48
#include "yb/rocksdb/table/table_builder.h"
49
#include "yb/rocksdb/util/file_reader_writer.h"
50
#include "yb/rocksdb/util/stop_watch.h"
51
52
#include "yb/util/result.h"
53
54
namespace rocksdb {
55
56
class TableFactory;
57
58
// Builds a TableBuilder that writes its output through a single file writer.
//
// The column-family options, comparator, property-collector factories and compression
// settings are packed into a TableBuilderOptions, and construction is delegated to the
// table factory configured in `ioptions`. The pointer produced by the factory is returned
// unchanged (BuildTable in this file wraps such a pointer in a std::unique_ptr).
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
                              const InternalKeyComparatorPtr& internal_comparator,
                              const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
                              uint32_t column_family_id,
                              WritableFileWriter* file,
                              const CompressionType compression_type,
                              const CompressionOptions& compression_opts,
                              const bool skip_filters) {
  const TableBuilderOptions builder_options(
      ioptions, internal_comparator, int_tbl_prop_collector_factories,
      compression_type, compression_opts, skip_filters);
  return ioptions.table_factory->NewTableBuilder(builder_options, column_family_id, file);
}
72
73
// Builds a TableBuilder that writes through two file writers: `metadata_file` and
// `data_file`.
//
// Identical to the single-file overload except that both writers are forwarded to the
// table factory configured in `ioptions`. BuildTable in this file passes a null
// `data_file` when the factory does not support split SST files.
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
                              const InternalKeyComparatorPtr& internal_comparator,
                              const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
                              uint32_t column_family_id,
                              WritableFileWriter* metadata_file,
                              WritableFileWriter* data_file,
                              const CompressionType compression_type,
                              const CompressionOptions& compression_opts,
                              const bool skip_filters) {
  const TableBuilderOptions builder_options(
      ioptions, internal_comparator, int_tbl_prop_collector_factories,
      compression_type, compression_opts, skip_filters);
  return ioptions.table_factory->NewTableBuilder(
      builder_options, column_family_id, metadata_file, data_file);
}
88
89
namespace {
90
  Status CreateWritableFileWriter(const std::string& filename, const EnvOptions& env_options,
91
      const Env::IOPriority io_priority, Env* env,
92
61.3k
      std::shared_ptr<WritableFileWriter>* file_writer) {
93
61.3k
    unique_ptr<WritableFile> file;
94
61.3k
    Status s = NewWritableFile(env, filename, &file, env_options);
95
61.3k
    if (!s.ok()) {
96
13
      return s;
97
13
    }
98
61.2k
    file->SetIOPriority(io_priority);
99
61.2k
    file_writer->reset(new WritableFileWriter(std::move(file), env_options));
100
61.2k
    return Status::OK();
101
61.3k
  }
102
} // anonymous namespace
103
104
// Writes the entries produced by `iter` into a new table file described by `meta`.
//
// The file name is derived from `ioptions.db_paths` and the file number / path id stored in
// `meta->fd`. When the table factory reports split-SST support, a separate data file is
// written next to the base file. Entries are passed through a CompactionIterator (using
// `snapshots` and `earliest_write_conflict_snapshot`) before being added to the builder, and
// `meta`'s boundary values are updated for every entry added.
//
// On success with at least one entry:
//   - `meta->fd.total_file_size`, `meta->fd.base_file_size` and
//     `meta->marked_for_compaction` are filled in from the builder;
//   - `*table_properties` is filled in when `table_properties` is non-null;
//   - the files are synced (unless `ioptions.disable_data_sync`), closed, and the table is
//     opened through `table_cache` to verify it is usable. With `paranoid_file_checks` the
//     whole table is additionally iterated.
//
// If anything fails, or no entries were written (total file size stays 0), the base file and
// the data file (for split SST) are removed via `env->CleanupFile`. Every failure path goes
// through that cleanup, including failures to open the output files, to extract boundary
// values, or to sync, so no partially written files are left behind.
//
// Returns the first failure encountered while building; an error reported by `iter` itself
// takes precedence over any other status.
//
// NOTE: `dbname` is not used by this function.
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const ImmutableCFOptions& ioptions,
                  const EnvOptions& env_options,
                  TableCache* table_cache,
                  InternalIterator* iter,
                  FileMetaData* meta,
                  const InternalKeyComparatorPtr& internal_comparator,
                  const IntTblPropCollectorFactories& int_tbl_prop_collector_factories,
                  uint32_t column_family_id,
                  std::vector<SequenceNumber> snapshots,
                  SequenceNumber earliest_write_conflict_snapshot,
                  const CompressionType compression,
                  const CompressionOptions& compression_opts,
                  bool paranoid_file_checks,
                  InternalStats* internal_stats,
                  BoundaryValuesExtractor* boundary_values_extractor,
                  const Env::IOPriority io_priority,
                  TableProperties* table_properties) {
  Status s;
  meta->fd.total_file_size = 0;
  meta->fd.base_file_size = 0;
  iter->SeekToFirst();

  const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported();

  const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
                                               meta->fd.GetPathId());
  const std::string data_fname = is_split_sst ? TableBaseToDataFileName(base_fname) : "";
  if (iter->Valid()) {
    std::shared_ptr<WritableFileWriter> base_file_writer;
    std::shared_ptr<WritableFileWriter> data_file_writer;
    s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer);
    if (s.ok() && is_split_sst) {
      s = CreateWritableFileWriter(data_fname, env_options, io_priority, env, &data_file_writer);
    }

    // If opening either output file failed, skip straight to the shared cleanup below: the
    // base file may already exist on disk even though the data file could not be created.
    if (s.ok()) {
      std::unique_ptr<TableBuilder> builder(NewTableBuilder(
          ioptions, internal_comparator, int_tbl_prop_collector_factories,
          column_family_id, base_file_writer.get(), data_file_writer.get(), compression,
          compression_opts));

      MergeHelper merge(env, internal_comparator->user_comparator(),
                        ioptions.merge_operator, nullptr, ioptions.info_log,
                        ioptions.min_partial_merge_operands,
                        true /* internal key corruption is not ok */,
                        snapshots.empty() ? 0 : snapshots.back());

      CompactionIterator c_iter(iter, internal_comparator->user_comparator(),
                                &merge, kMaxSequenceNumber, &snapshots,
                                earliest_write_conflict_snapshot,
                                true /* internal key corruption is not ok */);
      c_iter.SeekToFirst();
      for (; c_iter.Valid(); c_iter.Next()) {
        const Slice& key = c_iter.key();
        const Slice& value = c_iter.value();
        builder->Add(key, value);
        auto boundaries = MakeFileBoundaryValues(boundary_values_extractor, key, value);
        if (!boundaries) {
          // Record the failure and stop; the builder is abandoned and the files are removed
          // by the common error handling below instead of returning early and leaking them.
          s = boundaries.status();
          break;
        }
        auto& boundary_values = *boundaries;
        meta->UpdateBoundaries(std::move(boundary_values.key), boundary_values);
      }

      // Finish and check for builder errors
      bool empty = builder->NumEntries() == 0;
      if (s.ok()) {
        // Do not let the iterator status hide a boundary-extraction failure.
        s = c_iter.status();
      }
      if (!s.ok() || empty) {
        builder->Abandon();
      } else {
        s = builder->Finish();
      }

      if (s.ok() && !empty) {
        meta->fd.total_file_size = builder->TotalFileSize();
        meta->fd.base_file_size = builder->BaseFileSize();
        meta->marked_for_compaction = builder->NeedCompact();
        assert(meta->fd.GetTotalFileSize() > 0);
        if (table_properties) {
          *table_properties = builder->GetTableProperties();
        }
      }

      // Finish and check for file errors. The data file is synced before the base file; a
      // failed sync is recorded in `s` so that the files are cleaned up below.
      if (s.ok() && !empty && !ioptions.disable_data_sync) {
        if (is_split_sst) {
          s = data_file_writer->Sync(ioptions.use_fsync);
        }
        if (s.ok()) {
          s = base_file_writer->Sync(ioptions.use_fsync);
        }
      }
      if (s.ok() && !empty && is_split_sst) {
        s = data_file_writer->Close();
      }
      if (s.ok() && !empty) {
        s = base_file_writer->Close();
      }

      if (s.ok() && !empty) {
        // Verify that the table is usable
        std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
            ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(), nullptr,
            (internal_stats == nullptr) ? nullptr
                                        : internal_stats->GetFileReadHist(0),
            false));
        s = it->status();
        if (s.ok() && paranoid_file_checks) {
          // Walk the whole table so that any read error surfaces in it->status().
          for (it->SeekToFirst(); it->Valid(); it->Next()) {
          }
          s = it->status();
        }
      }
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  // Remove the output files on any failure, or when nothing was written.
  if (!s.ok() || meta->fd.GetTotalFileSize() == 0) {
    env->CleanupFile(base_fname);
    if (is_split_sst) {
      env->CleanupFile(data_fname);
    }
  }
  return s;
}
236
237
}  // namespace rocksdb