/Users/deen/code/yugabyte-db/src/yb/util/pb_util.cc
Line | Count | Source
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | // Some portions copyright (C) 2008, Google, inc. |
33 | | // |
34 | | // Utilities for working with protobufs. |
35 | | // Some of this code is cribbed from the protobuf source, |
36 | | // but modified to work with yb's 'faststring' instead of STL strings. |
37 | | |
38 | | #include "yb/util/pb_util.h" |
39 | | |
40 | | #include <memory> |
41 | | #include <string> |
42 | | #include <unordered_set> |
43 | | #include <vector> |
44 | | |
45 | | #include <glog/logging.h> |
46 | | #include <google/protobuf/descriptor.pb.h> |
47 | | #include <google/protobuf/descriptor_database.h> |
48 | | #include <google/protobuf/dynamic_message.h> |
49 | | #include <google/protobuf/io/coded_stream.h> |
50 | | #include <google/protobuf/io/zero_copy_stream.h> |
51 | | #include <google/protobuf/io/zero_copy_stream_impl_lite.h> |
52 | | #include <google/protobuf/message.h> |
53 | | #include <google/protobuf/util/message_differencer.h> |
54 | | |
55 | | #include "yb/gutil/callback.h" |
56 | | #include "yb/gutil/casts.h" |
57 | | #include "yb/gutil/map-util.h" |
58 | | #include "yb/gutil/strings/escaping.h" |
59 | | #include "yb/gutil/strings/fastmem.h" |
60 | | |
61 | | #include "yb/util/coding-inl.h" |
62 | | #include "yb/util/coding.h" |
63 | | #include "yb/util/crc.h" |
64 | | #include "yb/util/debug/sanitizer_scopes.h" |
65 | | #include "yb/util/debug/trace_event.h" |
66 | | #include "yb/util/env.h" |
67 | | #include "yb/util/env_util.h" |
68 | | #include "yb/util/path_util.h" |
69 | | #include "yb/util/pb_util-internal.h" |
70 | | #include "yb/util/pb_util.pb.h" |
71 | | #include "yb/util/result.h" |
72 | | #include "yb/util/status.h" |
73 | | #include "yb/util/status_log.h" |
74 | | |
75 | | using google::protobuf::Descriptor; |
76 | | using google::protobuf::DescriptorPool; |
77 | | using google::protobuf::DynamicMessageFactory; |
78 | | using google::protobuf::FieldDescriptor; |
79 | | using google::protobuf::FileDescriptor; |
80 | | using google::protobuf::FileDescriptorProto; |
81 | | using google::protobuf::FileDescriptorSet; |
82 | | using google::protobuf::io::ArrayInputStream; |
83 | | using google::protobuf::io::CodedInputStream; |
84 | | using google::protobuf::Message; |
85 | | using google::protobuf::MessageLite; |
86 | | using google::protobuf::Reflection; |
87 | | using google::protobuf::SimpleDescriptorDatabase; |
88 | | using yb::crc::Crc; |
89 | | using yb::pb_util::internal::SequentialFileFileInputStream; |
90 | | using yb::pb_util::internal::WritableFileOutputStream; |
91 | | using std::deque; |
92 | | using std::endl; |
93 | | using std::shared_ptr; |
94 | | using std::string; |
95 | | using std::unordered_set; |
96 | | using std::vector; |
97 | | using strings::Substitute; |
98 | | using strings::Utf8SafeCEscape; |
99 | | |
100 | | static const char* const kTmpTemplateSuffix = ".tmp.XXXXXX"; |
101 | | |
102 | | // Protobuf container constants. |
103 | | static const int kPBContainerVersion = 1; |
104 | | static const char kPBContainerMagic[] = "yugacntr"; |
105 | | static const int kPBContainerMagicLen = 8; |
106 | | static const int kPBContainerHeaderLen = |
107 | | // magic number + version |
108 | | kPBContainerMagicLen + sizeof(uint32_t); |
109 | | static const int kPBContainerChecksumLen = sizeof(uint32_t); |
110 | | |
111 | | COMPILE_ASSERT((arraysize(kPBContainerMagic) - 1) == kPBContainerMagicLen, |
112 | | kPBContainerMagic_does_not_match_expected_length); |
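For orientation, an informal sketch of the on-disk layout these constants imply, pieced together from Init(), AppendMsgToBuffer() and ReadNextPB() below (not authoritative; the code is the source of truth):

    // file header : kPBContainerMagic ("yugacntr", 8 bytes)
    //               + container version (fixed32 via InlineEncodeFixed32, 4 bytes)
    // each record  : data length (fixed32, 4 bytes)
    //               + serialized protobuf message (data length bytes)
    //               + CRC32C over the length prefix and message bytes (fixed32, 4 bytes)
    // The first record after the header holds a ContainerSupHeaderPB carrying the
    // FileDescriptorSet for the message's schema and its fully qualified type name.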
113 | | |
114 | | namespace yb { |
115 | | namespace pb_util { |
116 | | |
117 | | namespace { |
118 | | |
119 | | // When serializing, we first compute the byte size, then serialize the message. |
120 | | // If serialization produces a different number of bytes than expected, we |
121 | | // call this function, which crashes. The problem could be due to a bug in the |
122 | | // protobuf implementation but is more likely caused by concurrent modification |
123 | | // of the message. This function attempts to distinguish between the two and |
124 | | // provide a useful error message. |
125 | | void ByteSizeConsistencyError(size_t byte_size_before_serialization, |
126 | | size_t byte_size_after_serialization, |
127 | 0 | size_t bytes_produced_by_serialization) { |
128 | 0 | CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization) |
129 | 0 | << "Protocol message was modified concurrently during serialization."; |
130 | 0 | CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization) |
131 | 0 | << "Byte size calculation and serialization were inconsistent. This " |
132 | 0 | "may indicate a bug in protocol buffers or it may be caused by " |
133 | 0 | "concurrent modification of the message."; |
134 | 0 | LOG(FATAL) << "This shouldn't be called if all the sizes are equal."; |
135 | 0 | } |
136 | | |
137 | | string InitializationErrorMessage(const char* action, |
138 | 0 | const MessageLite& message) { |
139 | | // Note: We want to avoid depending on strutil in the lite library, otherwise |
140 | | // we'd use: |
141 | | // |
142 | | // return strings::Substitute( |
143 | | // "Can't $0 message of type \"$1\" because it is missing required " |
144 | | // "fields: $2", |
145 | | // action, message.GetTypeName(), |
146 | | // message.InitializationErrorString()); |
147 | |
148 | 0 | string result; |
149 | 0 | result += "Can't "; |
150 | 0 | result += action; |
151 | 0 | result += " message of type \""; |
152 | 0 | result += message.GetTypeName(); |
153 | 0 | result += "\" because it is missing required fields: "; |
154 | 0 | result += message.InitializationErrorString(); |
155 | 0 | return result; |
156 | 0 | } |
157 | | |
158 | 0 | uint8_t* GetUInt8Ptr(const char* buffer) { |
159 | 0 | return pointer_cast<uint8_t*>(const_cast<char*>(buffer)); |
160 | 0 | } |
161 | | |
162 | 14.2M | uint8_t* GetUInt8Ptr(uint8_t* buffer) { |
163 | 14.2M | return buffer; |
164 | 14.2M | } |
165 | | |
166 | | template <class Out> |
167 | 14.2M | void DoAppendPartialToString(const MessageLite &msg, Out* output) { |
168 | 14.2M | auto old_size = output->size(); |
169 | 14.2M | int byte_size = msg.ByteSize(); |
170 | | |
171 | 14.2M | output->resize(old_size + byte_size); |
172 | | |
173 | 14.2M | uint8* start = GetUInt8Ptr(output->data()) + old_size; |
174 | 14.2M | uint8* end = msg.SerializeWithCachedSizesToArray(start); |
175 | 14.2M | if (end - start != byte_size) { |
176 | 0 | ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start); |
177 | 0 | } |
178 | 14.2M | }
Instantiations: DoAppendPartialToString<yb::faststring> executed 14.2M times (lines 167-178); DoAppendPartialToString<std::string> unexecuted.
179 | | |
180 | | } // anonymous namespace |
181 | | |
182 | 14.2M | void AppendToString(const MessageLite &msg, faststring *output) { |
183 | 3.42k | DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); |
184 | 14.2M | AppendPartialToString(msg, output); |
185 | 14.2M | } |
186 | | |
187 | 14.2M | void AppendPartialToString(const MessageLite &msg, faststring* output) { |
188 | 14.2M | DoAppendPartialToString(msg, output); |
189 | 14.2M | } |
190 | | |
191 | 0 | void AppendPartialToString(const MessageLite &msg, std::string* output) { |
192 | 0 | DoAppendPartialToString(msg, output); |
193 | 0 | } |
194 | | |
195 | 236k | void SerializeToString(const MessageLite &msg, faststring *output) { |
196 | 236k | output->clear(); |
197 | 236k | AppendToString(msg, output); |
198 | 236k | } |
199 | | |
200 | 1.80k | bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) { |
201 | 1.80k | SequentialFileFileInputStream istream(rfile); |
202 | 1.80k | return msg->ParseFromZeroCopyStream(&istream); |
203 | 1.80k | } |
204 | | |
205 | 2.59M | Status ParseFromArray(MessageLite* msg, const uint8_t* data, size_t length) { |
206 | 2.59M | CodedInputStream in(data, narrow_cast<uint32_t>(length)); |
207 | 2.59M | in.SetTotalBytesLimit(511 * 1024 * 1024, -1); |
208 | | // Parse data into protobuf message |
209 | 2.59M | if (!msg->ParseFromCodedStream(&in)) { |
210 | 0 | return STATUS(Corruption, "Error parsing msg", InitializationErrorMessage("parse", *msg)); |
211 | 0 | } |
212 | 2.59M | return Status::OK(); |
213 | 2.59M | } |
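A minimal round trip through the serialization helpers above, assuming a hypothetical protobuf type MyPB with a string field 'value' (illustrative sketch, not part of this file):

    MyPB in;
    in.set_value("hello");
    faststring buf;
    yb::pb_util::SerializeToString(in, &buf);  // clears 'buf', then appends the serialized bytes

    MyPB out;
    // ParseFromArray() returns a Status; CHECK_OK() is used here only for brevity.
    CHECK_OK(yb::pb_util::ParseFromArray(&out, buf.data(), buf.size()));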
214 | | |
215 | | Status WritePBToPath(Env* env, const std::string& path, |
216 | | const MessageLite& msg, |
217 | 2.54k | SyncMode sync) { |
218 | 2.54k | const string tmp_template = path + kTmpTemplateSuffix; |
219 | 2.54k | string tmp_path; |
220 | | |
221 | 2.54k | std::unique_ptr<WritableFile> file; |
222 | 2.54k | RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file)); |
223 | 2.54k | env_util::ScopedFileDeleter tmp_deleter(env, tmp_path); |
224 | | |
225 | 2.54k | WritableFileOutputStream ostream(file.get()); |
226 | 2.54k | bool res = msg.SerializeToZeroCopyStream(&ostream); |
227 | 2.54k | if (!res || !ostream.Flush()) { |
228 | 0 | return STATUS(IOError, "Unable to serialize PB to file"); |
229 | 0 | } |
230 | | |
231 | 2.54k | if (sync == pb_util::SYNC) { |
232 | 0 | RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path); |
233 | 0 | } |
234 | 2.54k | RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path); |
235 | 2.54k | RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path); |
236 | 2.54k | tmp_deleter.Cancel(); |
237 | 2.54k | if (sync == pb_util::SYNC) { |
238 | 0 | RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path); |
239 | 0 | } |
240 | 2.54k | return Status::OK(); |
241 | 2.54k | } |
242 | | |
243 | 1.80k | Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) { |
244 | 1.80k | shared_ptr<SequentialFile> rfile; |
245 | 1.80k | RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile)); |
246 | 1.80k | if (!ParseFromSequentialFile(msg, rfile.get())) { |
247 | 0 | return STATUS(IOError, "Unable to parse PB from path", path); |
248 | 0 | } |
249 | 1.80k | return Status::OK(); |
250 | 1.80k | } |
251 | | |
252 | 0 | static void TruncateString(string* s, size_t max_len) { |
253 | 0 | if (s->size() > max_len) { |
254 | 0 | s->resize(max_len); |
255 | 0 | s->append("<truncated>"); |
256 | 0 | } |
257 | 0 | } |
258 | | |
259 | 0 | void TruncateFields(Message* message, int max_len) { |
260 | 0 | const Reflection* reflection = message->GetReflection(); |
261 | 0 | vector<const FieldDescriptor*> fields; |
262 | 0 | reflection->ListFields(*message, &fields); |
263 | 0 | for (const FieldDescriptor* field : fields) { |
264 | 0 | if (field->is_repeated()) { |
265 | 0 | for (int i = 0; i < reflection->FieldSize(*message, field); i++) { |
266 | 0 | switch (field->cpp_type()) { |
267 | 0 | case FieldDescriptor::CPPTYPE_STRING: { |
268 | 0 | const string& s_const = reflection->GetRepeatedStringReference(*message, field, i, |
269 | 0 | nullptr); |
270 | 0 | TruncateString(const_cast<string*>(&s_const), max_len); |
271 | 0 | break; |
272 | 0 | } |
273 | 0 | case FieldDescriptor::CPPTYPE_MESSAGE: { |
274 | 0 | TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len); |
275 | 0 | break; |
276 | 0 | } |
277 | 0 | default: |
278 | 0 | break; |
279 | 0 | } |
280 | 0 | } |
281 | 0 | } else { |
282 | 0 | switch (field->cpp_type()) { |
283 | 0 | case FieldDescriptor::CPPTYPE_STRING: { |
284 | 0 | const string& s_const = reflection->GetStringReference(*message, field, nullptr); |
285 | 0 | TruncateString(const_cast<string*>(&s_const), max_len); |
286 | 0 | break; |
287 | 0 | } |
288 | 0 | case FieldDescriptor::CPPTYPE_MESSAGE: { |
289 | 0 | TruncateFields(reflection->MutableMessage(message, field), max_len); |
290 | 0 | break; |
291 | 0 | } |
292 | 0 | default: |
293 | 0 | break; |
294 | 0 | } |
295 | 0 | } |
296 | 0 | } |
297 | 0 | } |
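TruncateFields() recursively shortens every string field in place. A small sketch, assuming a hypothetical MyPB with a string field 'payload' (illustrative only):

    MyPB pb;
    pb.set_payload(std::string(10 * 1024, 'x'));
    yb::pb_util::TruncateFields(&pb, 256);
    // pb.payload() is now its first 256 bytes followed by the literal "<truncated>".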
298 | | |
299 | | WritablePBContainerFile::WritablePBContainerFile(std::unique_ptr<WritableFile> writer) |
300 | | : closed_(false), |
301 | 1.13M | writer_(std::move(writer)) { |
302 | 1.13M | } |
303 | | |
304 | 1.13M | WritablePBContainerFile::~WritablePBContainerFile() { |
305 | 1.13M | WARN_NOT_OK(Close(), "Could not Close() when destroying file"); |
306 | 1.13M | } |
307 | | |
308 | 1.13M | Status WritablePBContainerFile::Init(const Message& msg) { |
309 | 1.13M | DCHECK(!closed_); |
310 | | |
311 | 1.13M | faststring buf; |
312 | 1.13M | buf.resize(kPBContainerHeaderLen); |
313 | | |
314 | | // Serialize the magic. |
315 | 1.13M | strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen); |
316 | 1.13M | size_t offset = kPBContainerMagicLen; |
317 | | |
318 | | // Serialize the version. |
319 | 1.13M | InlineEncodeFixed32(buf.data() + offset, kPBContainerVersion); |
320 | 1.13M | offset += sizeof(uint32_t); |
321 | 0 | DCHECK_EQ(kPBContainerHeaderLen, offset) |
322 | 0 | << "Serialized unexpected number of total bytes"; |
323 | | |
324 | | // Serialize the supplemental header. |
325 | 1.13M | ContainerSupHeaderPB sup_header; |
326 | 1.13M | PopulateDescriptorSet(msg.GetDescriptor()->file(), |
327 | 1.13M | sup_header.mutable_protos()); |
328 | 1.13M | sup_header.set_pb_type(msg.GetTypeName()); |
329 | 1.13M | RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf), |
330 | 1.13M | "Failed to prepare supplemental header for writing"); |
331 | | |
332 | | // Write the serialized buffer to the file. |
333 | 1.13M | RETURN_NOT_OK_PREPEND(writer_->Append(buf), |
334 | 1.13M | "Failed to Append() header to file"); |
335 | 1.13M | return Status::OK(); |
336 | 1.13M | } |
337 | | |
338 | 1.13M | Status WritablePBContainerFile::Append(const Message& msg) { |
339 | 1.13M | DCHECK(!closed_); |
340 | | |
341 | 1.13M | faststring buf; |
342 | 1.13M | RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf), |
343 | 1.13M | "Failed to prepare buffer for writing"); |
344 | 1.13M | RETURN_NOT_OK_PREPEND(writer_->Append(buf), "Failed to Append() data to file"); |
345 | | |
346 | 1.13M | return Status::OK(); |
347 | 1.13M | } |
348 | | |
349 | 0 | Status WritablePBContainerFile::Flush() { |
350 | 0 | DCHECK(!closed_); |
351 | | |
352 | | // TODO: Flush just the dirty bytes. |
353 | 0 | RETURN_NOT_OK_PREPEND(writer_->Flush(WritableFile::FLUSH_ASYNC), "Failed to Flush() file"); |
354 | |
355 | 0 | return Status::OK(); |
356 | 0 | } |
357 | | |
358 | 1.13M | Status WritablePBContainerFile::Sync() { |
359 | 1.13M | DCHECK(!closed_); |
360 | | |
361 | 1.13M | RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file"); |
362 | | |
363 | 1.13M | return Status::OK(); |
364 | 1.13M | } |
365 | | |
366 | 2.26M | Status WritablePBContainerFile::Close() { |
367 | 2.26M | if (!closed_) { |
368 | 1.13M | closed_ = true; |
369 | | |
370 | 1.13M | RETURN_NOT_OK_PREPEND(writer_->Close(), "Failed to Close() file"); |
371 | 1.13M | } |
372 | | |
373 | 2.26M | return Status::OK(); |
374 | 2.26M | } |
375 | | |
376 | 2.26M | Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) { |
377 | 643 | DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); |
378 | 2.26M | int data_size = msg.ByteSize(); |
379 | 2.26M | uint64_t bufsize = sizeof(uint32_t) + data_size + kPBContainerChecksumLen; |
380 | | |
381 | | // Grow the buffer to hold the new data. |
382 | 2.26M | size_t orig_size = buf->size(); |
383 | 2.26M | buf->resize(orig_size + bufsize); |
384 | 2.26M | uint8_t* dst = buf->data() + orig_size; |
385 | | |
386 | | // Serialize the data size. |
387 | 2.26M | InlineEncodeFixed32(dst, static_cast<uint32_t>(data_size)); |
388 | 2.26M | size_t offset = sizeof(uint32_t); |
389 | | |
390 | | // Serialize the data. |
391 | 2.26M | if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + offset))) { |
392 | 0 | return STATUS(IOError, "Failed to serialize PB to array"); |
393 | 0 | } |
394 | 2.26M | offset += data_size; |
395 | | |
396 | | // Calculate and serialize the checksum. |
397 | 2.26M | uint32_t checksum = crc::Crc32c(dst, offset); |
398 | 2.26M | InlineEncodeFixed32(dst + offset, checksum); |
399 | 2.26M | offset += kPBContainerChecksumLen; |
400 | | |
401 | 0 | DCHECK_EQ(bufsize, offset) << "Serialized unexpected number of total bytes"; |
402 | 2.26M | return Status::OK(); |
403 | 2.26M | } |
404 | | |
405 | | void WritablePBContainerFile::PopulateDescriptorSet( |
406 | 1.13M | const FileDescriptor* desc, FileDescriptorSet* output) { |
407 | | // Because we don't compile protobuf with TSAN enabled, copying the |
408 | | // static PB descriptors in this function ends up triggering a lot of |
409 | | // race reports. We suppress the reports, but TSAN still has to walk |
410 | | // the stack, etc, and this function becomes very slow. So, we ignore |
411 | | // TSAN here. |
412 | 1.13M | debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; |
413 | | |
414 | 1.13M | FileDescriptorSet all_descs; |
415 | | |
416 | | // Tracks all schemas that have been added to 'unemitted' at one point |
417 | | // or another. Is a superset of 'unemitted' and only ever grows. |
418 | 1.13M | unordered_set<const FileDescriptor*> processed; |
419 | | |
420 | | // Tracks all remaining unemitted schemas. |
421 | 1.13M | deque<const FileDescriptor*> unemitted; |
422 | | |
423 | 1.13M | InsertOrDie(&processed, desc); |
424 | 1.13M | unemitted.push_front(desc); |
425 | 9.42M | while (!unemitted.empty()) { |
426 | 8.29M | const FileDescriptor* proto = unemitted.front(); |
427 | | |
428 | | // The current schema is emitted iff we've processed (i.e. emitted) all |
429 | | // of its dependencies. |
430 | 8.29M | bool emit = true; |
431 | 20.3M | for (int i = 0; i < proto->dependency_count(); i++) { |
432 | 12.0M | const FileDescriptor* dep = proto->dependency(i); |
433 | 12.0M | if (InsertIfNotPresent(&processed, dep)) { |
434 | 5.22M | unemitted.push_front(dep); |
435 | 5.22M | emit = false; |
436 | 5.22M | } |
437 | 12.0M | } |
438 | 8.29M | if (emit) { |
439 | 6.35M | unemitted.pop_front(); |
440 | 6.35M | proto->CopyTo(all_descs.mutable_file()->Add()); |
441 | 6.35M | } |
442 | 8.29M | } |
443 | 1.13M | all_descs.Swap(output); |
444 | 1.13M | } |
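A worked example of the resulting order (illustrative): if a.proto imports b.proto and b.proto imports c.proto, then starting from a the loop defers a, then defers b, and copies c, b, a into 'all_descs' in that order, so each file lands after all of its dependencies.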
445 | | |
446 | | ReadablePBContainerFile::ReadablePBContainerFile(std::unique_ptr<RandomAccessFile> reader) |
447 | | : offset_(0), |
448 | 197k | reader_(std::move(reader)) { |
449 | 197k | } |
450 | | |
451 | 197k | ReadablePBContainerFile::~ReadablePBContainerFile() { |
452 | 197k | WARN_NOT_OK(Close(), "Could not Close() when destroying file"); |
453 | 197k | } |
454 | | |
455 | 197k | Status ReadablePBContainerFile::Init() { |
456 | | // Read header data. |
457 | 197k | Slice header; |
458 | 197k | std::unique_ptr<uint8_t[]> scratch; |
459 | 197k | RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerHeaderLen, EOF_NOT_OK, &header, &scratch), |
460 | 197k | Substitute("Could not read header for proto container file $0", |
461 | 197k | reader_->filename())); |
462 | | |
463 | | // Validate magic number. |
464 | 197k | if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) { |
465 | 1 | string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen); |
466 | 1 | return STATUS(Corruption, "Invalid magic number", |
467 | 1 | Substitute("Expected: $0, found: $1", |
468 | 1 | Utf8SafeCEscape(kPBContainerMagic), |
469 | 1 | Utf8SafeCEscape(file_magic))); |
470 | 1 | } |
471 | | |
472 | | // Validate container file version. |
473 | 197k | uint32_t version = DecodeFixed32(header.data() + kPBContainerMagicLen); |
474 | 197k | if (PREDICT_FALSE(version != kPBContainerVersion)) { |
475 | | // We only support version 1. |
476 | 1 | return STATUS(NotSupported, |
477 | 1 | Substitute("Protobuf container has version $0, we only support version $1", |
478 | 1 | version, kPBContainerVersion)); |
479 | 1 | } |
480 | | |
481 | | // Read the supplemental header. |
482 | 197k | ContainerSupHeaderPB sup_header; |
483 | 197k | RETURN_NOT_OK_PREPEND(ReadNextPB(&sup_header), Substitute( |
484 | 197k | "Could not read supplemental header from proto container file $0", |
485 | 197k | reader_->filename())); |
486 | 197k | protos_.reset(sup_header.release_protos()); |
487 | 197k | pb_type_ = sup_header.pb_type(); |
488 | | |
489 | 197k | return Status::OK(); |
490 | 197k | } |
491 | | |
492 | 395k | Status ReadablePBContainerFile::ReadNextPB(Message* msg) { |
493 | 126 | VLOG(1) << "Reading PB from offset " << offset_; |
494 | | |
495 | | // Read the size from the file. EOF here is acceptable: it means we're |
496 | | // out of PB entries. |
497 | 395k | Slice size; |
498 | 395k | std::unique_ptr<uint8_t[]> size_scratch; |
499 | 395k | RETURN_NOT_OK_PREPEND(ValidateAndRead(sizeof(uint32_t), EOF_OK, &size, &size_scratch), |
500 | 395k | Substitute("Could not read data size from proto container file $0", |
501 | 395k | reader_->filename())); |
502 | 395k | uint32_t data_size = DecodeFixed32(size.data()); |
503 | | |
504 | | // Read body into buffer for checksum & parsing. |
505 | 395k | Slice body; |
506 | 395k | std::unique_ptr<uint8_t[]> body_scratch; |
507 | 395k | RETURN_NOT_OK_PREPEND(ValidateAndRead(data_size, EOF_NOT_OK, &body, &body_scratch), |
508 | 395k | Substitute("Could not read body from proto container file $0", |
509 | 395k | reader_->filename())); |
510 | | |
511 | | // Read checksum. |
512 | 395k | uint32_t expected_checksum = 0; |
513 | 395k | { |
514 | 395k | Slice encoded_checksum; |
515 | 395k | std::unique_ptr<uint8_t[]> encoded_checksum_scratch; |
516 | 395k | RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerChecksumLen, EOF_NOT_OK, |
517 | 395k | &encoded_checksum, &encoded_checksum_scratch), |
518 | 395k | Substitute("Could not read checksum from proto container file $0", |
519 | 395k | reader_->filename())); |
520 | 395k | expected_checksum = DecodeFixed32(encoded_checksum.data()); |
521 | 395k | } |
522 | | |
523 | | // Validate CRC32C checksum. |
524 | 395k | Crc* crc32c = crc::GetCrc32cInstance(); |
525 | 395k | uint64_t actual_checksum = 0; |
526 | | // Compute a rolling checksum over the two byte arrays (size, body). |
527 | 395k | crc32c->Compute(size.data(), size.size(), &actual_checksum); |
528 | 395k | crc32c->Compute(body.data(), body.size(), &actual_checksum); |
529 | 395k | if (PREDICT_FALSE(actual_checksum != expected_checksum)) { |
530 | 2 | return STATUS(Corruption, Substitute("Incorrect checksum of file $0: actually $1, expected $2", |
531 | 2 | reader_->filename(), actual_checksum, expected_checksum)); |
532 | 2 | } |
533 | | |
534 | | // The checksum is correct. Time to decode the body. |
535 | | // |
536 | | // We could compare pb_type_ against msg.GetTypeName(), but: |
537 | | // 1. pb_type_ is not available when reading the supplemental header, |
538 | | // 2. ParseFromArray() should fail if the data cannot be parsed into the |
539 | | // provided message type. |
540 | | |
541 | | // To permit parsing of very large PB messages, we must parse through a
542 | | // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs |
543 | | // say that 512MB is the shortest theoretical message length that may produce |
544 | | // integer overflow warnings, so that's what we'll use. |
545 | 395k | ArrayInputStream ais(body.data(), narrow_cast<int>(body.size())); |
546 | 395k | CodedInputStream cis(&ais); |
547 | 395k | cis.SetTotalBytesLimit(512 * 1024 * 1024, -1); |
548 | 395k | if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) { |
549 | 0 | return STATUS(IOError, "Unable to parse PB from path", reader_->filename()); |
550 | 0 | } |
551 | | |
552 | 395k | return Status::OK(); |
553 | 395k | } |
554 | | |
555 | 2 | Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) { |
556 | | // Use the embedded protobuf information from the container file to |
557 | | // create the appropriate kind of protobuf Message. |
558 | | // |
559 | | // Loading the schemas into a DescriptorDatabase (and not directly into |
560 | | // a DescriptorPool) defers resolution until FindMessageTypeByName() |
561 | | // below, allowing for schemas to be loaded in any order. |
562 | 2 | SimpleDescriptorDatabase db; |
563 | 8 | for (int i = 0; i < protos()->file_size(); i++) { |
564 | 6 | if (!db.Add(protos()->file(i))) { |
565 | 0 | return STATUS(Corruption, "Descriptor not loaded", Substitute( |
566 | 0 | "Could not load descriptor for PB type $0 referenced in container file", |
567 | 0 | pb_type())); |
568 | 0 | } |
569 | 6 | } |
570 | 2 | DescriptorPool pool(&db); |
571 | 2 | const Descriptor* desc = pool.FindMessageTypeByName(pb_type()); |
572 | 2 | if (!desc) { |
573 | 0 | return STATUS(NotFound, "Descriptor not found", Substitute( |
574 | 0 | "Could not find descriptor for PB type $0 referenced in container file", |
575 | 0 | pb_type())); |
576 | 0 | } |
577 | 2 | DynamicMessageFactory factory; |
578 | 2 | const Message* prototype = factory.GetPrototype(desc); |
579 | 2 | if (!prototype) { |
580 | 0 | return STATUS(NotSupported, "Descriptor not supported", Substitute( |
581 | 0 | "Descriptor $0 referenced in container file not supported", |
582 | 0 | pb_type())); |
583 | 0 | } |
584 | 2 | std::unique_ptr<Message> msg(prototype->New()); |
585 | | |
586 | | // Dump each message in the container file. |
587 | 2 | int count = 0; |
588 | 2 | Status s; |
589 | 2 | for (s = ReadNextPB(msg.get()); |
590 | 6 | s.ok(); |
591 | 4 | s = ReadNextPB(msg.get())) { |
592 | 4 | if (oneline) { |
593 | 2 | *os << count++ << "\t" << msg->ShortDebugString() << endl; |
594 | 2 | } else { |
595 | 2 | *os << pb_type_ << " " << count << endl; |
596 | 2 | *os << "-------" << endl; |
597 | 2 | *os << msg->DebugString() << endl; |
598 | 2 | count++; |
599 | 2 | } |
600 | 4 | } |
601 | 2 | return s.IsEndOfFile() ? Status::OK() : s; |
602 | 2 | } |
603 | | |
604 | 395k | Status ReadablePBContainerFile::Close() { |
605 | 395k | std::unique_ptr<RandomAccessFile> deleter; |
606 | 395k | deleter.swap(reader_); |
607 | 395k | return Status::OK(); |
608 | 395k | } |
609 | | |
610 | | Status ReadablePBContainerFile::ValidateAndRead(size_t length, EofOK eofOK, |
611 | | Slice* result, |
612 | 1.38M | std::unique_ptr<uint8_t[]>* scratch) { |
613 | | // Validate the read length using the file size. |
614 | 1.38M | uint64_t file_size = VERIFY_RESULT(reader_->Size()); |
615 | 1.38M | if (offset_ + length > file_size) { |
616 | 7 | switch (eofOK) { |
617 | 4 | case EOF_OK: |
618 | 4 | return STATUS(EndOfFile, "Reached end of file"); |
619 | 3 | case EOF_NOT_OK: |
620 | 3 | return STATUS(Corruption, "File size not large enough to be valid", |
621 | 0 | Substitute("Proto container file $0: " |
622 | 0 | "tried to read $1 bytes at offset "
623 | 0 | "$2 but file size is only $3",
624 | 0 | reader_->filename(), length, |
625 | 0 | offset_, file_size)); |
626 | 0 | default: |
627 | 0 | LOG(FATAL) << "Unknown value for eofOK: " << eofOK; |
628 | 7 | } |
629 | 7 | } |
630 | | |
631 | | // Perform the read. |
632 | 1.38M | Slice s; |
633 | 1.38M | std::unique_ptr<uint8_t[]> local_scratch(new uint8_t[length]); |
634 | 1.38M | RETURN_NOT_OK(reader_->Read(offset_, length, &s, local_scratch.get())); |
635 | | |
636 | | // Sanity check the result. |
637 | 1.38M | if (PREDICT_FALSE(s.size() < length)) { |
638 | 0 | return STATUS(Corruption, "Unexpected short read", Substitute( |
639 | 0 | "Proto container file $0: tried to read $1 bytes; got $2 bytes", |
640 | 0 | reader_->filename(), length, s.size())); |
641 | 0 | } |
642 | | |
643 | 1.38M | *result = s; |
644 | 1.38M | scratch->swap(local_scratch); |
645 | 1.38M | offset_ += s.size(); |
646 | 1.38M | return Status::OK(); |
647 | 1.38M | } |
648 | | |
649 | | namespace { |
650 | | |
651 | | Status ReadPBContainer( |
652 | 215k | Env* env, const std::string& path, Message* msg, const std::string* pb_type_name = nullptr) { |
653 | 215k | std::unique_ptr<RandomAccessFile> file; |
654 | 215k | RETURN_NOT_OK(env->NewRandomAccessFile(path, &file)); |
655 | | |
656 | 198k | ReadablePBContainerFile pb_file(std::move(file)); |
657 | 198k | RETURN_NOT_OK(pb_file.Init()); |
658 | | |
659 | 198k | if (pb_type_name && pb_file.pb_type() != *pb_type_name) { |
660 | 0 | WARN_NOT_OK(pb_file.Close(), "Could not Close() PB container file"); |
661 | 0 | return STATUS(InvalidArgument, |
662 | 0 | Substitute("Wrong PB type: $0, expected $1", pb_file.pb_type(), *pb_type_name)); |
663 | 0 | } |
664 | | |
665 | 198k | RETURN_NOT_OK(pb_file.ReadNextPB(msg)); |
666 | 198k | return pb_file.Close(); |
667 | 198k | } |
668 | | |
669 | | } // namespace |
670 | | |
671 | 215k | Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) { |
672 | 215k | return ReadPBContainer(env, path, msg); |
673 | 215k | } |
674 | | |
675 | | Status ReadPBContainerFromPath( |
676 | 0 | Env* env, const std::string& path, const std::string& pb_type_name, Message* msg) { |
677 | 0 | return ReadPBContainer(env, path, msg, &pb_type_name); |
678 | 0 | } |
679 | | |
680 | | Status WritePBContainerToPath(Env* env, const std::string& path, |
681 | | const Message& msg, |
682 | | CreateMode create, |
683 | 1.13M | SyncMode sync) { |
684 | 1.13M | TRACE_EVENT2("io", "WritePBContainerToPath", |
685 | 1.13M | "path", path, |
686 | 1.13M | "msg_type", msg.GetTypeName()); |
687 | | |
688 | 1.13M | if (create == NO_OVERWRITE && env->FileExists(path)) { |
689 | 1 | return STATUS(AlreadyPresent, Substitute("File $0 already exists", path)); |
690 | 1 | } |
691 | | |
692 | 1.13M | const string tmp_template = path + kTmpTemplateSuffix; |
693 | 1.13M | string tmp_path; |
694 | | |
695 | 1.13M | std::unique_ptr<WritableFile> file; |
696 | 1.13M | RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file)); |
697 | 1.13M | env_util::ScopedFileDeleter tmp_deleter(env, tmp_path); |
698 | | |
699 | 1.13M | WritablePBContainerFile pb_file(std::move(file)); |
700 | 1.13M | RETURN_NOT_OK(pb_file.Init(msg)); |
701 | 1.13M | RETURN_NOT_OK(pb_file.Append(msg)); |
702 | 1.13M | if (sync == pb_util::SYNC) { |
703 | 1.13M | RETURN_NOT_OK(pb_file.Sync()); |
704 | 1.13M | } |
705 | 1.13M | RETURN_NOT_OK(pb_file.Close()); |
706 | 1.13M | RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), |
707 | 1.13M | "Failed to rename tmp file to " + path); |
708 | 1.13M | tmp_deleter.Cancel(); |
709 | 1.13M | if (sync == pb_util::SYNC) { |
710 | 1.13M | RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), |
711 | 1.13M | "Failed to SyncDir() parent of " + path); |
712 | 1.13M | } |
713 | 1.13M | return Status::OK(); |
714 | 1.13M | } |
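A caller-side sketch of a full container round trip, assuming Env::Default() is available and using a hypothetical protobuf type MyPB; the namespace qualification of the CreateMode/SyncMode constants is an assumption based on their unqualified use above (illustrative only):

    MyPB pb;
    pb.set_value("v");
    CHECK_OK(yb::pb_util::WritePBContainerToPath(
        yb::Env::Default(), "/tmp/example.pb.container", pb,
        yb::pb_util::NO_OVERWRITE, yb::pb_util::SYNC));

    MyPB restored;
    CHECK_OK(yb::pb_util::ReadPBContainerFromPath(
        yb::Env::Default(), "/tmp/example.pb.container", &restored));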
715 | | |
716 | | bool ArePBsEqual(const google::protobuf::Message& prev_pb, |
717 | | const google::protobuf::Message& new_pb, |
718 | 437k | std::string* diff_str) { |
719 | 437k | google::protobuf::util::MessageDifferencer md; |
720 | 437k | if (diff_str) { |
721 | 0 | md.ReportDifferencesToString(diff_str); |
722 | 0 | } |
723 | 437k | return md.Compare(prev_pb, new_pb); |
724 | 437k | } |
725 | | |
726 | | } // namespace pb_util |
727 | | } // namespace yb |