/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/sst_file_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/sst_file_writer.h" |
22 | | |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <limits> |
26 | | #include <string> |
27 | | #include <utility> |
28 | | #include <vector> |
29 | | |
30 | | #include "yb/rocksdb/db/dbformat.h" |
31 | | #include "yb/rocksdb/db/filename.h" |
32 | | #include "yb/rocksdb/table.h" |
33 | | #include "yb/rocksdb/options.h" |
34 | | #include "yb/rocksdb/status.h" |
35 | | #include "yb/rocksdb/table/table_builder.h" |
36 | | #include "yb/rocksdb/util/file_reader_writer.h" |
37 | | #include "yb/util/string_util.h" |
38 | | |
39 | | namespace rocksdb { |
40 | | |
// Property name under which the properties collector defined in this file
// stores the external SST file version in a table's user-collected properties.
41 | | const char ExternalSstFilePropertyNames::kVersion[] = |
42 | | "rocksdb.external_sst_file.version"; |
43 | | |
44 | | // PropertiesCollector used to add properties specific to tables |
45 | | // generated by SstFileWriter |
// The only property it contributes is the version number passed to its
// constructor, stored under ExternalSstFilePropertyNames::kVersion.
46 | | class SstFileWriter::SstFileWriterPropertiesCollector |
47 | | : public IntTblPropCollector { |
48 | | public: |
// Remembers `version` so Finish() / GetReadableProperties() can report it.
49 | | explicit SstFileWriterPropertiesCollector(int32_t version) |
50 | 9.56k | : version_(version) {} |
51 | | |
// Per-entry hook: all three arguments are ignored and OK is always returned.
52 | | virtual Status InternalAdd(const Slice& key, const Slice& value, |
53 | 508k | uint64_t file_size) override { |
54 | | // Intentionally left blank. Have no interest in collecting stats for |
55 | | // individual key/value pairs. |
56 | 508k | return Status::OK(); |
57 | 508k | } |
58 | | |
// Encodes version_ with PutFixed32 into a string and inserts it into
// `properties` under kVersion. Always returns OK.
// `properties` is dereferenced without a null check.
// NOTE(review): the static_cast<int32_t> is a no-op (version_ is already
// int32_t); PutFixed32 presumably takes an unsigned 32-bit value -- confirm.
59 | 9.57k | Status Finish(UserCollectedProperties* properties) override { |
60 | 9.57k | std::string version_val; |
61 | 9.57k | PutFixed32(&version_val, static_cast<int32_t>(version_)); |
62 | 9.57k | properties->insert({ExternalSstFilePropertyNames::kVersion, version_val}); |
63 | 9.57k | return Status::OK(); |
64 | 9.57k | } |
65 | | |
// Returns the fixed name string of this collector.
66 | 0 | const char* Name() const override { |
67 | 0 | return "SstFileWriterPropertiesCollector"; |
68 | 0 | } |
69 | | |
// Same single property as Finish(), but with the version rendered through
// ToString() instead of the PutFixed32 encoding.
70 | 0 | UserCollectedProperties GetReadableProperties() const override { |
71 | 0 | return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}}; |
72 | 0 | } |
73 | | |
74 | | private: |
// Version value reported by this collector.
75 | | int32_t version_; |
76 | | }; |
77 | | |
// Factory that creates SstFileWriterPropertiesCollector instances, each
// carrying the version number given to the factory's constructor.
78 | | class SstFileWriter::SstFileWriterPropertiesCollectorFactory |
79 | | : public IntTblPropCollectorFactory { |
80 | | public: |
// Remembers `version` for every collector created later.
81 | | explicit SstFileWriterPropertiesCollectorFactory(int32_t version) |
82 | 9.56k | : version_(version) {} |
83 | | |
// Returns a newly allocated collector; `column_family_id` is not used.
// NOTE(review): the result is a raw `new` pointer -- presumably the caller
// takes ownership; confirm against IntTblPropCollectorFactory's contract.
84 | | virtual IntTblPropCollector* CreateIntTblPropCollector( |
85 | 9.57k | uint32_t column_family_id) override { |
86 | 9.57k | return new SstFileWriterPropertiesCollector(version_); |
87 | 9.57k | } |
88 | | |
// NOTE(review): returns the same string as the collector's Name(), not a
// factory-specific name -- confirm this is intended.
89 | 0 | const char* Name() const override { |
90 | 0 | return "SstFileWriterPropertiesCollector"; |
91 | 0 | } |
92 | | |
93 | | private: |
// Version value handed to each created collector.
94 | | int32_t version_; |
95 | | }; |
96 | | |
// Private state of SstFileWriter, held through the rep_ pointer.
97 | | struct SstFileWriter::Rep { |
// Copies the given env/CF options into by-value members and wraps
// `_user_comparator` in a shared InternalKeyComparator. The writers, the
// builder and file_info are default-constructed here.
98 | | Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions, |
99 | | const Comparator* _user_comparator) |
100 | | : env_options(_env_options), |
101 | | ioptions(_ioptions), |
102 | 267 | internal_comparator(std::make_shared<InternalKeyComparator>(_user_comparator)) {} |
103 | | |
// Writer for the file at the path passed to Open().
104 | | std::unique_ptr<WritableFileWriter> base_file_writer; |
// Writer for the separate data file; Open() only sets it when the table
// factory reports split-SST write support.
105 | | std::unique_ptr<WritableFileWriter> data_file_writer; |
// Table builder created by Open() and reset at the end of Finish(); Add()
// and Finish() treat a null builder as "file is not opened".
106 | | std::unique_ptr<TableBuilder> builder; |
107 | | EnvOptions env_options; |
108 | | ImmutableCFOptions ioptions; |
109 | | InternalKeyComparatorPtr internal_comparator; |
// Running description of the file being written; copied out by Finish().
110 | | ExternalSstFileInfo file_info; |
111 | | }; |
112 | | |
// Allocates the private Rep from the given options and user comparator.
// No file is created here; that happens in Open(). rep_ is a raw owning
// pointer that the destructor deletes.
113 | | SstFileWriter::SstFileWriter(const EnvOptions& env_options, |
114 | | const ImmutableCFOptions& ioptions, |
115 | | const Comparator* user_comparator) |
116 | 275 | : rep_(new Rep(env_options, ioptions, user_comparator)) {} |
117 | | |
// Frees the Rep allocated by the constructor; Rep's unique_ptr members then
// release the file writers and the builder, if any are still held.
118 | 278 | SstFileWriter::~SstFileWriter() { delete rep_; } |
119 | | |
// Creates the output file(s) for `file_path`, builds a TableBuilder over
// them and resets file_info for a new file.
//
// Returns the failing status if either NewWritableFile call fails; in that
// case the writers and builder are left untouched. Otherwise returns the
// (OK) status of the last NewWritableFile call.
//
// NOTE(review): the early-return paths do not remove a base file that was
// already created -- confirm whether callers are expected to clean up.
120 | 9.56k | Status SstFileWriter::Open(const std::string& file_path) { |
121 | 9.56k | Rep* r = rep_; |
122 | 9.56k | Status s; |
// Decides whether a second (data) file is written next to the base file.
123 | 9.56k | const bool is_split_sst = r->ioptions.table_factory->IsSplitSstForWriteSupported(); |
124 | 9.56k | std::unique_ptr<WritableFile> base_sst_file; |
125 | 9.56k | std::unique_ptr<WritableFile> data_sst_file; |
126 | 9.56k | s = r->ioptions.env->NewWritableFile(file_path, &base_sst_file, r->env_options); |
127 | 9.56k | if (!s.ok()) { |
128 | 0 | return s; |
129 | 0 | } |
// NOTE(review): the extra "9.56k" tokens inside the `if (...)` rows of this
// function appear to be coverage region counts fused into the listing, not
// part of the C++ source; the condition is presumably `if (is_split_sst)`.
130 | 9.57k | if (9.56k is_split_sst9.56k ) { |
// The data file name is derived from the base path.
131 | 9.57k | s = r->ioptions.env->NewWritableFile(TableBaseToDataFileName(file_path), &data_sst_file, |
132 | 9.57k | r->env_options); |
133 | 9.57k | if (!s.ok()) { |
134 | 0 | return s; |
135 | 0 | } |
136 | 9.57k | } |
137 | | |
138 | 9.56k | CompressionType compression_type = r->ioptions.compression; |
139 | 9.56k | if (!r->ioptions.compression_per_level.empty()) { |
140 | | // Use the compression of the last level if we have per level compression |
141 | 0 | compression_type = *(r->ioptions.compression_per_level.rbegin()); |
142 | 0 | } |
143 | | |
// Register the single collector factory that stamps version 1 into the
// table properties.
144 | 9.56k | IntTblPropCollectorFactories int_tbl_prop_collector_factories; |
145 | 9.56k | int_tbl_prop_collector_factories.emplace_back( |
146 | 9.56k | new SstFileWriterPropertiesCollectorFactory(1 /* version */)); |
147 | | |
// NOTE(review): the meaning of the trailing `false` argument is not visible
// here -- see the TableBuilderOptions constructor.
148 | 9.56k | TableBuilderOptions table_builder_options(r->ioptions, |
149 | 9.56k | r->internal_comparator, |
150 | 9.56k | int_tbl_prop_collector_factories, |
151 | 9.56k | compression_type, |
152 | 9.56k | r->ioptions.compression_opts, |
153 | 9.56k | false); |
// Hand the opened files to buffered writers owned by Rep.
154 | 9.56k | r->base_file_writer.reset( |
155 | 9.56k | new WritableFileWriter(std::move(base_sst_file), r->env_options)); |
156 | 9.57k | if (is_split_sst9.56k ) { |
157 | 9.57k | r->data_file_writer.reset( |
158 | 9.57k | new WritableFileWriter(std::move(data_sst_file), r->env_options)); |
159 | 9.57k | } |
// data_file_writer is only (re)set on the split path above, so .get() passes
// whatever it currently holds.
160 | 9.56k | r->builder.reset(r->ioptions.table_factory->NewTableBuilder( |
161 | 9.56k | table_builder_options, |
162 | 9.56k | TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, |
163 | 9.56k | r->base_file_writer.get(), r->data_file_writer.get())); |
164 | | |
// Start file_info from scratch for the new file.
165 | 9.56k | r->file_info.file_path = file_path; |
166 | 9.56k | r->file_info.file_size = 0; |
167 | 9.56k | r->file_info.base_file_size = 0; |
168 | 9.56k | r->file_info.is_split_sst = is_split_sst; |
169 | 9.56k | r->file_info.num_entries = 0; |
170 | 9.56k | r->file_info.sequence_number = 0; |
171 | 9.56k | r->file_info.version = 1; |
172 | 9.56k | return s; |
173 | 9.56k | } |
174 | | |
// Appends one user key/value pair to the file being built.
//
// Returns InvalidArgument when there is no builder (Open() has not produced
// one, or Finish() already reset it), or when `user_key` does not compare
// strictly greater than the previously added key under the user comparator
// (so duplicates are rejected as well). Otherwise returns OK.
175 | 519k | Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { |
176 | 519k | Rep* r = rep_; |
177 | 519k | if (!r->builder) { |
178 | 23 | return STATUS(InvalidArgument, "File is not opened"); |
179 | 23 | } |
180 | | |
// The first key of the file becomes smallest_key; every later key is checked
// against the last accepted key (largest_key).
181 | 519k | if (r->file_info.num_entries == 0) { |
182 | 9.57k | r->file_info.smallest_key = user_key.ToString(); |
183 | 510k | } else { |
184 | 510k | if (r->internal_comparator->user_comparator()->Compare( |
185 | 510k | user_key, r->file_info.largest_key) <= 0) { |
186 | | // Make sure that keys are added in order |
187 | 23 | return STATUS(InvalidArgument, "Keys must be added in order"); |
188 | 23 | } |
189 | 510k | } |
190 | | |
191 | | // update file info |
// NOTE(review): file_size is sampled before the entry is handed to the
// builder below, so it does not yet include this entry.
192 | 519k | r->file_info.num_entries++; |
193 | 519k | r->file_info.largest_key = user_key.ToString(); |
194 | 519k | r->file_info.file_size = r->builder->TotalFileSize(); |
195 | | |
// Every entry is written as a kTypeValue record with sequence number 0.
// NOTE(review): the outcome of builder->Add() is not inspected here; OK is
// returned unconditionally afterwards.
196 | 519k | InternalKey ikey(user_key, 0 /* Sequence Number */, |
197 | 519k | ValueType::kTypeValue /* Put */); |
198 | 519k | r->builder->Add(ikey.Encode(), value); |
199 | | |
200 | 519k | return Status::OK(); |
201 | 519k | } |
202 | | |
// Finalizes the table, syncs and closes the output file(s), and optionally
// reports the resulting file description through `file_info` (may be null).
//
// Returns InvalidArgument when there is no builder or when no entries were
// added. Otherwise returns the first failing status from the builder, Sync
// or Close calls, or OK.
//
// NOTE(review): the zero-entries early return leaves the builder and the
// file writers in place (nothing is reset or cleaned up on that path).
203 | 9.59k | Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { |
204 | 9.59k | Rep* r = rep_; |
205 | 9.59k | if (!r->builder) { |
206 | 23 | return STATUS(InvalidArgument, "File is not opened"); |
207 | 23 | } |
208 | 9.57k | if (r->file_info.num_entries == 0) { |
209 | 0 | return STATUS(InvalidArgument, "Cannot create sst file with no entries"); |
210 | 0 | } |
211 | | |
212 | 9.57k | Status s = r->builder->Finish(); |
213 | 9.57k | if (s.ok()) { |
// Sync is skipped entirely when disable_data_sync is set; the data file is
// only synced if the base file synced successfully.
214 | 9.57k | if (!r->ioptions.disable_data_sync) { |
215 | 9.57k | s = r->base_file_writer->Sync(r->ioptions.use_fsync); |
216 | 9.57k | if (s.ok() && r->data_file_writer) { |
217 | 9.57k | s = r->data_file_writer->Sync(r->ioptions.use_fsync); |
218 | 9.57k | } |
219 | 9.57k | } |
// Each Close() is attempted only while every previous step succeeded, so a
// failed Sync skips both closes and a failed base Close skips the data Close.
220 | 9.57k | if (s.ok()) { |
221 | 9.57k | s = r->base_file_writer->Close(); |
222 | 9.57k | } |
223 | 9.57k | if (s.ok() && r->data_file_writer) { |
224 | 9.57k | s = r->data_file_writer->Close(); |
225 | 9.57k | } |
226 | 9.57k | } else { |
// NOTE(review): on this path the file writers are neither synced nor closed.
227 | 0 | r->builder->Abandon(); |
228 | 0 | } |
229 | | |
// NOTE(review): only the base path is passed to CleanupFile; nothing here
// removes the separate data file created by Open() for split SSTs -- confirm
// whether CleanupFile covers it.
230 | 9.57k | if (!s.ok()) { |
231 | 0 | r->ioptions.env->CleanupFile(r->file_info.file_path); |
232 | 0 | } |
233 | | |
// Refresh the final sizes from the builder before copying the info out.
234 | 9.57k | if (s.ok() && file_info != nullptr) { |
235 | 9.34k | r->file_info.file_size = r->builder->TotalFileSize(); |
236 | 9.34k | r->file_info.base_file_size = r->builder->BaseFileSize(); |
237 | 9.34k | *file_info = r->file_info; |
238 | 9.34k | } |
239 | | |
// Dropping the builder makes later Add()/Finish() calls fail with
// "File is not opened" until a builder is created again.
240 | 9.57k | r->builder.reset(); |
241 | 9.57k | return s; |
242 | 9.57k | } |
243 | | } // namespace rocksdb |