YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/sst_file_writer.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/sst_file_writer.h"
22
23
#include <stdint.h>
24
25
#include <limits>
26
#include <string>
27
#include <utility>
28
#include <vector>
29
30
#include "yb/rocksdb/db/dbformat.h"
31
#include "yb/rocksdb/db/filename.h"
32
#include "yb/rocksdb/table.h"
33
#include "yb/rocksdb/options.h"
34
#include "yb/rocksdb/status.h"
35
#include "yb/rocksdb/table/table_builder.h"
36
#include "yb/rocksdb/util/file_reader_writer.h"
37
#include "yb/util/string_util.h"
38
39
namespace rocksdb {
40
41
const char ExternalSstFilePropertyNames::kVersion[] =
42
    "rocksdb.external_sst_file.version";
43
44
// PropertiesCollector used to add properties specific to tables
45
// generated by SstFileWriter
46
class SstFileWriter::SstFileWriterPropertiesCollector
47
    : public IntTblPropCollector {
48
 public:
49
  explicit SstFileWriterPropertiesCollector(int32_t version)
50
9.56k
      : version_(version) {}
51
52
  virtual Status InternalAdd(const Slice& key, const Slice& value,
53
508k
                             uint64_t file_size) override {
54
    // Intentionally left blank. Have no interest in collecting stats for
55
    // individual key/value pairs.
56
508k
    return Status::OK();
57
508k
  }
58
59
9.57k
  Status Finish(UserCollectedProperties* properties) override {
60
9.57k
    std::string version_val;
61
9.57k
    PutFixed32(&version_val, static_cast<int32_t>(version_));
62
9.57k
    properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
63
9.57k
    return Status::OK();
64
9.57k
  }
65
66
0
  const char* Name() const override {
67
0
    return "SstFileWriterPropertiesCollector";
68
0
  }
69
70
0
  UserCollectedProperties GetReadableProperties() const override {
71
0
    return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
72
0
  }
73
74
 private:
75
  int32_t version_;
76
};
77
78
class SstFileWriter::SstFileWriterPropertiesCollectorFactory
79
    : public IntTblPropCollectorFactory {
80
 public:
81
  explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
82
9.56k
      : version_(version) {}
83
84
  virtual IntTblPropCollector* CreateIntTblPropCollector(
85
9.57k
      uint32_t column_family_id) override {
86
9.57k
    return new SstFileWriterPropertiesCollector(version_);
87
9.57k
  }
88
89
0
  const char* Name() const override {
90
0
    return "SstFileWriterPropertiesCollector";
91
0
  }
92
93
 private:
94
  int32_t version_;
95
};
96
97
struct SstFileWriter::Rep {
98
  Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions,
99
      const Comparator* _user_comparator)
100
      : env_options(_env_options),
101
        ioptions(_ioptions),
102
267
        internal_comparator(std::make_shared<InternalKeyComparator>(_user_comparator)) {}
103
104
  std::unique_ptr<WritableFileWriter> base_file_writer;
105
  std::unique_ptr<WritableFileWriter> data_file_writer;
106
  std::unique_ptr<TableBuilder> builder;
107
  EnvOptions env_options;
108
  ImmutableCFOptions ioptions;
109
  InternalKeyComparatorPtr internal_comparator;
110
  ExternalSstFileInfo file_info;
111
};
112
113
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
114
                             const ImmutableCFOptions& ioptions,
115
                             const Comparator* user_comparator)
116
275
    : rep_(new Rep(env_options, ioptions, user_comparator)) {}
