YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/pb_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
// Some portions copyright (C) 2008, Google, inc.
33
//
34
// Utilities for working with protobufs.
35
// Some of this code is cribbed from the protobuf source,
36
// but modified to work with yb's 'faststring' instead of STL strings.
37
38
#include "yb/util/pb_util.h"
39
40
#include <memory>
41
#include <string>
42
#include <unordered_set>
43
#include <vector>
44
45
#include <glog/logging.h>
46
#include <google/protobuf/descriptor.pb.h>
47
#include <google/protobuf/descriptor_database.h>
48
#include <google/protobuf/dynamic_message.h>
49
#include <google/protobuf/io/coded_stream.h>
50
#include <google/protobuf/io/zero_copy_stream.h>
51
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
52
#include <google/protobuf/message.h>
53
#include <google/protobuf/util/message_differencer.h>
54
55
#include "yb/gutil/callback.h"
56
#include "yb/gutil/casts.h"
57
#include "yb/gutil/map-util.h"
58
#include "yb/gutil/strings/escaping.h"
59
#include "yb/gutil/strings/fastmem.h"
60
61
#include "yb/util/coding-inl.h"
62
#include "yb/util/coding.h"
63
#include "yb/util/crc.h"
64
#include "yb/util/debug/sanitizer_scopes.h"
65
#include "yb/util/debug/trace_event.h"
66
#include "yb/util/env.h"
67
#include "yb/util/env_util.h"
68
#include "yb/util/path_util.h"
69
#include "yb/util/pb_util-internal.h"
70
#include "yb/util/pb_util.pb.h"
71
#include "yb/util/result.h"
72
#include "yb/util/status.h"
73
#include "yb/util/status_log.h"
74
75
using google::protobuf::Descriptor;
76
using google::protobuf::DescriptorPool;
77
using google::protobuf::DynamicMessageFactory;
78
using google::protobuf::FieldDescriptor;
79
using google::protobuf::FileDescriptor;
80
using google::protobuf::FileDescriptorProto;
81
using google::protobuf::FileDescriptorSet;
82
using google::protobuf::io::ArrayInputStream;
83
using google::protobuf::io::CodedInputStream;
84
using google::protobuf::Message;
85
using google::protobuf::MessageLite;
86
using google::protobuf::Reflection;
87
using google::protobuf::SimpleDescriptorDatabase;
88
using yb::crc::Crc;
89
using yb::pb_util::internal::SequentialFileFileInputStream;
90
using yb::pb_util::internal::WritableFileOutputStream;
91
using std::deque;
92
using std::endl;
93
using std::shared_ptr;
94
using std::string;
95
using std::unordered_set;
96
using std::vector;
97
using strings::Substitute;
98
using strings::Utf8SafeCEscape;
99
100
// Suffix appended to a destination path to form the name template that is
// handed to Env::NewTempWritableFile().
static constexpr const char* kTmpTemplateSuffix = ".tmp.XXXXXX";

// Protobuf container constants.
static constexpr int kPBContainerVersion = 1;
static constexpr char kPBContainerMagic[] = "yugacntr";
static constexpr int kPBContainerMagicLen = 8;
// Fixed-size file header: the magic number followed by a 32-bit version.
static constexpr int kPBContainerHeaderLen = kPBContainerMagicLen + sizeof(uint32_t);
// Every record ends with a 32-bit checksum.
static constexpr int kPBContainerChecksumLen = sizeof(uint32_t);

// The magic string (without its trailing NUL) must be exactly kPBContainerMagicLen bytes.
static_assert(static_cast<int>(sizeof(kPBContainerMagic)) - 1 == kPBContainerMagicLen,
              "kPBContainerMagic does not match expected length");
