/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/log_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/log_writer.h" |
25 | | |
26 | | #include <stdint.h> |
27 | | #include "yb/rocksdb/env.h" |
28 | | #include "yb/rocksdb/util/coding.h" |
29 | | #include "yb/rocksdb/util/crc32c.h" |
30 | | #include "yb/rocksdb/util/file_reader_writer.h" |
31 | | |
32 | | namespace rocksdb { |
33 | | namespace log { |
34 | | |
// Constructs a log writer.
//   dest              - destination file writer; moved into dest_.
//   log_number        - stored in log_number_; its low 32 bits are written
//                       into recyclable record headers by EmitPhysicalRecord.
//   recycle_log_files - selects the recyclable header/record types in
//                       AddRecord.
// The write position within the current block (block_offset_) starts at 0.
35 | | Writer::Writer(unique_ptr<WritableFileWriter>&& dest, |
36 | | uint64_t log_number, bool recycle_log_files) |
37 | | : dest_(std::move(dest)), |
38 | | block_offset_(0), |
39 | | log_number_(log_number), |
40 | 1.16M | recycle_log_files_(recycle_log_files) { |
// Precompute the CRC of the single type byte for every record type value in
// [0, kMaxRecordType]. EmitPhysicalRecord uses type_crc_[t] as the starting
// CRC so the type byte does not have to be re-hashed for each record.
41 | 11.6M | for (int i = 0; i <= kMaxRecordType; i++10.4M ) { |
42 | 10.4M | char t = static_cast<char>(i); |
43 | 10.4M | type_crc_[i] = crc32c::Value(&t, 1); |
44 | 10.4M | } |
45 | 1.16M | } |
46 | | |
// Destructor. The body is intentionally empty: nothing is flushed or closed
// here explicitly.
// NOTE(review): dest_ is presumably a unique_ptr member that releases the
// underlying file writer on its own; confirm against log_writer.h.
47 | 1.11M | Writer::~Writer() { |
48 | 1.11M | } |
49 | | |
// Appends one logical record to the log, splitting it into as many physical
// records (fragments) as needed so that no fragment crosses a kBlockSize
// boundary.
//   slice - payload to write; an empty slice still produces one zero-length
//           physical record.
// Returns the status of the last EmitPhysicalRecord call. Fragmentation stops
// at the first non-OK status. A failure while writing block-trailer padding
// is passed through RETURN_NOT_OK (presumably an early return with that
// status; confirm the macro definition).
50 | 20.2M | Status Writer::AddRecord(const Slice& slice) { |
51 | 20.2M | const char* ptr = slice.cdata(); |
52 | 20.2M | size_t left = slice.size(); |
53 | | |
54 | | // Header size varies depending on whether we are recycling or not. |
55 | 20.2M | const int header_size = |
56 | 20.2M | recycle_log_files_ ? kRecyclableHeaderSize127k : kHeaderSize20.0M ; |
57 | | |
58 | | // Fragment the record if necessary and emit it. Note that if slice |
59 | | // is empty, we still want to iterate once to emit a single |
60 | | // zero-length record |
61 | 20.2M | Status s; |
62 | 20.2M | bool begin = true; |
63 | 20.8M | do { |
// Bytes still unused in the current block.
64 | 20.8M | const int64_t leftover = kBlockSize - block_offset_; |
65 | 20.8M | assert(leftover >= 0); |
// Not even a header fits in what remains: zero-fill the tail and start over
// at offset 0 of the next block.
66 | 20.8M | if (leftover < header_size) { |
67 | | // Switch to a new block |
68 | 611k | if (leftover > 0) { |
69 | | // Fill the trailer (literal below relies on kHeaderSize and |
70 | | // kRecyclableHeaderSize being <= 11) |
// The literal holds 10 explicit zero bytes; since leftover < header_size and
// header_size <= 11, leftover is at most 10, so the Slice never reads past it.
71 | 1.51k | assert(header_size <= 11); |
72 | 1.51k | RETURN_NOT_OK(dest_->Append( |
73 | 1.51k | Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover))); |
74 | 1.51k | } |
75 | 611k | block_offset_ = 0; |
76 | 611k | } |
77 | | |
78 | | // Invariant: we never leave < header_size bytes in a block. |
79 | 20.8M | assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size); |
80 | | |
// Payload capacity of this block once the header is accounted for; the
// fragment takes whichever is smaller, the remaining payload or that capacity.
81 | 0 | const size_t avail = kBlockSize - block_offset_ - header_size; |
82 | 20.8M | const size_t fragment_length = (left < avail) ? left20.2M : avail610k ; |
83 | | |
84 | 20.8M | RecordType type; |
// `end` is true when this fragment consumes everything that is left.
// begin && end -> Full, begin only -> First, end only -> Last,
// neither -> Middle; the recyclable variant of each is used when
// recycle_log_files_ is set.
85 | 20.8M | const bool end = (left == fragment_length); |
86 | 20.8M | if (begin && end20.2M ) { |
87 | 19.8M | type = recycle_log_files_ ? kRecyclableFullType116k : kFullType19.7M ; |
88 | 19.8M | } else if (1.00M begin1.00M ) { |
89 | 398k | type = recycle_log_files_ ? kRecyclableFirstType11.2k : kFirstType386k ; |
90 | 607k | } else if (end) { |
91 | 398k | type = recycle_log_files_ ? kRecyclableLastType11.2k : kLastType386k ; |
92 | 398k | } else { |
93 | 209k | type = recycle_log_files_ ? kRecyclableMiddleType7.20k : kMiddleType202k ; |
94 | 209k | } |
95 | | |
// Write the fragment, then advance past it. The loop condition re-checks the
// status, so a failed write ends the loop even if payload remains.
96 | 20.8M | s = EmitPhysicalRecord(type, ptr, fragment_length); |
97 | 20.8M | ptr += fragment_length; |
98 | 20.8M | left -= fragment_length; |
99 | 20.8M | begin = false; |
100 | 20.8M | } while (s.ok()20.8M && left > 0); |
101 | 20.2M | return s; |
102 | 20.2M | } |
103 | | |
// Writes a single physical record (header followed by payload) to dest_ and
// flushes it.
//   t   - record type; values numerically below kRecyclableFullType use the
//         legacy header, all others use the recyclable header.
//   ptr - start of the payload bytes.
//   n   - payload length; must fit in two bytes (asserted <= 0xffff).
// Header layout as built below:
//   buf[0..3] masked CRC, buf[4] length low byte, buf[5] length high byte,
//   buf[6] type, and for recyclable records 4 more bytes starting at buf[7]
//   holding the low 32 bits of log_number_.
// Returns the status of the first failing Append/Flush, or OK if all three
// steps succeed. block_offset_ is advanced by header_size + n in either case.
104 | 20.8M | Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { |
105 | 20.8M | assert(n <= 0xffff); // Must fit in two bytes |
106 | | |
107 | 0 | size_t header_size; |
// Sized for the larger (recyclable) header; legacy records use only the
// first kHeaderSize bytes of it.
108 | 20.8M | char buf[kRecyclableHeaderSize]; |
109 | | |
110 | | // Format the header |
111 | 20.8M | buf[4] = static_cast<char>(n & 0xff); |
112 | 20.8M | buf[5] = static_cast<char>(n >> 8); |
113 | 20.8M | buf[6] = static_cast<char>(t); |
114 | | |
// Start from the CRC of the type byte precomputed in the constructor.
115 | 20.8M | uint32_t crc = type_crc_[t]; |
116 | 20.8M | if (t < kRecyclableFullType) { |
117 | | // Legacy record format |
118 | 20.6M | assert(block_offset_ + kHeaderSize + n <= kBlockSize); |
119 | 0 | header_size = kHeaderSize; |
120 | 20.6M | } else { |
121 | | // Recyclable record format |
122 | 145k | assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize); |
123 | 0 | header_size = kRecyclableHeaderSize; |
124 | | |
125 | | // Only encode low 32-bits of the 64-bit log number. This means |
126 | | // we will fail to detect an old record if we recycled a log from |
127 | | // ~4 billion logs ago, but that is effectively impossible, and |
128 | | // even if it were we'd be far more likely to see a false positive |
129 | | // on the 32-bit CRC. |
// The encoded log number bytes are folded into the CRC as well.
130 | 145k | EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_)); |
131 | 145k | crc = crc32c::Extend(crc, buf + 7, 4); |
132 | 145k | } |
133 | | |
134 | | // Compute the crc of the record type and the payload. |
135 | 0 | crc = crc32c::Extend(crc, ptr, n); |
136 | 20.8M | crc = crc32c::Mask(crc); // Adjust for storage |
137 | 20.8M | EncodeFixed32(buf, crc); |
138 | | |
139 | | // Write the header and the payload |
// Each step runs only if the previous one succeeded: header, payload, flush.
140 | 20.8M | Status s = dest_->Append(Slice(buf, header_size)); |
141 | 20.8M | if (s.ok()) { |
142 | 20.8M | s = dest_->Append(Slice(ptr, n)); |
143 | 20.8M | if (s.ok()20.8M ) { |
144 | 20.8M | s = dest_->Flush(); |
145 | 20.8M | } |
146 | 20.8M | } |
// Advanced unconditionally, i.e. also when one of the writes above failed.
147 | 20.8M | block_offset_ += header_size + n; |
148 | 20.8M | return s; |
149 | 20.8M | } |
150 | | |
151 | | } // namespace log |
152 | | } // namespace rocksdb |