117
118
278
SstFileWriter::~SstFileWriter() { delete rep_; }
119
120
9.56k
Status SstFileWriter::Open(const std::string& file_path) {
121
9.56k
  Rep* r = rep_;
122
9.56k
  Status s;
123
9.56k
  const bool is_split_sst = r->ioptions.table_factory->IsSplitSstForWriteSupported();
124
9.56k
  std::unique_ptr<WritableFile> base_sst_file;
125
9.56k
  std::unique_ptr<WritableFile> data_sst_file;
126
9.56k
  s = r->ioptions.env->NewWritableFile(file_path, &base_sst_file, r->env_options);
127
9.56k
  if (!s.ok()) {
128
0
    return s;
129
0
  }
130
9.57k
  
if (9.56k
is_split_sst9.56k
) {
131
9.57k
    s = r->ioptions.env->NewWritableFile(TableBaseToDataFileName(file_path), &data_sst_file,
132
9.57k
        r->env_options);
133
9.57k
    if (!s.ok()) {
134
0
      return s;
135
0
    }
136
9.57k
  }
137
138
9.56k
  CompressionType compression_type = r->ioptions.compression;
139
9.56k
  if (!r->ioptions.compression_per_level.empty()) {
140
    // Use the compression of the last level if we have per level compression
141
0
    compression_type = *(r->ioptions.compression_per_level.rbegin());
142
0
  }
143
144
9.56k
  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
145
9.56k
  int_tbl_prop_collector_factories.emplace_back(
146
9.56k
      new SstFileWriterPropertiesCollectorFactory(1 /* version */));
147
148
9.56k
  TableBuilderOptions table_builder_options(r->ioptions,
149
9.56k
                                            r->internal_comparator,
150
9.56k
                                            int_tbl_prop_collector_factories,
151
9.56k
                                            compression_type,
152
9.56k
                                            r->ioptions.compression_opts,
153
9.56k
                                            false);
154
9.56k
  r->base_file_writer.reset(
155
9.56k
      new WritableFileWriter(std::move(base_sst_file), r->env_options));
156
9.57k
  if (
is_split_sst9.56k
) {
157
9.57k
    r->data_file_writer.reset(
158
9.57k
        new WritableFileWriter(std::move(data_sst_file), r->env_options));
159
9.57k
  }
160
9.56k
  r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
161
9.56k
      table_builder_options,
162
9.56k
      TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
163
9.56k
      r->base_file_writer.get(), r->data_file_writer.get()));
164
165
9.56k
  r->file_info.file_path = file_path;
166
9.56k
  r->file_info.file_size = 0;
167
9.56k
  r->file_info.base_file_size = 0;
168
9.56k
  r->file_info.is_split_sst = is_split_sst;
169
9.56k
  r->file_info.num_entries = 0;
170
9.56k
  r->file_info.sequence_number = 0;
171
9.56k
  r->file_info.version = 1;
172
9.56k
  return s;
173
9.56k
}
174
175
519k
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
176
519k
  Rep* r = rep_;
177
519k
  if (!r->builder) {
178
23
    return STATUS(InvalidArgument, "File is not opened");
179
23
  }
180
181
519k
  if (r->file_info.num_entries == 0) {
182
9.57k
    r->file_info.smallest_key = user_key.ToString();
183
510k
  } else {
184
510k
    if (r->internal_comparator->user_comparator()->Compare(
185
510k
            user_key, r->file_info.largest_key) <= 0) {
186
      // Make sure that keys are added in order
187
23
      return STATUS(InvalidArgument, "Keys must be added in order");
188
23
    }
189
510k
  }
190
191
  // update file info
192
519k
  r->file_info.num_entries++;
193
519k
  r->file_info.largest_key = user_key.ToString();
194
519k
  r->file_info.file_size = r->builder->TotalFileSize();
195
196
519k
  InternalKey ikey(user_key, 0 /* Sequence Number */,
197
519k
                   ValueType::kTypeValue /* Put */);
198
519k
  r->builder->Add(ikey.Encode(), value);
199
200
519k
  return Status::OK();
201
519k
}
202
203
9.59k
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
204
9.59k
  Rep* r = rep_;
205
9.59k
  if (!r->builder) {
206
23
    return STATUS(InvalidArgument, "File is not opened");
207
23
  }
208
9.57k
  if (r->file_info.num_entries == 0) {
209
0
    return STATUS(InvalidArgument, "Cannot create sst file with no entries");
210
0
  }
211
212
9.57k
  Status s = r->builder->Finish();
213
9.57k
  if (s.ok()) {
214
9.57k
    if (!r->ioptions.disable_data_sync) {
215
9.57k
      s = r->base_file_writer->Sync(r->ioptions.use_fsync);
216
9.57k
      if (s.ok() && r->data_file_writer) {
217
9.57k
        s = r->data_file_writer->Sync(r->ioptions.use_fsync);
218
9.57k
      }
219
9.57k
    }
220
9.57k
    if (s.ok()) {
221
9.57k
      s = r->base_file_writer->Close();
222
9.57k
    }
223
9.57k
    if (s.ok() && r->data_file_writer) {
224
9.57k
      s = r->data_file_writer->Close();
225
9.57k
    }
226
9.57k
  } else {
227
0
    r->builder->Abandon();
228
0
  }
229
230
9.57k
  if (!s.ok()) {
231
0
    r->ioptions.env->CleanupFile(r->file_info.file_path);
232
0
  }
233
234
9.57k
  if (s.ok() && file_info != nullptr) {
235
9.34k
    r->file_info.file_size = r->builder->TotalFileSize();
236
9.34k
    r->file_info.base_file_size = r->builder->BaseFileSize();
237
9.34k
    *file_info = r->file_info;
238
9.34k
  }
239
240
9.57k
  r->builder.reset();
241
9.57k
  return s;
242
9.57k
}
243
}  // namespace rocksdb