113
114
namespace yb {
115
namespace pb_util {
116
117
namespace {
118
119
// When serializing, we first compute the byte size, then serialize the message.
120
// If serialization produces a different number of bytes than expected, we
121
// call this function, which crashes.  The problem could be due to a bug in the
122
// protobuf implementation but is more likely caused by concurrent modification
123
// of the message.  This function attempts to distinguish between the two and
124
// provide a useful error message.
125
void ByteSizeConsistencyError(size_t byte_size_before_serialization,
126
                              size_t byte_size_after_serialization,
127
0
                              size_t bytes_produced_by_serialization) {
128
0
  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
129
0
      << "Protocol message was modified concurrently during serialization.";
130
0
  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
131
0
      << "Byte size calculation and serialization were inconsistent.  This "
132
0
         "may indicate a bug in protocol buffers or it may be caused by "
133
0
         "concurrent modification of the message.";
134
0
  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
135
0
}
136
137
// Builds the "missing required fields" diagnostic for 'message'.
//
// 'action' is the verb placed after "Can't " (callers in this file pass
// "serialize" and "parse").  The result has the form:
//   Can't <action> message of type "<type name>" because it is missing
//   required fields: <InitializationErrorString()>
//
// Note: plain string appends are used instead of strings::Substitute to avoid
// depending on strutil in the lite library.
string InitializationErrorMessage(const char* action,
                                  const MessageLite& message) {
  string text("Can't ");
  text.append(action)
      .append(" message of type \"")
      .append(message.GetTypeName())
      .append("\" because it is missing required fields: ")
      .append(message.InitializationErrorString());
  return text;
}
157
158
0
uint8_t* GetUInt8Ptr(const char* buffer) {
159
0
  return pointer_cast<uint8_t*>(const_cast<char*>(buffer));
160
0
}
161
162
14.2M
// Overload for buffers that already expose 'uint8_t*': returns the pointer
// unchanged, so generic code can call GetUInt8Ptr() on either buffer flavor.
uint8_t* GetUInt8Ptr(uint8_t* buffer) {
  return buffer;
}
165
166
template <class Out>
167
14.2M
void DoAppendPartialToString(const MessageLite &msg, Out* output) {
168
14.2M
  auto old_size = output->size();
169
14.2M
  int byte_size = msg.ByteSize();
170
171
14.2M
  output->resize(old_size + byte_size);
172
173
14.2M
  uint8* start = GetUInt8Ptr(output->data()) + old_size;
174
14.2M
  uint8* end = msg.SerializeWithCachedSizesToArray(start);
175
14.2M
  if (end - start != byte_size) {
176
0
    ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
177
0
  }
178
14.2M
}
pb_util.cc:_ZN2yb7pb_util12_GLOBAL__N_123DoAppendPartialToStringINS_10faststringEEEvRKN6google8protobuf11MessageLiteEPT_
Line
Count
Source
167
14.2M
void DoAppendPartialToString(const MessageLite &msg, Out* output) {
168
14.2M
  auto old_size = output->size();
169
14.2M
  int byte_size = msg.ByteSize();
170
171
14.2M
  output->resize(old_size + byte_size);
172
173
14.2M
  uint8* start = GetUInt8Ptr(output->data()) + old_size;
174
14.2M
  uint8* end = msg.SerializeWithCachedSizesToArray(start);
175
14.2M
  if (end - start != byte_size) {
176
0
    ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
177
0
  }
178
14.2M
}
Unexecuted instantiation: pb_util.cc:_ZN2yb7pb_util12_GLOBAL__N_123DoAppendPartialToStringINSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEEEvRKN6google8protobuf11MessageLiteEPT_
179
180
} // anonymous namespace
181
182
14.2M
void AppendToString(const MessageLite &msg, faststring *output) {
183
3.42k
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
184
14.2M
  AppendPartialToString(msg, output);
185
14.2M
}
186
187
14.2M
void AppendPartialToString(const MessageLite &msg, faststring* output) {
188
14.2M
  DoAppendPartialToString(msg, output);
189
14.2M
}
190
191
0
void AppendPartialToString(const MessageLite &msg, std::string* output) {
192
0
  DoAppendPartialToString(msg, output);
193
0
}
194
195
236k
void SerializeToString(const MessageLite &msg, faststring *output) {
196
236k
  output->clear();
197
236k
  AppendToString(msg, output);
198
236k
}
199
200
1.80k
// Parses 'msg' from 'rfile' by wrapping the file in a zero-copy input stream.
// Returns the result of MessageLite::ParseFromZeroCopyStream().
bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
  SequentialFileFileInputStream input(rfile);
  const bool parsed = msg->ParseFromZeroCopyStream(&input);
  return parsed;
}
204
205
2.59M
// Parses 'msg' from the 'length' bytes starting at 'data'.
//
// The bytes are read through a CodedInputStream whose total-bytes limit is
// raised to 511 MiB.  Returns OK on success, or a Corruption status carrying
// the InitializationErrorMessage() text when parsing fails.
Status ParseFromArray(MessageLite* msg, const uint8_t* data, size_t length) {
  CodedInputStream in(data, narrow_cast<uint32_t>(length));
  in.SetTotalBytesLimit(511 * 1024 * 1024, -1);

  // Parse data into protobuf message
  const bool parsed = msg->ParseFromCodedStream(&in);
  if (parsed) {
    return Status::OK();
  }
  return STATUS(Corruption, "Error parsing msg", InitializationErrorMessage("parse", *msg));
}
214
215
// Serializes 'msg' to the file at 'path'.
//
// The message is first written to a temporary file created from
// 'path' + kTmpTemplateSuffix, which is then renamed onto 'path'.  When
// 'sync' is pb_util::SYNC, the temporary file is synced before it is closed
// and the parent directory of 'path' is synced after the rename.
//
// Returns the first error encountered; serialization or flush failures are
// reported as IOError.
Status WritePBToPath(Env* env, const std::string& path,
                     const MessageLite& msg,
                     SyncMode sync) {
  const bool want_sync = (sync == pb_util::SYNC);
  const string tmp_template = path + kTmpTemplateSuffix;

  string tmp_path;
  std::unique_ptr<WritableFile> file;
  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
  // NOTE(review): presumably removes tmp_path on scope exit unless Cancel() is
  // called -- confirm against env_util::ScopedFileDeleter.
  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);

