/Users/deen/code/yugabyte-db/src/yb/rpc/compressed_stream.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/compressed_stream.h" |
15 | | |
16 | | #include <lz4.h> |
17 | | #include <snappy-sinksource.h> |
18 | | #include <snappy.h> |
19 | | #include <zlib.h> |
20 | | |
21 | | #include <boost/preprocessor/cat.hpp> |
22 | | #include <boost/range/iterator_range.hpp> |
23 | | |
24 | | #include "yb/gutil/casts.h" |
25 | | |
26 | | #include "yb/rpc/circular_read_buffer.h" |
27 | | #include "yb/rpc/outbound_data.h" |
28 | | #include "yb/rpc/refined_stream.h" |
29 | | |
30 | | #include "yb/util/logging.h" |
31 | | #include "yb/util/result.h" |
32 | | #include "yb/util/size_literals.h" |
33 | | #include "yb/util/status_format.h" |
34 | | |
35 | | using namespace std::literals; |
36 | | |
37 | | DEFINE_int32(stream_compression_algo, 0, "Algorithm used for stream compression. " |
38 | | "0 - no compression, 1 - gzip, 2 - snappy, 3 - lz4."); |
39 | | |
40 | | namespace yb { |
41 | | namespace rpc { |
42 | | |
43 | | using SmallRefCntBuffers = boost::container::small_vector_base<RefCntBuffer>; |
44 | | |
45 | | namespace { |
46 | | |
// Abstract codec used by the compressed RPC stream. One instance per stream;
// concrete implementations below wrap zlib, snappy and LZ4.
class Compressor {
 public:
  virtual std::string ToString() const = 0;

  // Initialize compressor, required since we don't use exceptions to return error from ctor.
  virtual CHECKED_STATUS Init() = 0;

  // Compress specified vector of input buffers into single output buffer.
  // Compressed bytes are pushed to the underlying (lower) stream; `data` is
  // attached so the caller is notified once the bytes are transferred.
  virtual CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) = 0;

  // Decompress specified input slice to specified output buffer.
  // Returns whether the output buffer became full (more calls may be needed).
  virtual Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) = 0;

  // Connection header associated with this compressor.
  virtual OutboundDataPtr ConnectionHeader() = 0;

  virtual ~Compressor() = default;
};
66 | | |
67 | 40.0k | size_t EntrySize(const RefCntBuffer& buffer) { |
68 | 40.0k | return buffer.size(); |
69 | 40.0k | } |
70 | | |
// Number of payload bytes covered by an io vector entry.
size_t EntrySize(const iovec& entry) {
  return entry.iov_len;
}
74 | | |
75 | 13.7k | char* EntryData(const RefCntBuffer& buffer) { |
76 | 13.7k | return buffer.data(); |
77 | 13.7k | } |
78 | | |
// Start address of an io vector entry, viewed as chars.
char* EntryData(const iovec& entry) {
  return static_cast<char*>(entry.iov_base);
}
82 | | |
// Sums EntrySize over every entry of the collection (works for both
// RefCntBuffer and iovec ranges via the overloads above).
template <class Collection>
size_t TotalLen(const Collection& input) {
  size_t total = 0;
  for (auto it = input.begin(); it != input.end(); ++it) {
    total += EntrySize(*it);
  }
  return total;
}
91 | | |
92 | | template <class Compressor> |
93 | 166 | OutboundDataPtr GetConnectionHeader() { |
94 | | // Compressed stream header has signature YBx, where x - compressor identifier. |
95 | 166 | static auto result = std::make_shared<StringOutboundData>( |
96 | 166 | "YB"s + Compressor::kId, Compressor::kId + "ConnectionHeader"s); |
97 | 166 | return result; |
98 | 166 | } compressed_stream.cc:_ZN2yb3rpc12_GLOBAL__N_119GetConnectionHeaderINS1_14ZlibCompressorEEENSt3__110shared_ptrINS0_12OutboundDataEEEv Line | Count | Source | 93 | 96 | OutboundDataPtr GetConnectionHeader() { | 94 | | // Compressed stream header has signature YBx, where x - compressor identifier. | 95 | 96 | static auto result = std::make_shared<StringOutboundData>( | 96 | 96 | "YB"s + Compressor::kId, Compressor::kId + "ConnectionHeader"s); | 97 | 96 | return result; | 98 | 96 | } |
compressed_stream.cc:_ZN2yb3rpc12_GLOBAL__N_119GetConnectionHeaderINS1_16SnappyCompressorEEENSt3__110shared_ptrINS0_12OutboundDataEEEv Line | Count | Source | 93 | 35 | OutboundDataPtr GetConnectionHeader() { | 94 | | // Compressed stream header has signature YBx, where x - compressor identifier. | 95 | 35 | static auto result = std::make_shared<StringOutboundData>( | 96 | 35 | "YB"s + Compressor::kId, Compressor::kId + "ConnectionHeader"s); | 97 | 35 | return result; | 98 | 35 | } |
compressed_stream.cc:_ZN2yb3rpc12_GLOBAL__N_119GetConnectionHeaderINS1_13LZ4CompressorEEENSt3__110shared_ptrINS0_12OutboundDataEEEv Line | Count | Source | 93 | 35 | OutboundDataPtr GetConnectionHeader() { | 94 | | // Compressed stream header has signature YBx, where x - compressor identifier. | 95 | 35 | static auto result = std::make_shared<StringOutboundData>( | 96 | 35 | "YB"s + Compressor::kId, Compressor::kId + "ConnectionHeader"s); | 97 | 35 | return result; | 98 | 35 | } |
|
99 | | |
// Generic decompression driver. Feeds each appended input iovec to
// slice_decompressor, which consumes a prefix of its input slice, writes up to
// outlen decompressed bytes to the given output area and returns how many
// bytes it produced. Stops when either all input is consumed or the output
// append area is exhausted; partially consumed input stays in `inp`.
template <class SliceDecompressor>
Result<ReadBufferFull> DecompressBySlices(
    StreamReadBuffer* inp, StreamReadBuffer* out, const SliceDecompressor& slice_decompressor) {
  size_t consumed = 0;  // Total input bytes consumed across all input vecs.
  auto out_vecs = VERIFY_RESULT(out->PrepareAppend());
  auto out_it = out_vecs.begin();
  size_t appended = 0;  // Total decompressed bytes written.

  for (const auto& iov : inp->AppendedVecs()) {
    Slice slice(static_cast<char*>(iov.iov_base), iov.iov_len);
    for (;;) {
      if (out_it->iov_len == 0) {
        // Current output vec is full, advance; stop if no output space left.
        if (++out_it == out_vecs.end()) {
          break;
        }
      }
      size_t len = VERIFY_RESULT(slice_decompressor(&slice, out_it->iov_base, out_it->iov_len));
      appended += len;
      IoVecRemovePrefix(len, &*out_it);
      if (slice.empty()) {
        break;
      }
    }
    // Whatever the decompressor did not consume remains for the next round.
    consumed += iov.iov_len - slice.size();
    if (!slice.empty()) {
      // Ran out of output space before finishing this input vec.
      break;
    }
  }
  out->DataAppended(appended);
  inp->Consume(consumed, Slice());
  return ReadBufferFull(out->Full());
}
132 | | |
// Gzip/zlib based stream compressor. Keeps independent z_stream states for
// the compress (deflate) and decompress (inflate) directions.
class ZlibCompressor : public Compressor {
 public:
  static const char kId = 'G';
  static const int kIndex = 1;

  // mem_tracker is currently unused by this implementation.
  explicit ZlibCompressor(MemTrackerPtr mem_tracker) {
  }

  ~ZlibCompressor() {
    if (deflate_inited_) {
      int res = deflateEnd(&deflate_stream_);
      // Z_DATA_ERROR is tolerated: presumably the stream was discarded with
      // buffered data still pending.
      LOG_IF(WARNING, res != Z_OK && res != Z_DATA_ERROR)
          << "Failed to destroy deflate stream: " << res;
    }
    if (inflate_inited_) {
      int res = inflateEnd(&inflate_stream_);
      LOG_IF(WARNING, res != Z_OK) << "Failed to destroy inflate stream: " << res;
    }
  }

  OutboundDataPtr ConnectionHeader() override {
    return GetConnectionHeader<ZlibCompressor>();
  }

  // Initializes both zlib directions; tracks success per direction so the
  // destructor only tears down what was actually initialized.
  CHECKED_STATUS Init() override {
    memset(&deflate_stream_, 0, sizeof(deflate_stream_));
    int res = deflateInit(&deflate_stream_, /* level= */ Z_DEFAULT_COMPRESSION);
    if (res != Z_OK) {
      return STATUS_FORMAT(RuntimeError, "Cannot init deflate stream: $0", res);
    }
    deflate_inited_ = true;

    memset(&inflate_stream_, 0, sizeof(inflate_stream_));
    res = inflateInit(&inflate_stream_);
    if (res != Z_OK) {
      return STATUS_FORMAT(RuntimeError, "Cannot init inflate stream: $0", res);
    }
    inflate_inited_ = true;

    return Status::OK();
  }

  std::string ToString() const override {
    return "Zlib";
  }

  // Deflates all input buffers into one output buffer sized by deflateBound
  // (so avail_out is set once and never replenished), then sends it to the
  // lower stream. The last buffer is flushed with Z_PARTIAL_FLUSH so the peer
  // can decode without waiting for more data.
  CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
    RefCntBuffer output(deflateBound(&deflate_stream_, TotalLen(input)));
    deflate_stream_.avail_out = static_cast<unsigned int>(output.size());
    deflate_stream_.next_out = output.udata();

    for (auto it = input.begin(); it != input.end();) {
      const auto& buf = *it++;  // `it` now points past `buf`, used to detect the last buffer.
      deflate_stream_.next_in = const_cast<Bytef*>(buf.udata());
      deflate_stream_.avail_in = static_cast<unsigned int>(buf.size());

      for (;;) {
        auto res = deflate(&deflate_stream_, it == input.end() ? Z_PARTIAL_FLUSH : Z_NO_FLUSH);
        if (res == Z_STREAM_END) {
          if (deflate_stream_.avail_in != 0) {
            return STATUS_FORMAT(
                RuntimeError, "Stream end when input data still available: $0",
                deflate_stream_.avail_in);
          }
          break;
        }
        if (res != Z_OK) {
          return STATUS_FORMAT(RuntimeError, "Compression failed: $0", res);
        }
        if (deflate_stream_.avail_in == 0) {
          break;
        }
      }
    }

    // Trim the buffer to the number of bytes deflate actually produced.
    output.Shrink(deflate_stream_.next_out - output.udata());

    // Send compressed data to underlying stream.
    return stream->SendToLower(std::make_shared<SingleBufferOutboundData>(
        std::move(output), std::move(data)));
  }

  // Inflates slice-by-slice via DecompressBySlices. Z_BUF_ERROR is tolerated:
  // it only means no progress was possible with the space/data given.
  Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override {
    return DecompressBySlices(
        inp, out, [this](Slice* input, void* out, size_t outlen) -> Result<size_t> {
      inflate_stream_.next_in = const_cast<Bytef*>(pointer_cast<const Bytef*>(input->data()));
      inflate_stream_.avail_in = narrow_cast<uInt>(input->size());
      inflate_stream_.next_out = static_cast<Bytef*>(out);
      inflate_stream_.avail_out = narrow_cast<uInt>(outlen);

      int res = inflate(&inflate_stream_, Z_NO_FLUSH);
      if (res != Z_OK && res != Z_BUF_ERROR) {
        return STATUS_FORMAT(RuntimeError, "Decompression failed: $0", res);
      }

      // Report consumed input to the caller; return the bytes produced.
      input->remove_prefix(input->size() - inflate_stream_.avail_in);
      return outlen - inflate_stream_.avail_out;
    });
  }

 private:
  z_stream deflate_stream_;
  z_stream inflate_stream_;
  bool deflate_inited_ = false;
  bool inflate_inited_ = false;
};
240 | | |
// Source implementation that provides input from range of buffers.
// Works for both RefCntBuffer iterators (compression) and iovec iterators
// (decompression) through the EntrySize/EntryData overloads.
template <class It>
class RangeSource : public snappy::Source {
 public:
  explicit RangeSource(const It& begin, const It& end)
      : current_(begin), end_(end),
        available_(TotalLen(boost::make_iterator_range(begin, end))) {}

  // Caps how many bytes this source will serve; used to cut the byte stream
  // into fixed-size chunks.
  void Limit(size_t value) {
    available_ = value;
  }

  size_t Available() const override {
    return available_;
  }

  // Returns a pointer to the longest contiguous run readable from the current
  // position, bounded by both the current buffer and the Limit.
  const char* Peek(size_t* len) override {
    if (available_ == 0) {
      *len = 0;
      return nullptr;
    }
    *len = std::min(EntrySize(*current_) - current_pos_, available_);
    return EntryData(*current_) + current_pos_;
  }

  // Moves the read position back n bytes, walking backwards across buffer
  // boundaries if needed. Used to undo bytes eaten by GetUncompressedLength.
  void Rewind(size_t n) {
    available_ += n;
    for (;;) {
      if (current_pos_ >= n) {
        current_pos_ -= n;
        break;
      }
      n -= current_pos_;
      --current_;
      current_pos_ = EntrySize(*current_);
    }
  }

  void Skip(size_t n) override {
    if (!n) {
      return;
    }
    available_ -= n;
    // NOTE(review): on reaching the end of the current buffer current_pos_ is
    // reset to 0, not to an overshoot remainder — this assumes n never spans
    // past the current buffer (snappy skips at most what Peek returned);
    // verify for any new caller.
    if ((current_pos_ += n) >= EntrySize(*current_)) {
      current_pos_ = 0;
      ++current_;
      DCHECK(available_ == 0 ? current_ == end_ : current_ < end_)
          << "Available: " << available_ << ", left buffers: " << std::distance(current_, end_)
          << ", n: " << n;
    }
  }

 private:
  It current_; // Current buffer that we are reading.
  It end_;
  size_t current_pos_ = 0; // Position in current buffer.
  size_t available_; // How many bytes left in all buffers.
};
299 | | |
// Sink implementation that provides output to io vecs.
// Relies on the caller having checked there is enough output space before
// appending (see SnappyCompressor::Decompress) — the copy loop below does not
// bounds-check against the end of the vec array.
class IoVecsSink : public snappy::Sink {
 public:
  explicit IoVecsSink(IoVecs* out) : out_(out->begin()) {}

  // Total number of bytes appended so far.
  size_t total_appended() const {
    return total_appended_;
  }

  void Append(const char* bytes, size_t n) override {
    total_appended_ += n;

    // Fast path: snappy wrote directly into the area handed out by
    // GetAppendBufferVariable, so only the bookkeeping needs updating.
    if (bytes == out_->iov_base) {
      if (out_->iov_len == n) {
        ++out_;
      } else {
        IoVecRemovePrefix(n, &*out_);
      }
      return;
    }

    // Slow path: copy bytes into one or more output vecs.
    while (n > 0) {
      size_t current_len = out_->iov_len;
      if (current_len >= n) {
        memcpy(out_->iov_base, bytes, n);
        IoVecRemovePrefix(n, &*out_);
        n = 0;
      } else {
        memcpy(out_->iov_base, bytes, current_len);
        bytes += current_len;
        n -= current_len;
        ++out_;
      }
    }
  }

  // Lets snappy write straight into the current vec when it is big enough,
  // avoiding the slow-path copy in Append; otherwise falls back to scratch.
  char* GetAppendBufferVariable(
      size_t min_size, size_t desired_size_hint, char* scratch,
      size_t scratch_size, size_t* allocated_size) override {
    if (min_size <= out_->iov_len) {
      *allocated_size = out_->iov_len;
      return static_cast<char*>(out_->iov_base);
    }
    return Sink::GetAppendBufferVariable(
        min_size, desired_size_hint, scratch, scratch_size, allocated_size);
  }

 private:
  IoVecs::iterator out_;
  size_t total_appended_ = 0;
};
351 | | |
352 | | // Binary search to find max value so that max_compressed_len(value) fits into header_len bytes. |
353 | | template <class F> |
354 | 34.4k | static size_t FindMaxChunkSize(size_t header_len, const F& max_compressed_len) { |
355 | 34.4k | size_t max_value = (1ULL << (8 * header_len)) - 1; |
356 | 34.4k | size_t l = 1; |
357 | 34.4k | size_t r = max_value; |
358 | 586k | while (r > l) { |
359 | 551k | size_t m = (l + r + 1) / 2; |
360 | 551k | if (implicit_cast<size_t>(max_compressed_len(narrow_cast<int>(m))) > max_value) { |
361 | 206k | r = m - 1; |
362 | 344k | } else { |
363 | 344k | l = m; |
364 | 344k | } |
365 | 551k | } |
366 | 34.4k | return l; |
367 | 34.4k | } compressed_stream.cc:_ZN2yb3rpc12_GLOBAL__N_116FindMaxChunkSizeIPFmmEEEmmRKT_ Line | Count | Source | 354 | 17.2k | static size_t FindMaxChunkSize(size_t header_len, const F& max_compressed_len) { | 355 | 17.2k | size_t max_value = (1ULL << (8 * header_len)) - 1; | 356 | 17.2k | size_t l = 1; | 357 | 17.2k | size_t r = max_value; | 358 | 293k | while (r > l) { | 359 | 275k | size_t m = (l + r + 1) / 2; | 360 | 275k | if (implicit_cast<size_t>(max_compressed_len(narrow_cast<int>(m))) > max_value) { | 361 | 120k | r = m - 1; | 362 | 155k | } else { | 363 | 155k | l = m; | 364 | 155k | } | 365 | 275k | } | 366 | 17.2k | return l; | 367 | 17.2k | } |
compressed_stream.cc:_ZN2yb3rpc12_GLOBAL__N_116FindMaxChunkSizeIPFiiEEEmmRKT_ Line | Count | Source | 354 | 17.2k | static size_t FindMaxChunkSize(size_t header_len, const F& max_compressed_len) { | 355 | 17.2k | size_t max_value = (1ULL << (8 * header_len)) - 1; | 356 | 17.2k | size_t l = 1; | 357 | 17.2k | size_t r = max_value; | 358 | 293k | while (r > l) { | 359 | 275k | size_t m = (l + r + 1) / 2; | 360 | 275k | if (implicit_cast<size_t>(max_compressed_len(narrow_cast<int>(m))) > max_value) { | 361 | 86.1k | r = m - 1; | 362 | 189k | } else { | 363 | 189k | l = m; | 364 | 189k | } | 365 | 275k | } | 366 | 17.2k | return l; | 367 | 17.2k | } |
|
368 | | |
369 | | // Snappy does not support stream compression. |
370 | | // So we have to add block len, to start decompression only from block start. |
371 | | // We use 2 bytes for block len, so have to limit uncompressed block so that in worst case |
372 | | // compressed block size would fit into 2 bytes. |
373 | | constexpr size_t kSnappyHeaderLen = 2; |
374 | | const size_t kSnappyMaxChunkSize = FindMaxChunkSize(kSnappyHeaderLen, &snappy::MaxCompressedLength); |
375 | | |
376 | | class SnappyCompressor : public Compressor { |
377 | | public: |
378 | | static constexpr char kId = 'S'; |
379 | | static constexpr int kIndex = 2; |
380 | | static constexpr size_t kHeaderLen = kSnappyHeaderLen; |
381 | | |
382 | 70 | explicit SnappyCompressor(MemTrackerPtr mem_tracker) { |
383 | 70 | } |
384 | | |
385 | 70 | ~SnappyCompressor() { |
386 | 70 | } |
387 | | |
388 | 35 | OutboundDataPtr ConnectionHeader() override { |
389 | 35 | return GetConnectionHeader<SnappyCompressor>(); |
390 | 35 | } |
391 | | |
392 | 70 | CHECKED_STATUS Init() override { |
393 | 70 | return Status::OK(); |
394 | 70 | } |
395 | | |
396 | 0 | std::string ToString() const override { |
397 | 0 | return "Snappy"; |
398 | 0 | } |
399 | | |
400 | | CHECKED_STATUS Compress( |
401 | 0 | const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override { |
402 | 0 | RangeSource<SmallRefCntBuffers::const_iterator> source(input.begin(), input.end()); |
403 | 0 | auto input_size = source.Available(); |
404 | 0 | bool stop = false; |
405 | 0 | while (!stop) { |
406 | | // Split input into chunks of size kSnappyMaxChunkSize or less. |
407 | 0 | if (input_size > kSnappyMaxChunkSize) { |
408 | 0 | source.Limit(kSnappyMaxChunkSize); |
409 | 0 | input_size -= kSnappyMaxChunkSize; |
410 | 0 | } else { |
411 | 0 | source.Limit(input_size); |
412 | 0 | stop = true; |
413 | 0 | } |
414 | 0 | RefCntBuffer output(kHeaderLen + snappy::MaxCompressedLength(source.Available())); |
415 | 0 | snappy::UncheckedByteArraySink sink(output.data() + kHeaderLen); |
416 | 0 | auto compressed_len = snappy::Compress(&source, &sink); |
417 | 0 | BigEndian::Store16(output.data(), compressed_len); |
418 | 0 | output.Shrink(kHeaderLen + compressed_len); |
419 | 0 | RETURN_NOT_OK(stream->SendToLower(std::make_shared<SingleBufferOutboundData>( |
420 | 0 | std::move(output), |
421 | | // We processed last buffer, attach data to it, so it will be notified when this buffer |
422 | | // is transferred. |
423 | 0 | stop ? std::move(data) : nullptr))); |
424 | 0 | } |
425 | 0 | return Status::OK(); |
426 | 0 | } |
427 | | |
428 | 5.23k | Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override { |
429 | 5.23k | auto inp_vecs = inp->AppendedVecs(); |
430 | 5.23k | RangeSource<IoVecs::const_iterator> source(inp_vecs.begin(), inp_vecs.end()); |
431 | | |
432 | 5.23k | auto total_consumed = 0; |
433 | 5.23k | auto out_vecs = VERIFY_RESULT(out->PrepareAppend()); |
434 | 5.23k | auto total_output = IoVecsFullSize(out_vecs); |
435 | 5.23k | IoVecsSink sink(&out_vecs); |
436 | | |
437 | 5.23k | size_t input_size = source.Available(); |
438 | 5.23k | auto result = ReadBufferFull::kFalse; |
439 | 18.9k | while (total_consumed + kHeaderLen <= input_size) { |
440 | 14.8k | source.Limit(input_size - total_consumed); |
441 | | // Fetch chunk len. |
442 | 14.8k | size_t len = 0; |
443 | 14.8k | auto data = source.Peek(&len); |
444 | 14.8k | size_t size; |
445 | 14.8k | if (len >= kHeaderLen) { |
446 | | // Size fully contained in the first buffer. |
447 | 14.8k | size = BigEndian::Load16(data); |
448 | 14.8k | source.Skip(kHeaderLen); |
449 | 0 | } else { |
450 | | // Size distributed between 2 blocks. |
451 | | // Since blocks are not empty and size exactly 2, we should expect that first block |
452 | | // contains 1 byte of size. |
453 | 0 | if (len != 1) { |
454 | 0 | return STATUS(RuntimeError, "Expected to peek at least one byte"); |
455 | 0 | } |
456 | 0 | char buf[kHeaderLen]; |
457 | 0 | buf[0] = *data; |
458 | 0 | source.Skip(1); |
459 | 0 | data = source.Peek(&len); |
460 | 0 | buf[1] = *data; |
461 | 0 | source.Skip(1); |
462 | 0 | size = BigEndian::Load16(buf); |
463 | 0 | } |
464 | | // Check whether we already received full chunk. |
465 | 0 | VLOG_WITH_FUNC(4) |
466 | 0 | << "Total consumed: " << total_consumed << ", size: " << size << ", input: " |
467 | 0 | << input_size; |
468 | 14.8k | if (total_consumed + kHeaderLen + size > input_size) { |
469 | 600 | break; |
470 | 600 | } |
471 | 14.2k | source.Limit(size); |
472 | 14.2k | uint32_t length = 0; |
473 | 14.2k | auto old_available = source.Available(); |
474 | 14.2k | if (!snappy::GetUncompressedLength(&source, &length)) { |
475 | 0 | return STATUS(RuntimeError, "GetUncompressedLength failed"); |
476 | 0 | } |
477 | | |
478 | | // Check if we have space for decompressed block. |
479 | 14.2k | VLOG_WITH_FUNC(4) |
480 | 0 | << "Total appended: " << sink.total_appended() << ", length: " << length |
481 | 0 | << ", total_output: " << total_output; |
482 | 14.2k | if (sink.total_appended() + length > total_output) { |
483 | 524 | result = ReadBufferFull::kTrue; |
484 | 524 | break; |
485 | 524 | } |
486 | | // Rollback to state before GetUncompressedLength. |
487 | 13.7k | source.Rewind(old_available - source.Available()); |
488 | 13.7k | if (!snappy::Uncompress(&source, &sink)) { |
489 | 0 | return STATUS(RuntimeError, "Decompression failed"); |
490 | 0 | } |
491 | 13.7k | total_consumed += kHeaderLen + size; |
492 | 13.7k | } |
493 | | |
494 | 5.23k | out->DataAppended(sink.total_appended()); |
495 | 5.23k | inp->Consume(total_consumed, Slice()); |
496 | | |
497 | 5.23k | return result; |
498 | 5.23k | } |
499 | | }; |
500 | | |
501 | | // LZ4 could compress/decompress only continuous block of memory into similar block. |
502 | | // So we do that same as for Snappy, but decompression is even more complex. |
503 | | constexpr size_t kLZ4HeaderLen = 2; |
504 | | const size_t kLZ4MaxChunkSize = FindMaxChunkSize(kLZ4HeaderLen, &LZ4_compressBound); |
505 | | const size_t kLZ4BufferSize = 64_KB; |
506 | | |
507 | | class LZ4DecompressState { |
508 | | public: |
509 | | LZ4DecompressState(char* input_buffer, char* output_buffer, Slice* prev_decompress_data_left) |
510 | | : input_buffer_(input_buffer), output_buffer_(output_buffer), |
511 | 5.06k | prev_decompress_data_left_(prev_decompress_data_left) {} |
512 | | |
513 | 5.06k | Result<ReadBufferFull> Execute(StreamReadBuffer* inp, StreamReadBuffer* out) { |
514 | 5.06k | outvecs_ = VERIFY_RESULT(out->PrepareAppend()); |
515 | 5.06k | out_it_ = outvecs_.begin(); |
516 | | |
517 | | // Check if we previously decompressed some data that did not fit into output buffer. |
518 | | // So copy it now. See DecompressChunk for details. |
519 | 5.55k | while (!prev_decompress_data_left_->empty()) { |
520 | 507 | auto len = std::min(prev_decompress_data_left_->size(), out_it_->iov_len); |
521 | 507 | memcpy(out_it_->iov_base, prev_decompress_data_left_->data(), len); |
522 | 507 | total_add_ += len; |
523 | 507 | IoVecRemovePrefix(len, &*out_it_); |
524 | 507 | prev_decompress_data_left_->remove_prefix(len); |
525 | 507 | if (out_it_->iov_len == 0) { |
526 | 17 | if (++out_it_ == outvecs_.end()) { |
527 | 17 | out->DataAppended(total_add_); |
528 | 17 | return ReadBufferFull(out->Full()); |
529 | 17 | } |
530 | 17 | } |
531 | 507 | } |
532 | | |
533 | | // Remaining piece of data in the previous vec. |
534 | 5.04k | Slice prev_input_slice; |
535 | 5.24k | for (const auto& input_vec : inp->AppendedVecs()) { |
536 | 5.24k | Slice input_slice(static_cast<char*>(input_vec.iov_base), input_vec.iov_len); |
537 | 5.24k | if (!prev_input_slice.empty()) { |
538 | 201 | size_t chunk_size; |
539 | 201 | if (prev_input_slice.size() >= kLZ4HeaderLen) { |
540 | | // Size fully contained in the previous block. |
541 | 201 | chunk_size = BigEndian::Load16(prev_input_slice.data()); |
542 | 0 | } else if (prev_input_slice.size() + input_slice.size() < kLZ4HeaderLen) { |
543 | | // Did not receive header yet. So exit and wait for more data received. |
544 | 0 | break; |
545 | 0 | } else { |
546 | | // Size distributed between 2 blocks. |
547 | 0 | char buf[kLZ4HeaderLen]; |
548 | 0 | memcpy(buf, prev_input_slice.data(), prev_input_slice.size()); |
549 | 0 | memcpy(buf + prev_input_slice.size(), input_slice.data(), |
550 | 0 | kLZ4HeaderLen - prev_input_slice.size()); |
551 | 0 | chunk_size = BigEndian::Load16(buf); |
552 | 0 | } |
553 | 201 | if (kLZ4HeaderLen + chunk_size > prev_input_slice.size() + input_slice.size()) { |
554 | | // Did not receive full block yet. So exit and wait for more data received. |
555 | | // TODO Here we rely on the fact that we use circular buffer and could have at most 2 |
556 | | // blocks. In general case chunk could be distributed across several buffers. |
557 | 32 | break; |
558 | 32 | } |
559 | 169 | if (prev_input_slice.size() > kLZ4HeaderLen) { |
560 | | // Data is distributed between 2 buffers. |
561 | | // Have to copy it into separate input buffer so it will be a single continuous block. |
562 | 169 | size_t size_in_prev_slice = prev_input_slice.size() - kLZ4HeaderLen; |
563 | 169 | size_t size_in_current_slice = chunk_size - size_in_prev_slice; |
564 | 169 | memcpy(input_buffer_, prev_input_slice.data() + kLZ4HeaderLen, size_in_prev_slice); |
565 | 169 | memcpy(input_buffer_ + size_in_prev_slice, input_slice.data(), size_in_current_slice); |
566 | 169 | RETURN_NOT_OK(DecompressChunk(Slice(input_buffer_, chunk_size))); |
567 | 169 | input_slice.remove_prefix(size_in_current_slice); |
568 | 0 | } else { |
569 | | // Only header (or part of header) was in previous buffer. |
570 | | // Could decompress from current buffer. |
571 | 0 | input_slice.remove_prefix(kLZ4HeaderLen - prev_input_slice.size()); |
572 | 0 | RETURN_NOT_OK(DecompressChunk(input_slice.Prefix(chunk_size))); |
573 | 0 | input_slice.remove_prefix(chunk_size); |
574 | 0 | } |
575 | 169 | } |
576 | | // Decompress all chunks contained in current buffer. |
577 | 17.4k | while (input_slice.size() >= kLZ4HeaderLen && out_it_ != outvecs_.end()) { |
578 | 12.7k | size_t chunk_size = BigEndian::Load16(input_slice.data()); |
579 | 12.7k | if (input_slice.size() < kLZ4HeaderLen + chunk_size) { |
580 | 558 | break; |
581 | 558 | } |
582 | 12.2k | input_slice.remove_prefix(kLZ4HeaderLen); |
583 | 12.2k | RETURN_NOT_OK(DecompressChunk(input_slice.Prefix(chunk_size))); |
584 | 12.2k | input_slice.remove_prefix(chunk_size); |
585 | 12.2k | } |
586 | 5.21k | prev_input_slice = input_slice; |
587 | | // DecompressChunk could increase out_it_, so check whether we still have output space. |
588 | 5.21k | if (out_it_ == outvecs_.end()) { |
589 | 540 | break; |
590 | 540 | } |
591 | 5.21k | } |
592 | 5.04k | inp->Consume(total_consumed_, Slice()); |
593 | 5.04k | out->DataAppended(total_add_); |
594 | 5.04k | return ReadBufferFull(out->Full()); |
595 | 5.04k | } |
596 | | |
597 | | private: |
598 | 12.4k | CHECKED_STATUS DecompressChunk(const Slice& input) { |
599 | 12.4k | int res = LZ4_decompress_safe( |
600 | 12.4k | input.cdata(), static_cast<char*>(out_it_->iov_base), narrow_cast<int>(input.size()), |
601 | 12.4k | narrow_cast<int>(out_it_->iov_len)); |
602 | 12.4k | if (res <= 0) { |
603 | | // Unfortunately LZ4 does not provide information whether decryption failed because |
604 | | // of wrong data or it just does not fit into output buffer. |
605 | | // Try to decode to buffer that is big enough for max possible decompressed chunk. |
606 | 499 | res = LZ4_decompress_safe( |
607 | 499 | input.cdata(), output_buffer_, narrow_cast<int>(input.size()), kLZ4BufferSize); |
608 | 499 | if (res <= 0) { |
609 | 0 | return STATUS_FORMAT(RuntimeError, "Decompress failed: $0", res); |
610 | 0 | } |
611 | | |
612 | | // Copy data from output buffer to provided read buffer. |
613 | 499 | size_t size = res; |
614 | 499 | char* buf = output_buffer_; |
615 | 998 | while (out_it_ != outvecs_.end()) { |
616 | 499 | size_t len = std::min<size_t>(size, out_it_->iov_len); |
617 | 499 | memcpy(out_it_->iov_base, buf, len); |
618 | 499 | IoVecRemovePrefix(len, &*out_it_); |
619 | 499 | size -= len; |
620 | 499 | total_add_ += len; |
621 | 499 | buf += len; |
622 | 499 | if (out_it_->iov_len == 0) { |
623 | | // Need to fill next output io vec. |
624 | 499 | ++out_it_; |
625 | 499 | } |
626 | 499 | if (size == 0) { |
627 | | // Fully copied all decompressed data. |
628 | 0 | break; |
629 | 0 | } |
630 | 499 | } |
631 | 499 | if (size != 0) { |
632 | | // Have more decompressed data than provided read buffer could accept. |
633 | 499 | *prev_decompress_data_left_ = Slice(buf, size); |
634 | 499 | } |
635 | 11.9k | } else { |
636 | 11.9k | IoVecRemovePrefix(res, &*out_it_); |
637 | 11.9k | total_add_ += res; |
638 | 11.9k | if (out_it_->iov_len == 0) { |
639 | 45 | ++out_it_; |
640 | 45 | } |
641 | 11.9k | } |
642 | | |
643 | | // Header was consumed by the caller, so we also add it here. |
644 | 12.4k | total_consumed_ += kLZ4HeaderLen + input.size(); |
645 | 12.4k | return Status::OK(); |
646 | 12.4k | } |
647 | | |
648 | | // LZ4 operates on continuous chunks of memory, so we use input and output buffers to combine |
649 | | // several iovecs into one continuous chunk when necessary. |
650 | | char* input_buffer_; |
651 | | char* output_buffer_; |
652 | | Slice* prev_decompress_data_left_; |
653 | | |
654 | | IoVecs outvecs_; |
655 | | IoVecs::iterator out_it_; |
656 | | size_t total_add_ = 0; |
657 | | size_t total_consumed_ = 0; |
658 | | }; |
659 | | |
660 | | class LZ4Compressor : public Compressor { |
661 | | public: |
662 | | static constexpr char kId = 'L'; |
663 | | static constexpr int kIndex = 3; |
664 | | static constexpr size_t kHeaderLen = kLZ4HeaderLen; |
665 | | |
666 | 70 | explicit LZ4Compressor(MemTrackerPtr mem_tracker) { |
667 | 70 | if (mem_tracker) { |
668 | 0 | consumption_ = ScopedTrackedConsumption(std::move(mem_tracker), 2 * kLZ4BufferSize); |
669 | 0 | } |
670 | 70 | } |
671 | | |
672 | 35 | OutboundDataPtr ConnectionHeader() override { |
673 | 35 | return GetConnectionHeader<LZ4Compressor>(); |
674 | 35 | } |
675 | | |
676 | 70 | CHECKED_STATUS Init() override { |
677 | 70 | return Status::OK(); |
678 | 70 | } |
679 | | |
680 | 0 | std::string ToString() const override { |
681 | 0 | return "LZ4"; |
682 | 0 | } |
683 | | |
684 | | CHECKED_STATUS Compress( |
685 | 0 | const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override { |
686 | | // Increment iterator in loop body to be able to check whether it is last iteration or not. |
687 | 0 | for (auto input_it = input.begin(); input_it != input.end();) { |
688 | 0 | Slice input_slice = input_it->AsSlice(); |
689 | 0 | ++input_it; |
690 | 0 | while (!input_slice.empty()) { |
691 | 0 | Slice chunk; |
692 | | // Split input into chunks of size kLZ4MaxChunkSize or less. |
693 | 0 | if (input_slice.size() > kLZ4MaxChunkSize) { |
694 | 0 | chunk = input_slice.Prefix(kLZ4MaxChunkSize); |
695 | 0 | } else { |
696 | 0 | chunk = input_slice; |
697 | 0 | } |
698 | 0 | input_slice.remove_prefix(chunk.size()); |
699 | 0 | RefCntBuffer output(kHeaderLen + LZ4_compressBound(narrow_cast<int>(chunk.size()))); |
700 | 0 | int res = LZ4_compress( |
701 | 0 | chunk.cdata(), output.data() + kHeaderLen, narrow_cast<int>(chunk.size())); |
702 | 0 | if (res <= 0) { |
703 | 0 | return STATUS_FORMAT(RuntimeError, "LZ4 compression failed: $0", res); |
704 | 0 | } |
705 | 0 | BigEndian::Store16(output.data(), res); |
706 | 0 | output.Shrink(kHeaderLen + res); |
707 | 0 | RETURN_NOT_OK(stream->SendToLower(std::make_shared<SingleBufferOutboundData>( |
708 | 0 | std::move(output), |
709 | | // We processed last buffer, attach data to it, so it will be notified when this buffer |
710 | | // is transferred. |
711 | 0 | input_slice.empty() && input_it == input.end() ? std::move(data) : nullptr))); |
712 | 0 | } |
713 | 0 | } |
714 | |
|
715 | 0 | return Status::OK(); |
716 | 0 | } |
717 | | |
718 | 5.06k | Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override { |
719 | 5.06k | LZ4DecompressState state( |
720 | 5.06k | decompress_input_buf_, decompress_output_buf_, &prev_decompress_data_left_); |
721 | 5.06k | return state.Execute(inp, out); |
722 | 5.06k | } |
723 | | |
724 | | private: |
725 | | char decompress_input_buf_[kLZ4BufferSize]; |
726 | | char decompress_output_buf_[kLZ4BufferSize]; |
727 | | Slice prev_decompress_data_left_; |
728 | | ScopedTrackedConsumption consumption_; |
729 | | }; |
730 | | |
731 | | #undef LZ4 |
732 | | #define YB_COMPRESSION_ALGORITHMS (Zlib)(Snappy)(LZ4) |
733 | | |
734 | | #define YB_CREATE_COMPRESSOR_CASE(r, data, name) \ |
735 | 332 | case BOOST_PP_CAT(name, Compressor)::data: \ |
736 | 332 | return std::make_unique<BOOST_PP_CAT(name, Compressor)>(std::move(mem_tracker)); |
737 | | |
738 | 304k | std::unique_ptr<Compressor> CreateCompressor(char sign, MemTrackerPtr mem_tracker) { |
739 | 304k | switch (sign) { |
740 | 166 | BOOST_PP_SEQ_FOR_EACH(YB_CREATE_COMPRESSOR_CASE, kId, YB_COMPRESSION_ALGORITHMS) |
741 | 304k | default: |
742 | 304k | return nullptr; |
743 | 304k | } |
744 | 304k | } |
745 | | |
746 | 270k | std::unique_ptr<Compressor> CreateOutboundCompressor(MemTrackerPtr mem_tracker) { |
747 | 270k | auto algo = FLAGS_stream_compression_algo; |
748 | 271k | if (!algo) { |
749 | 271k | return nullptr; |
750 | 271k | } |
751 | 18.4E | switch (algo) { |
752 | 166 | BOOST_PP_SEQ_FOR_EACH(YB_CREATE_COMPRESSOR_CASE, kIndex, YB_COMPRESSION_ALGORITHMS) |
753 | 0 | default: |
754 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Unknown compression algorithm: " << algo; |
755 | 0 | return nullptr; |
756 | 18.4E | } |
757 | 18.4E | } |
758 | | |
// StreamRefiner that negotiates and applies optional stream compression on top of a
// lower-level stream. The server side inspects the first 3 bytes ("YB" + algorithm id)
// to decide whether the peer requested compression; the client side picks an algorithm
// from --stream_compression_algo and announces it via a connection header.
class CompressedRefiner : public StreamRefiner {
 public:
  CompressedRefiner() = default;

 private:
  void Start(RefinedStream* stream) override {
    stream_ = stream;
  }

  // Server-side negotiation: examine the first 3 received bytes. On a recognized
  // "YB<id>" header, set up the matching compressor and start the handshake;
  // otherwise run the stream without compression.
  CHECKED_STATUS ProcessHeader() override {
    constexpr int kHeaderLen = 3;

    auto data = stream_->ReadBuffer().AppendedVecs();
    if (data.empty() || data[0].iov_len < kHeaderLen) {
      // Did not receive enough bytes to make a decision.
      // So just wait more bytes.
      return Status::OK();
    }

    const auto* bytes = static_cast<const uint8_t*>(data[0].iov_base);
    if (bytes[0] == 'Y' && bytes[1] == 'B') {
      compressor_ = CreateCompressor(bytes[2], stream_->buffer_tracker());
      if (compressor_) {
        RETURN_NOT_OK(compressor_->Init());
        RETURN_NOT_OK(stream_->StartHandshake());
        // Drop the header so only compressed payload remains in the read buffer.
        stream_->ReadBuffer().Consume(kHeaderLen, Slice());
        return Status::OK();
      }
    }

    // Don't use compression on this stream.
    return stream_->Established(RefinedStreamState::kDisabled);
  }

  // Serializes outbound data and routes it through the negotiated compressor.
  // Only called once compression is established, so compressor_ is non-null here.
  CHECKED_STATUS Send(OutboundDataPtr data) override {
    boost::container::small_vector<RefCntBuffer, 10> input;
    data->Serialize(&input);
    return compressor_->Compress(input, stream_, std::move(data));
  }

  // Client-side negotiation: create the configured compressor and send the connection
  // header. Falls back to an uncompressed stream when compression is disabled.
  CHECKED_STATUS Handshake() override {
    if (stream_->local_side() == LocalSide::kClient) {
      compressor_ = CreateOutboundCompressor(stream_->buffer_tracker());
      if (!compressor_) {
        return stream_->Established(RefinedStreamState::kDisabled);
      }
      RETURN_NOT_OK(compressor_->Init());
      RETURN_NOT_OK(stream_->SendToLower(compressor_->ConnectionHeader()));
    }

    return stream_->Established(RefinedStreamState::kEnabled);
  }

  // Decompresses pending input from the lower stream's read buffer into out.
  Result<ReadBufferFull> Read(StreamReadBuffer* out) override {
    VLOG_WITH_PREFIX(4) << __func__;

    return compressor_->Decompress(&stream_->ReadBuffer(), out);
  }

  const Protocol* GetProtocol() override {
    return CompressedStreamProtocol();
  }

  std::string ToString() const override {
    return compressor_ ? compressor_->ToString() : "PLAIN";
  }

  const std::string& LogPrefix() const {
    return stream_->LogPrefix();
  }

  RefinedStream* stream_ = nullptr;  // Not owned; set in Start.
  std::unique_ptr<Compressor> compressor_ = nullptr;  // Null until negotiated.
};
833 | | |
834 | | } // namespace |
835 | | |
836 | 3.35M | const Protocol* CompressedStreamProtocol() { |
837 | 3.35M | static Protocol result("tcpc"); |
838 | 3.35M | return &result; |
839 | 3.35M | } |
840 | | |
841 | | StreamFactoryPtr CompressedStreamFactory( |
842 | 16.1k | StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker) { |
843 | 16.1k | return std::make_shared<RefinedStreamFactory>( |
844 | 870k | std::move(lower_layer_factory), buffer_tracker, [](const StreamCreateData& data) { |
845 | 870k | return std::make_unique<CompressedRefiner>(); |
846 | 870k | }); |
847 | 16.1k | } |
848 | | |
849 | | } // namespace rpc |
850 | | } // namespace yb |