  WritableFileOutputStream ostream(file.get());
  const bool serialized = msg.SerializeToZeroCopyStream(&ostream);
  if (!(serialized && ostream.Flush())) {
    return STATUS(IOError, "Unable to serialize PB to file");
  }

  if (want_sync) {
    RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path);
  }
  RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path);
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);

  // The temporary name no longer exists after the rename.
  tmp_deleter.Cancel();
  if (want_sync) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
242
243
1.80k
// Opens 'path' for sequential reading and parses its contents into 'msg'.
// Returns the open error if the file cannot be opened, or IOError (with the
// path attached) if the contents cannot be parsed.
Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
  shared_ptr<SequentialFile> rfile;
  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile));

  const bool parsed = ParseFromSequentialFile(msg, rfile.get());
  if (parsed) {
    return Status::OK();
  }
  return STATUS(IOError, "Unable to parse PB from path", path);
}
251
252
0
// Shortens '*s' to at most 'max_len' characters and appends the marker
// "<truncated>" when anything was cut off.  Strings that already fit are left
// untouched.
static void TruncateString(std::string* s, size_t max_len) {
  if (s->size() <= max_len) {
    return;
  }
  s->erase(max_len);
  s->append("<truncated>");
}
258
259
0
// Truncates every string field that is currently set in 'message' to at most
// 'max_len' characters (see TruncateString()), recursing into singular and
// repeated sub-messages.  Fields of any other C++ type are left alone.
//
// String values are modified in place through the reference returned by the
// reflection API, hence the const_cast.
void TruncateFields(Message* message, int max_len) {
  const Reflection* refl = message->GetReflection();
  vector<const FieldDescriptor*> set_fields;
  refl->ListFields(*message, &set_fields);

  for (const FieldDescriptor* fd : set_fields) {
    const bool is_string = fd->cpp_type() == FieldDescriptor::CPPTYPE_STRING;
    const bool is_message = fd->cpp_type() == FieldDescriptor::CPPTYPE_MESSAGE;
    if (!is_string && !is_message) {
      continue;
    }

    if (!fd->is_repeated()) {
      if (is_string) {
        const string& value = refl->GetStringReference(*message, fd, nullptr);
        TruncateString(const_cast<string*>(&value), max_len);
      } else {
        TruncateFields(refl->MutableMessage(message, fd), max_len);
      }
      continue;
    }

    for (int idx = 0; idx < refl->FieldSize(*message, fd); idx++) {
      if (is_string) {
        const string& value = refl->GetRepeatedStringReference(*message, fd, idx, nullptr);
        TruncateString(const_cast<string*>(&value), max_len);
      } else {
        TruncateFields(refl->MutableRepeatedMessage(message, fd, idx), max_len);
      }
    }
  }
}
298
299
// Takes ownership of 'writer'; the container starts out open (not closed).
WritablePBContainerFile::WritablePBContainerFile(std::unique_ptr<WritableFile> writer)
    : closed_(false), writer_(std::move(writer)) {}
303
304
1.13M
// Closes the file if that has not happened yet.  A destructor cannot return a
// Status, so a Close() failure is only reported through WARN_NOT_OK.
WritablePBContainerFile::~WritablePBContainerFile() {
  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
}
307
308
1.13M
// Writes the container preamble for a file that will hold messages of the
// same type as 'msg':
//   1. the fixed header (magic bytes followed by the 32-bit version), and
//   2. a ContainerSupHeaderPB record holding the descriptor set reachable from
//      msg's .proto file plus msg's type name.
// Both parts are assembled in one buffer and appended to the file in a single
// call.  Must not be called after Close().
Status WritablePBContainerFile::Init(const Message& msg) {
  DCHECK(!closed_);

  faststring header;
  header.resize(kPBContainerHeaderLen);

  // Fixed header: magic, then version.
  strings::memcpy_inlined(header.data(), kPBContainerMagic, kPBContainerMagicLen);
  InlineEncodeFixed32(header.data() + kPBContainerMagicLen, kPBContainerVersion);
  const size_t written = kPBContainerMagicLen + sizeof(uint32_t);
  DCHECK_EQ(kPBContainerHeaderLen, written)
    << "Serialized unexpected number of total bytes";

  // Supplemental header, framed like any other record.
  ContainerSupHeaderPB sup_header;
  PopulateDescriptorSet(msg.GetDescriptor()->file(), sup_header.mutable_protos());
  sup_header.set_pb_type(msg.GetTypeName());
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &header),
                        "Failed to prepare supplemental header for writing");

  // Write the serialized buffer to the file.
  RETURN_NOT_OK_PREPEND(writer_->Append(header),
                        "Failed to Append() header to file");
  return Status::OK();
}
337
338
1.13M
// Frames 'msg' as one container record (see AppendMsgToBuffer()) and appends
// it to the underlying file.  Must not be called after Close().
Status WritablePBContainerFile::Append(const Message& msg) {
  DCHECK(!closed_);

  faststring record;
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &record),
                        "Failed to prepare buffer for writing");
  RETURN_NOT_OK_PREPEND(writer_->Append(record), "Failed to Append() data to file");
  return Status::OK();
}
348
349
0
// Requests an asynchronous flush (WritableFile::FLUSH_ASYNC) of the underlying
// file.  Must not be called after Close().
Status WritablePBContainerFile::Flush() {
  DCHECK(!closed_);

  // TODO: Flush just the dirty bytes.
  RETURN_NOT_OK_PREPEND(writer_->Flush(WritableFile::FLUSH_ASYNC), "Failed to Flush() file");
  return Status::OK();
}
357
358
1.13M
// Calls Sync() on the underlying file.  Must not be called after Close().
Status WritablePBContainerFile::Sync() {
  DCHECK(!closed_);

  RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");
  return Status::OK();
}
365
366
2.26M
// Closes the underlying file the first time it is called; later calls are
// no-ops that return OK.  The container is marked closed before the file is
// closed, so a failed Close() is not retried.
Status WritablePBContainerFile::Close() {
  if (closed_) {
    return Status::OK();
  }
  closed_ = true;

  RETURN_NOT_OK_PREPEND(writer_->Close(), "Failed to Close() file");
  return Status::OK();
}
375
376
2.26M
// Appends one framed record for 'msg' to the end of '*buf'.
//
// Record layout:
//   [ 32-bit data size | serialized message | 32-bit CRC32C ]
// where the checksum covers the size field and the message bytes.
//
// Returns IOError if the number of bytes produced by serialization does not
// match the size computed beforehand.  (The previous check tested the pointer
// returned by SerializeWithCachedSizesToArray() for null; that call returns
// the end-of-output pointer, so the check could never fire.)
Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  int data_size = msg.ByteSize();
  uint64_t bufsize = sizeof(uint32_t) + data_size + kPBContainerChecksumLen;

  // Grow the buffer to hold the new data.
  size_t orig_size = buf->size();
  buf->resize(orig_size + bufsize);
  uint8_t* dst = buf->data() + orig_size;

  // Serialize the data size.
  InlineEncodeFixed32(dst, static_cast<uint32_t>(data_size));
  size_t offset = sizeof(uint32_t);

  // Serialize the data and verify that exactly 'data_size' bytes were written.
  uint8_t* const payload_begin = dst + offset;
  uint8_t* const payload_end = msg.SerializeWithCachedSizesToArray(payload_begin);
  if (PREDICT_FALSE(payload_end - payload_begin != data_size)) {
    return STATUS(IOError, "Failed to serialize PB to array");
  }
  offset += data_size;

  // Calculate and serialize the checksum.
  uint32_t checksum = crc::Crc32c(dst, offset);
  InlineEncodeFixed32(dst + offset, checksum);
  offset += kPBContainerChecksumLen;

  DCHECK_EQ(bufsize, offset) << "Serialized unexpected number of total bytes";
  return Status::OK();
}
404
405
void WritablePBContainerFile::PopulateDescriptorSet(
406
1.13M
    const FileDescriptor* desc, FileDescriptorSet* output) {
407
  // Because we don't compile protobuf with TSAN enabled, copying the
408
  // static PB descriptors in this function ends up triggering a lot of
409
  // race reports. We suppress the reports, but TSAN still has to walk
410
  // the stack, etc, and this function becomes very slow. So, we ignore
411
  // TSAN here.
412
1.13M
  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
413
414
1.13M
  FileDescriptorSet all_descs;
415
416
  // Tracks all schemas that have been added to 'unemitted' at one point
417
  // or another. Is a superset of 'unemitted' and only ever grows.
418
1.13M
  unordered_set<const FileDescriptor*> processed;
419
420
  // Tracks all remaining unemitted schemas.
421
1.13M
  deque<const FileDescriptor*> unemitted;
422
423
1.13M
  InsertOrDie(&processed, desc);
424
1.13M
  unemitted.push_front(desc);
425
9.42M
  while (!unemitted.empty()) {
426
8.29M
    const FileDescriptor* proto = unemitted.front();
427
428
    // The current schema is emitted iff we've processed (i.e. emitted) all
429
    // of its dependencies.
430
8.29M
    bool emit = true;
431
20.3M
    for (int i = 0; i < proto->dependency_count(); i++) {
432
12.0M
      const FileDescriptor* dep = proto->dependency(i);
433
12.0M
      if (InsertIfNotPresent(&processed, dep)) {
434
5.22M
        unemitted.push_front(dep);
435
5.22M
        emit = false;
436
5.22M
      }
437
12.0M
    }
438
8.29M
    if (emit) {
439
6.35M
      unemitted.pop_front();
440
6.35M
      proto->CopyTo(all_descs.mutable_file()->Add());
441
6.35M
    }
442
8.29M
  }
443
1.13M
  all_descs.Swap(output);
444
1.13M
}
445
446
// Takes ownership of 'reader'; reading starts at offset 0.
ReadablePBContainerFile::ReadablePBContainerFile(std::unique_ptr<RandomAccessFile> reader)
    : offset_(0), reader_(std::move(reader)) {}
450
451
197k
// Releases the underlying file via Close().  A destructor cannot return a
// Status, so a failure would only be reported through WARN_NOT_OK.
ReadablePBContainerFile::~ReadablePBContainerFile() {
  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
}
454
455
197k
// Reads and validates the container preamble:
//   - the fixed header must start with kPBContainerMagic (else Corruption);
//   - the version must equal kPBContainerVersion (else NotSupported);
//   - the first record is parsed as ContainerSupHeaderPB, from which the
//     descriptor set and the message type name are retained.
// On success the read offset is positioned at the first data record.
Status ReadablePBContainerFile::Init() {
  // Read header data.
  Slice header;
  std::unique_ptr<uint8_t[]> header_scratch;
  RETURN_NOT_OK_PREPEND(
      ValidateAndRead(kPBContainerHeaderLen, EOF_NOT_OK, &header, &header_scratch),
      Substitute("Could not read header for proto container file $0",
                 reader_->filename()));

  // Validate magic number.
  const bool magic_ok = strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen);
  if (PREDICT_FALSE(!magic_ok)) {
    const string found_magic(reinterpret_cast<const char*>(header.data()),
                             kPBContainerMagicLen);
    return STATUS(Corruption, "Invalid magic number",
                  Substitute("Expected: $0, found: $1",
                             Utf8SafeCEscape(kPBContainerMagic),
                             Utf8SafeCEscape(found_magic)));
  }

  // Validate container file version. We only support version 1.
  const uint32_t version = DecodeFixed32(header.data() + kPBContainerMagicLen);
  if (PREDICT_FALSE(version != kPBContainerVersion)) {
    return STATUS(NotSupported,
        Substitute("Protobuf container has version $0, we only support version $1",
                   version, kPBContainerVersion));
  }

  // Read the supplemental header.
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK_PREPEND(ReadNextPB(&sup_header), Substitute(
      "Could not read supplemental header from proto container file $0",
      reader_->filename()));
  protos_.reset(sup_header.release_protos());
  pb_type_ = sup_header.pb_type();

  return Status::OK();
}
491
492
395k
// Reads the record at the current offset and parses it into 'msg'.
//
// A record is [ 32-bit size | body | 32-bit CRC32C over size and body ].
// Returns:
//   - EndOfFile  if the file ends before the size field (no more records);
//   - Corruption if the body or checksum is cut short, or the checksum differs;
//   - IOError    if the body cannot be parsed into 'msg';
//   - OK         otherwise, with the offset advanced past the record.
Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
  VLOG(1) << "Reading PB from offset " << offset_;

  // Size field. EOF here is acceptable: it means we're out of PB entries.
  Slice size;
  std::unique_ptr<uint8_t[]> size_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(sizeof(uint32_t), EOF_OK, &size, &size_scratch),
                        Substitute("Could not read data size from proto container file $0",
                                   reader_->filename()));
  const uint32_t data_size = DecodeFixed32(size.data());

  // Body, kept in memory for both checksumming and parsing.
  Slice body;
  std::unique_ptr<uint8_t[]> body_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(data_size, EOF_NOT_OK, &body, &body_scratch),
                        Substitute("Could not read body from proto container file $0",
                                   reader_->filename()));

  // Stored checksum.
  Slice encoded_checksum;
  std::unique_ptr<uint8_t[]> encoded_checksum_scratch;
  RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerChecksumLen, EOF_NOT_OK,
                                        &encoded_checksum, &encoded_checksum_scratch),
                        Substitute("Could not read checksum from proto container file $0",
                                   reader_->filename()));
  const uint32_t expected_checksum = DecodeFixed32(encoded_checksum.data());

  // Validate CRC32C checksum, computed as a rolling checksum over the two
  // byte arrays (size, body).
  uint64_t actual_checksum = 0;
  Crc* crc32c = crc::GetCrc32cInstance();
  crc32c->Compute(size.data(), size.size(), &actual_checksum);
  crc32c->Compute(body.data(), body.size(), &actual_checksum);
  if (PREDICT_FALSE(actual_checksum != expected_checksum)) {
    return STATUS(Corruption, Substitute("Incorrect checksum of file $0: actually $1, expected $2",
                                         reader_->filename(), actual_checksum, expected_checksum));
  }

  // The checksum is correct. Time to decode the body.
  //
  // We could compare pb_type_ against msg.GetTypeName(), but:
  // 1. pb_type_ is not available when reading the supplemental header,
  // 2. ParseFromArray() should fail if the data cannot be parsed into the
  //    provided message type.

  // To permit parsing of very large PB messages, we must parse through a
  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
  // say that 512MB is the shortest theoretical message length that may produce
  // integer overflow warnings, so that's what we'll use.
  ArrayInputStream body_stream(body.data(), narrow_cast<int>(body.size()));
  CodedInputStream coded(&body_stream);
  coded.SetTotalBytesLimit(512 * 1024 * 1024, -1);
  const bool parsed = msg->ParseFromCodedStream(&coded);
  if (PREDICT_FALSE(!parsed)) {
    return STATUS(IOError, "Unable to parse PB from path", reader_->filename());
  }

  return Status::OK();
}
554
555
2
// Writes a text rendering of every remaining record in the container to '*os'.
//
// The message type is rebuilt at run time from the descriptors stored in the
// container, so the caller does not need the generated class.  With 'oneline'
// each record is printed as "<index>\t<ShortDebugString>"; otherwise as a
// "<type> <index>" heading, a dashed rule, and the multi-line DebugString.
//
// Returns OK when the end of the file is reached, otherwise the first error.
Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) {
  // Use the embedded protobuf information from the container file to
  // create the appropriate kind of protobuf Message.
  //
  // Loading the schemas into a DescriptorDatabase (and not directly into
  // a DescriptorPool) defers resolution until FindMessageTypeByName()
  // below, allowing for schemas to be loaded in any order.
  SimpleDescriptorDatabase db;
  const int num_files = protos()->file_size();
  for (int i = 0; i < num_files; i++) {
    if (db.Add(protos()->file(i))) {
      continue;
    }
    return STATUS(Corruption, "Descriptor not loaded", Substitute(
        "Could not load descriptor for PB type $0 referenced in container file",
        pb_type()));
  }

  DescriptorPool pool(&db);
  const Descriptor* desc = pool.FindMessageTypeByName(pb_type());
  if (desc == nullptr) {
    return STATUS(NotFound, "Descriptor not found", Substitute(
        "Could not find descriptor for PB type $0 referenced in container file",
        pb_type()));
  }

  DynamicMessageFactory factory;
  const Message* prototype = factory.GetPrototype(desc);
  if (prototype == nullptr) {
    return STATUS(NotSupported, "Descriptor not supported", Substitute(
        "Descriptor $0 referenced in container file not supported",
        pb_type()));
  }
  std::unique_ptr<Message> msg(prototype->New());

  // Dump each message in the container file.
  int count = 0;
  Status s = ReadNextPB(msg.get());
  while (s.ok()) {
    if (oneline) {
      *os << count << "\t" << msg->ShortDebugString() << endl;
    } else {
      *os << pb_type_ << " " << count << endl;
      *os << "-------" << endl;
      *os << msg->DebugString() << endl;
    }
    count++;
    s = ReadNextPB(msg.get());
  }

  // Running off the end of the file is the normal way for the loop to finish.
  if (s.IsEndOfFile()) {
    return Status::OK();
  }
  return s;
}
603
604
395k
// Drops ownership of the underlying file, destroying the reader.  Always
// returns OK; calling it again is harmless because the pointer is already null.
Status ReadablePBContainerFile::Close() {
  reader_.reset();
  return Status::OK();
}
609
610
// Reads exactly 'length' bytes at the current offset and advances the offset.
//
// The request is first checked against the file size.  If it would run past
// the end of the file the result depends on 'eofOK':
//   - EOF_OK:     EndOfFile is returned;
//   - EOF_NOT_OK: Corruption is returned, naming the file, the requested
//                 length, the offset and the actual file size.
// A read that returns fewer bytes than requested is reported as Corruption.
//
// On success '*result' refers to the bytes read and '*scratch' receives the
// buffer allocated for the read, so the caller should keep '*scratch' alive
// for as long as it uses '*result'.
Status ReadablePBContainerFile::ValidateAndRead(size_t length, EofOK eofOK,
                                                Slice* result,
                                                std::unique_ptr<uint8_t[]>* scratch) {
  // Validate the read length using the file size.
  uint64_t file_size = VERIFY_RESULT(reader_->Size());
  if (offset_ + length > file_size) {
    switch (eofOK) {
      case EOF_OK:
        return STATUS(EndOfFile, "Reached end of file");
      case EOF_NOT_OK:
        // Placeholders are numbered $0..$3 to match the four arguments; the
        // earlier text reused $0 for the byte count and never printed the
        // file size.
        return STATUS(Corruption, "File size not large enough to be valid",
                                  Substitute("Proto container file $0: "
                                      "tried to read $1 bytes at offset "
                                      "$2 but file size is only $3",
                                      reader_->filename(), length,
                                      offset_, file_size));
      default:
        LOG(FATAL) << "Unknown value for eofOK: " << eofOK;
    }
  }

  // Perform the read.
  Slice s;
  std::unique_ptr<uint8_t[]> local_scratch(new uint8_t[length]);
  RETURN_NOT_OK(reader_->Read(offset_, length, &s, local_scratch.get()));

  // Sanity check the result.
  if (PREDICT_FALSE(s.size() < length)) {
    return STATUS(Corruption, "Unexpected short read", Substitute(
        "Proto container file $0: tried to read $1 bytes; got $2 bytes",
        reader_->filename(), length, s.size()));
  }

  *result = s;
  scratch->swap(local_scratch);
  offset_ += s.size();
  return Status::OK();
}
648
649
namespace {
650
651
// Opens the container file at 'path' and parses its first data record into
// 'msg'.
//
// If 'pb_type_name' is non-null, the type name recorded in the container must
// equal it; otherwise InvalidArgument is returned without reading the record.
Status ReadPBContainer(
    Env* env, const std::string& path, Message* msg, const std::string* pb_type_name = nullptr) {
  std::unique_ptr<RandomAccessFile> file;
  RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));

  ReadablePBContainerFile pb_file(std::move(file));
  RETURN_NOT_OK(pb_file.Init());

  const bool type_mismatch = pb_type_name != nullptr && pb_file.pb_type() != *pb_type_name;
  if (type_mismatch) {
    WARN_NOT_OK(pb_file.Close(), "Could not Close() PB container file");
    return STATUS(InvalidArgument,
                  Substitute("Wrong PB type: $0, expected $1", pb_file.pb_type(), *pb_type_name));
  }

  RETURN_NOT_OK(pb_file.ReadNextPB(msg));
  return pb_file.Close();
}
668
669
} // namespace
670
671
215k
// Reads the first record of the container file at 'path' into 'msg' without
// checking the type name stored in the container.
Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
  return ReadPBContainer(env, path, msg);
}
674
675
// Reads the first record of the container file at 'path' into 'msg', after
// verifying that the container's recorded type name equals 'pb_type_name'.
Status ReadPBContainerFromPath(
    Env* env, const std::string& path, const std::string& pb_type_name, Message* msg) {
  return ReadPBContainer(env, path, msg, &pb_type_name);
}
679
680
// Writes 'msg' as a single-record protobuf container file at 'path'.
//
// With 'create' == NO_OVERWRITE, AlreadyPresent is returned if 'path' exists.
// The container is written to a temporary file created from
// 'path' + kTmpTemplateSuffix and then renamed onto 'path'.  When 'sync' is
// pb_util::SYNC, the file is synced before it is closed and the parent
// directory of 'path' is synced after the rename.
Status WritePBContainerToPath(Env* env, const std::string& path,
                              const Message& msg,
                              CreateMode create,
                              SyncMode sync) {
  TRACE_EVENT2("io", "WritePBContainerToPath",
               "path", path,
               "msg_type", msg.GetTypeName());

  if (create == NO_OVERWRITE && env->FileExists(path)) {
    return STATUS(AlreadyPresent, Substitute("File $0 already exists", path));
  }

  const bool want_sync = (sync == pb_util::SYNC);
  const string tmp_template = path + kTmpTemplateSuffix;

  string tmp_path;
  std::unique_ptr<WritableFile> file;
  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
  // NOTE(review): presumably removes tmp_path on scope exit unless Cancel() is
  // called -- confirm against env_util::ScopedFileDeleter.
  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);

  WritablePBContainerFile pb_file(std::move(file));
  RETURN_NOT_OK(pb_file.Init(msg));
  RETURN_NOT_OK(pb_file.Append(msg));
  if (want_sync) {
    RETURN_NOT_OK(pb_file.Sync());
  }
  RETURN_NOT_OK(pb_file.Close());

  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
                        "Failed to rename tmp file to " + path);
  // The temporary name no longer exists after the rename.
  tmp_deleter.Cancel();

  if (want_sync) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
                          "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}
715
716
// Compares two messages with protobuf's MessageDifferencer and returns the
// result of Compare().  When 'diff_str' is non-null, the differencer is asked
// to report the differences into '*diff_str'.
bool ArePBsEqual(const google::protobuf::Message& prev_pb,
                 const google::protobuf::Message& new_pb,
                 std::string* diff_str) {
  google::protobuf::util::MessageDifferencer differencer;
  if (diff_str != nullptr) {
    differencer.ReportDifferencesToString(diff_str);
  }
  const bool equal = differencer.Compare(prev_pb, new_pb);
  return equal;
}
725
726
} // namespace pb_util
727
} // namespace yb