YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/compressed_stream.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rpc/compressed_stream.h"

#include <lz4.h>
#include <snappy-sinksource.h>
#include <snappy.h>
#include <zlib.h>

#include <boost/preprocessor/cat.hpp>
#include <boost/range/iterator_range.hpp>

#include "yb/gutil/casts.h"

#include "yb/rpc/circular_read_buffer.h"
#include "yb/rpc/outbound_data.h"
#include "yb/rpc/refined_stream.h"

#include "yb/util/logging.h"
#include "yb/util/result.h"
#include "yb/util/size_literals.h"
#include "yb/util/status_format.h"

using namespace std::literals;

DEFINE_int32(stream_compression_algo, 0, "Algorithm used for stream compression. "
                                         "0 - no compression, 1 - gzip, 2 - snappy, 3 - lz4.");

namespace yb {
namespace rpc {

using SmallRefCntBuffers = boost::container::small_vector_base<RefCntBuffer>;

namespace {

class Compressor {
 public:
  virtual std::string ToString() const = 0;

  // Initialize compressor, required since we don't use exceptions to return error from ctor.
  virtual CHECKED_STATUS Init() = 0;

  // Compress specified vector of input buffers into single output buffer.
  virtual CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) = 0;

  // Decompress specified input slice to specified output buffer.
  virtual Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) = 0;

  // Connection header associated with this compressor.
  virtual OutboundDataPtr ConnectionHeader() = 0;

  virtual ~Compressor() = default;
};

size_t EntrySize(const RefCntBuffer& buffer) {
  return buffer.size();
}

size_t EntrySize(const iovec& iov) {
  return iov.iov_len;
}

char* EntryData(const RefCntBuffer& buffer) {
  return buffer.data();
}

char* EntryData(const iovec& iov) {
  return static_cast<char*>(iov.iov_base);
}

template <class Collection>
size_t TotalLen(const Collection& input) {
  size_t result = 0;
  for (const auto& buf : input) {
    result += EntrySize(buf);
  }
  return result;
}

template <class Compressor>
OutboundDataPtr GetConnectionHeader() {
  // Compressed stream header has signature YBx, where x is the compressor identifier.
  static auto result = std::make_shared<StringOutboundData>(
      "YB"s + Compressor::kId, Compressor::kId + "ConnectionHeader"s);
  return result;
}
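
// For example, ZlibCompressor below has kId == 'G', so its connection header is the 3 bytes "YBG";
// CompressedRefiner::ProcessHeader at the bottom of this file recognizes that prefix on the
// accepting side and picks the matching compressor.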

template <class SliceDecompressor>
Result<ReadBufferFull> DecompressBySlices(
    StreamReadBuffer* inp, StreamReadBuffer* out, const SliceDecompressor& slice_decompressor) {
  size_t consumed = 0;
  auto out_vecs = VERIFY_RESULT(out->PrepareAppend());
  auto out_it = out_vecs.begin();
  size_t appended = 0;

  for (const auto& iov : inp->AppendedVecs()) {
    Slice slice(static_cast<char*>(iov.iov_base), iov.iov_len);
    for (;;) {
      if (out_it->iov_len == 0) {
        if (++out_it == out_vecs.end()) {
          break;
        }
      }
      size_t len = VERIFY_RESULT(slice_decompressor(&slice, out_it->iov_base, out_it->iov_len));
      appended += len;
      IoVecRemovePrefix(len, &*out_it);
      if (slice.empty()) {
        break;
      }
    }
    consumed += iov.iov_len - slice.size();
    if (!slice.empty()) {
      break;
    }
  }
  out->DataAppended(appended);
  inp->Consume(consumed, Slice());
  return ReadBufferFull(out->Full());
}

class ZlibCompressor : public Compressor {
 public:
  static const char kId = 'G';
  static const int kIndex = 1;

  explicit ZlibCompressor(MemTrackerPtr mem_tracker) {
  }

  ~ZlibCompressor() {
    if (deflate_inited_) {
      int res = deflateEnd(&deflate_stream_);
      LOG_IF(WARNING, res != Z_OK && res != Z_DATA_ERROR)
          << "Failed to destroy deflate stream: " << res;
    }
    if (inflate_inited_) {
      int res = inflateEnd(&inflate_stream_);
      LOG_IF(WARNING, res != Z_OK) << "Failed to destroy inflate stream: " << res;
    }
  }

  OutboundDataPtr ConnectionHeader() override {
    return GetConnectionHeader<ZlibCompressor>();
  }

  CHECKED_STATUS Init() override {
    memset(&deflate_stream_, 0, sizeof(deflate_stream_));
    int res = deflateInit(&deflate_stream_, /* level= */ Z_DEFAULT_COMPRESSION);
    if (res != Z_OK) {
      return STATUS_FORMAT(RuntimeError, "Cannot init deflate stream: $0", res);
    }
    deflate_inited_ = true;

    memset(&inflate_stream_, 0, sizeof(inflate_stream_));
    res = inflateInit(&inflate_stream_);
    if (res != Z_OK) {
      return STATUS_FORMAT(RuntimeError, "Cannot init inflate stream: $0", res);
    }
    inflate_inited_ = true;

    return Status::OK();
  }

  std::string ToString() const override {
    return "Zlib";
  }

  CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
    RefCntBuffer output(deflateBound(&deflate_stream_, TotalLen(input)));
    deflate_stream_.avail_out = static_cast<unsigned int>(output.size());
    deflate_stream_.next_out = output.udata();

    for (auto it = input.begin(); it != input.end();) {
      const auto& buf = *it++;
      deflate_stream_.next_in = const_cast<Bytef*>(buf.udata());
      deflate_stream_.avail_in = static_cast<unsigned int>(buf.size());

      for (;;) {
        auto res = deflate(&deflate_stream_, it == input.end() ? Z_PARTIAL_FLUSH : Z_NO_FLUSH);
        if (res == Z_STREAM_END) {
          if (deflate_stream_.avail_in != 0) {
            return STATUS_FORMAT(
                RuntimeError, "Stream end when input data still available: $0",
                deflate_stream_.avail_in);
          }
          break;
        }
        if (res != Z_OK) {
          return STATUS_FORMAT(RuntimeError, "Compression failed: $0", res);
        }
        if (deflate_stream_.avail_in == 0) {
          break;
        }
      }
    }

    output.Shrink(deflate_stream_.next_out - output.udata());

    // Send compressed data to underlying stream.
    return stream->SendToLower(std::make_shared<SingleBufferOutboundData>(
        std::move(output), std::move(data)));
  }
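
  // Note: deflate() above requests Z_PARTIAL_FLUSH on the last input buffer so that everything
  // produced so far can be inflated by the peer without waiting for more data; unlike the Snappy
  // and LZ4 paths below, the zlib stream state is carried across messages rather than reset per
  // chunk.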

  Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override {
    return DecompressBySlices(
        inp, out, [this](Slice* input, void* out, size_t outlen) -> Result<size_t> {
      inflate_stream_.next_in = const_cast<Bytef*>(pointer_cast<const Bytef*>(input->data()));
      inflate_stream_.avail_in = narrow_cast<uInt>(input->size());
      inflate_stream_.next_out = static_cast<Bytef*>(out);
      inflate_stream_.avail_out = narrow_cast<uInt>(outlen);

      int res = inflate(&inflate_stream_, Z_NO_FLUSH);
      if (res != Z_OK && res != Z_BUF_ERROR) {
        return STATUS_FORMAT(RuntimeError, "Decompression failed: $0", res);
      }

      input->remove_prefix(input->size() - inflate_stream_.avail_in);
      return outlen - inflate_stream_.avail_out;
    });
  }

 private:
  z_stream deflate_stream_;
  z_stream inflate_stream_;
  bool deflate_inited_ = false;
  bool inflate_inited_ = false;
};

// Source implementation that provides input from range of buffers.
template <class It>
class RangeSource : public snappy::Source {
 public:
  explicit RangeSource(const It& begin, const It& end)
      : current_(begin), end_(end),
        available_(TotalLen(boost::make_iterator_range(begin, end))) {}

  void Limit(size_t value) {
    available_ = value;
  }

  size_t Available() const override {
    return available_;
  }

  const char* Peek(size_t* len) override {
    if (available_ == 0) {
      *len = 0;
      return nullptr;
    }
    *len = std::min(EntrySize(*current_) - current_pos_, available_);
    return EntryData(*current_) + current_pos_;
  }

  void Rewind(size_t n) {
    available_ += n;
    for (;;) {
      if (current_pos_ >= n) {
        current_pos_ -= n;
        break;
      }
      n -= current_pos_;
      --current_;
      current_pos_ = EntrySize(*current_);
    }
  }

  void Skip(size_t n) override {
    if (!n) {
      return;
    }
    available_ -= n;
    if ((current_pos_ += n) >= EntrySize(*current_)) {
      current_pos_ = 0;
      ++current_;
      DCHECK(available_ == 0 ? current_ == end_ : current_ < end_)
          << "Available: " << available_ << ", left buffers: " << std::distance(current_, end_)
          << ", n: " << n;
    }
  }

 private:
  It current_; // Current buffer that we are reading.
  It end_;
  size_t current_pos_ = 0; // Position in current buffer.
  size_t available_; // How many bytes left in all buffers.
};
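
// Rewind() above is not part of the snappy::Source interface: SnappyCompressor::Decompress calls
// snappy::GetUncompressedLength (which consumes bytes from the source) and then uses Rewind to
// step back before running the actual snappy::Uncompress over the same chunk.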

// Sink implementation that provides output to io vecs.
class IoVecsSink : public snappy::Sink {
 public:
  explicit IoVecsSink(IoVecs* out) : out_(out->begin()) {}

  size_t total_appended() const {
    return total_appended_;
  }

  void Append(const char* bytes, size_t n) override {
    total_appended_ += n;

    if (bytes == out_->iov_base) {
      if (out_->iov_len == n) {
        ++out_;
      } else {
        IoVecRemovePrefix(n, &*out_);
      }
      return;
    }

    while (n > 0) {
      size_t current_len = out_->iov_len;
      if (current_len >= n) {
        memcpy(out_->iov_base, bytes, n);
        IoVecRemovePrefix(n, &*out_);
        n = 0;
      } else {
        memcpy(out_->iov_base, bytes, current_len);
        bytes += current_len;
        n -= current_len;
        ++out_;
      }
    }
  }

  char* GetAppendBufferVariable(
      size_t min_size, size_t desired_size_hint, char* scratch,
      size_t scratch_size, size_t* allocated_size) override {
    if (min_size <= out_->iov_len) {
      *allocated_size = out_->iov_len;
      return static_cast<char*>(out_->iov_base);
    }
    return Sink::GetAppendBufferVariable(
        min_size, desired_size_hint, scratch, scratch_size, allocated_size);
  }

 private:
  IoVecs::iterator out_;
  size_t total_appended_ = 0;
};
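
// GetAppendBufferVariable hands snappy the current iovec as its output buffer, so Append normally
// observes bytes == out_->iov_base and only advances the write position; the memcpy path is taken
// when snappy had to fall back to its own scratch buffer.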

// Binary search to find max value so that max_compressed_len(value) fits into header_len bytes.
template <class F>
static size_t FindMaxChunkSize(size_t header_len, const F& max_compressed_len) {
  size_t max_value = (1ULL << (8 * header_len)) - 1;
  size_t l = 1;
  size_t r = max_value;
  while (r > l) {
    size_t m = (l + r + 1) / 2;
    if (implicit_cast<size_t>(max_compressed_len(narrow_cast<int>(m))) > max_value) {
      r = m - 1;
    } else {
      l = m;
    }
  }
  return l;
}
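
// For instance, FindMaxChunkSize(2, &snappy::MaxCompressedLength) below returns the largest
// uncompressed chunk whose worst-case compressed size still fits into a 2-byte length prefix.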

// Snappy does not support stream compression.
// So we have to add block len, to start decompression only from block start.
// We use 2 bytes for block len, so have to limit uncompressed block so that in worst case
// compressed block size would fit into 2 bytes.
constexpr size_t kSnappyHeaderLen = 2;
const size_t kSnappyMaxChunkSize = FindMaxChunkSize(kSnappyHeaderLen, &snappy::MaxCompressedLength);
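
// Resulting wire format per chunk: a 2-byte big-endian length of the compressed block, followed by
// that many bytes of snappy data. Each chunk is compressed and decompressed independently.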

class SnappyCompressor : public Compressor {
 public:
  static constexpr char kId = 'S';
  static constexpr int kIndex = 2;
  static constexpr size_t kHeaderLen = kSnappyHeaderLen;

  explicit SnappyCompressor(MemTrackerPtr mem_tracker) {
  }

  ~SnappyCompressor() {
  }

  OutboundDataPtr ConnectionHeader() override {
    return GetConnectionHeader<SnappyCompressor>();
  }

  CHECKED_STATUS Init() override {
    return Status::OK();
  }

  std::string ToString() const override {
    return "Snappy";
  }

  CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
    RangeSource<SmallRefCntBuffers::const_iterator> source(input.begin(), input.end());
    auto input_size = source.Available();
    bool stop = false;
    while (!stop) {
      // Split input into chunks of size kSnappyMaxChunkSize or less.
      if (input_size > kSnappyMaxChunkSize) {
        source.Limit(kSnappyMaxChunkSize);
        input_size -= kSnappyMaxChunkSize;
      } else {
        source.Limit(input_size);
        stop = true;
      }
      RefCntBuffer output(kHeaderLen + snappy::MaxCompressedLength(source.Available()));
      snappy::UncheckedByteArraySink sink(output.data() + kHeaderLen);
      auto compressed_len = snappy::Compress(&source, &sink);
      BigEndian::Store16(output.data(), compressed_len);
      output.Shrink(kHeaderLen + compressed_len);
      RETURN_NOT_OK(stream->SendToLower(std::make_shared<SingleBufferOutboundData>(
          std::move(output),
          // We processed last buffer, attach data to it, so it will be notified when this buffer
          // is transferred.
          stop ? std::move(data) : nullptr)));
    }
    return Status::OK();
  }

  Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override {
    auto inp_vecs = inp->AppendedVecs();
    RangeSource<IoVecs::const_iterator> source(inp_vecs.begin(), inp_vecs.end());

    auto total_consumed = 0;
    auto out_vecs = VERIFY_RESULT(out->PrepareAppend());
    auto total_output = IoVecsFullSize(out_vecs);
    IoVecsSink sink(&out_vecs);

    size_t input_size = source.Available();
    auto result = ReadBufferFull::kFalse;
    while (total_consumed + kHeaderLen <= input_size) {
      source.Limit(input_size - total_consumed);
      // Fetch chunk len.
      size_t len = 0;
      auto data = source.Peek(&len);
      size_t size;
      if (len >= kHeaderLen) {
        // Size fully contained in the first buffer.
        size = BigEndian::Load16(data);
        source.Skip(kHeaderLen);
      } else {
        // Size distributed between 2 blocks.
        // Since blocks are not empty and size exactly 2, we should expect that first block
        // contains 1 byte of size.
        if (len != 1) {
          return STATUS(RuntimeError, "Expected to peek at least one byte");
        }
        char buf[kHeaderLen];
        buf[0] = *data;
        source.Skip(1);
        data = source.Peek(&len);
        buf[1] = *data;
        source.Skip(1);
        size = BigEndian::Load16(buf);
      }
      // Check whether we already received full chunk.
      VLOG_WITH_FUNC(4)
          << "Total consumed: " << total_consumed << ", size: " << size << ", input: "
          << input_size;
      if (total_consumed + kHeaderLen + size > input_size) {
        break;
      }
      source.Limit(size);
      uint32_t length = 0;
      auto old_available = source.Available();
      if (!snappy::GetUncompressedLength(&source, &length)) {
        return STATUS(RuntimeError, "GetUncompressedLength failed");
      }

      // Check if we have space for decompressed block.
      VLOG_WITH_FUNC(4)
          << "Total appended: " << sink.total_appended() << ", length: " << length
          << ", total_output: " << total_output;
      if (sink.total_appended() + length > total_output) {
        result = ReadBufferFull::kTrue;
        break;
      }
      // Rollback to state before GetUncompressedLength.
      source.Rewind(old_available - source.Available());
      if (!snappy::Uncompress(&source, &sink)) {
        return STATUS(RuntimeError, "Decompression failed");
      }
      total_consumed += kHeaderLen + size;
    }

    out->DataAppended(sink.total_appended());
    inp->Consume(total_consumed, Slice());

    return result;
  }
};

// LZ4 could compress/decompress only continuous block of memory into similar block.
// So we do the same as for Snappy, but decompression is even more complex.
constexpr size_t kLZ4HeaderLen = 2;
const size_t kLZ4MaxChunkSize = FindMaxChunkSize(kLZ4HeaderLen, &LZ4_compressBound);
const size_t kLZ4BufferSize = 64_KB;
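
// kLZ4BufferSize sizes the two scratch buffers owned by LZ4Compressor below: one to reassemble a
// chunk that is split across input iovecs, and one to hold decompressed data that does not fit
// into the caller's read buffer (see LZ4DecompressState).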

class LZ4DecompressState {
 public:
  LZ4DecompressState(char* input_buffer, char* output_buffer, Slice* prev_decompress_data_left)
      : input_buffer_(input_buffer), output_buffer_(output_buffer),
        prev_decompress_data_left_(prev_decompress_data_left) {}

  Result<ReadBufferFull> Execute(StreamReadBuffer* inp, StreamReadBuffer* out) {
    outvecs_ = VERIFY_RESULT(out->PrepareAppend());
    out_it_ = outvecs_.begin();

    // Check if we previously decompressed some data that did not fit into output buffer.
    // So copy it now. See DecompressChunk for details.
    while (!prev_decompress_data_left_->empty()) {
      auto len = std::min(prev_decompress_data_left_->size(), out_it_->iov_len);
      memcpy(out_it_->iov_base, prev_decompress_data_left_->data(), len);
      total_add_ += len;
      IoVecRemovePrefix(len, &*out_it_);
      prev_decompress_data_left_->remove_prefix(len);
      if (out_it_->iov_len == 0) {
        if (++out_it_ == outvecs_.end()) {
          out->DataAppended(total_add_);
          return ReadBufferFull(out->Full());
        }
      }
    }

    // Remaining piece of data in the previous vec.
    Slice prev_input_slice;
    for (const auto& input_vec : inp->AppendedVecs()) {
      Slice input_slice(static_cast<char*>(input_vec.iov_base), input_vec.iov_len);
      if (!prev_input_slice.empty()) {
        size_t chunk_size;
        if (prev_input_slice.size() >= kLZ4HeaderLen) {
          // Size fully contained in the previous block.
          chunk_size = BigEndian::Load16(prev_input_slice.data());
        } else if (prev_input_slice.size() + input_slice.size() < kLZ4HeaderLen) {
          // Did not receive header yet. So exit and wait for more data received.
          break;
        } else {
          // Size distributed between 2 blocks.
          char buf[kLZ4HeaderLen];
          memcpy(buf, prev_input_slice.data(), prev_input_slice.size());
          memcpy(buf + prev_input_slice.size(), input_slice.data(),
                 kLZ4HeaderLen - prev_input_slice.size());
          chunk_size = BigEndian::Load16(buf);
        }
        if (kLZ4HeaderLen + chunk_size > prev_input_slice.size() + input_slice.size()) {
          // Did not receive full block yet. So exit and wait for more data received.
          // TODO Here we rely on the fact that we use circular buffer and could have at most 2
          //      blocks. In general case chunk could be distributed across several buffers.
          break;
        }
        if (prev_input_slice.size() > kLZ4HeaderLen) {
          // Data is distributed between 2 buffers.
          // Have to copy it into separate input buffer so it will be a single continuous block.
          size_t size_in_prev_slice = prev_input_slice.size() - kLZ4HeaderLen;
          size_t size_in_current_slice = chunk_size - size_in_prev_slice;
          memcpy(input_buffer_, prev_input_slice.data() + kLZ4HeaderLen, size_in_prev_slice);
          memcpy(input_buffer_ + size_in_prev_slice, input_slice.data(), size_in_current_slice);
          RETURN_NOT_OK(DecompressChunk(Slice(input_buffer_, chunk_size)));
          input_slice.remove_prefix(size_in_current_slice);
        } else {
          // Only header (or part of header) was in previous buffer.
          // Could decompress from current buffer.
          input_slice.remove_prefix(kLZ4HeaderLen - prev_input_slice.size());
          RETURN_NOT_OK(DecompressChunk(input_slice.Prefix(chunk_size)));
          input_slice.remove_prefix(chunk_size);
        }
      }
      // Decompress all chunks contained in current buffer.
      while (input_slice.size() >= kLZ4HeaderLen && out_it_ != outvecs_.end()) {
        size_t chunk_size = BigEndian::Load16(input_slice.data());
        if (input_slice.size() < kLZ4HeaderLen + chunk_size) {
          break;
        }
        input_slice.remove_prefix(kLZ4HeaderLen);
        RETURN_NOT_OK(DecompressChunk(input_slice.Prefix(chunk_size)));
        input_slice.remove_prefix(chunk_size);
      }
      prev_input_slice = input_slice;
      // DecompressChunk could increase out_it_, so check whether we still have output space.
      if (out_it_ == outvecs_.end()) {
        break;
      }
    }
    inp->Consume(total_consumed_, Slice());
    out->DataAppended(total_add_);
    return ReadBufferFull(out->Full());
  }

 private:
  CHECKED_STATUS DecompressChunk(const Slice& input) {
    int res = LZ4_decompress_safe(
        input.cdata(), static_cast<char*>(out_it_->iov_base), narrow_cast<int>(input.size()),
        narrow_cast<int>(out_it_->iov_len));
    if (res <= 0) {
      // Unfortunately LZ4 does not provide information whether decompression failed because
      // of wrong data or it just does not fit into output buffer.
      // Try to decode to buffer that is big enough for max possible decompressed chunk.
      res = LZ4_decompress_safe(
          input.cdata(), output_buffer_, narrow_cast<int>(input.size()), kLZ4BufferSize);
      if (res <= 0) {
        return STATUS_FORMAT(RuntimeError, "Decompress failed: $0", res);
      }

      // Copy data from output buffer to provided read buffer.
      size_t size = res;
      char* buf = output_buffer_;
      while (out_it_ != outvecs_.end()) {
        size_t len = std::min<size_t>(size, out_it_->iov_len);
        memcpy(out_it_->iov_base, buf, len);
        IoVecRemovePrefix(len, &*out_it_);
        size -= len;
        total_add_ += len;
        buf += len;
        if (out_it_->iov_len == 0) {
          // Need to fill next output io vec.
          ++out_it_;
        }
        if (size == 0) {
          // Fully copied all decompressed data.
          break;
        }
      }
      if (size != 0) {
        // Have more decompressed data than provided read buffer could accept.
        *prev_decompress_data_left_ = Slice(buf, size);
      }
    } else {
      IoVecRemovePrefix(res, &*out_it_);
      total_add_ += res;
      if (out_it_->iov_len == 0) {
        ++out_it_;
      }
    }

    // Header was consumed by the caller, so we also add it here.
    total_consumed_ += kLZ4HeaderLen + input.size();
    return Status::OK();
  }

  // LZ4 operates on continuous chunks of memory, so we use input and output buffers to combine
  // several iovecs into one continuous chunk when necessary.
  char* input_buffer_;
  char* output_buffer_;
  Slice* prev_decompress_data_left_;

  IoVecs outvecs_;
  IoVecs::iterator out_it_;
  size_t total_add_ = 0;
  size_t total_consumed_ = 0;
};

class LZ4Compressor : public Compressor {
 public:
  static constexpr char kId = 'L';
  static constexpr int kIndex = 3;
  static constexpr size_t kHeaderLen = kLZ4HeaderLen;

  explicit LZ4Compressor(MemTrackerPtr mem_tracker) {
    if (mem_tracker) {
      consumption_ = ScopedTrackedConsumption(std::move(mem_tracker), 2 * kLZ4BufferSize);
    }
  }

  OutboundDataPtr ConnectionHeader() override {
    return GetConnectionHeader<LZ4Compressor>();
  }

  CHECKED_STATUS Init() override {
    return Status::OK();
  }

  std::string ToString() const override {
    return "LZ4";
  }

  CHECKED_STATUS Compress(
      const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
    // Increment iterator in loop body to be able to check whether it is last iteration or not.
    for (auto input_it = input.begin(); input_it != input.end();) {
      Slice input_slice = input_it->AsSlice();
      ++input_it;
      while (!input_slice.empty()) {
        Slice chunk;
        // Split input into chunks of size kLZ4MaxChunkSize or less.
        if (input_slice.size() > kLZ4MaxChunkSize) {
          chunk = input_slice.Prefix(kLZ4MaxChunkSize);
        } else {
          chunk = input_slice;
        }
        input_slice.remove_prefix(chunk.size());
        RefCntBuffer output(kHeaderLen + LZ4_compressBound(narrow_cast<int>(chunk.size())));
        int res = LZ4_compress(
            chunk.cdata(), output.data() + kHeaderLen, narrow_cast<int>(chunk.size()));
        if (res <= 0) {
          return STATUS_FORMAT(RuntimeError, "LZ4 compression failed: $0", res);
        }
        BigEndian::Store16(output.data(), res);
        output.Shrink(kHeaderLen + res);
        RETURN_NOT_OK(stream->SendToLower(std::make_shared<SingleBufferOutboundData>(
            std::move(output),
            // We processed last buffer, attach data to it, so it will be notified when this buffer
            // is transferred.
            input_slice.empty() && input_it == input.end() ? std::move(data) : nullptr)));
      }
    }

    return Status::OK();
  }

  Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) override {
    LZ4DecompressState state(
        decompress_input_buf_, decompress_output_buf_, &prev_decompress_data_left_);
    return state.Execute(inp, out);
  }

 private:
  char decompress_input_buf_[kLZ4BufferSize];
  char decompress_output_buf_[kLZ4BufferSize];
  Slice prev_decompress_data_left_;
  ScopedTrackedConsumption consumption_;
};

#undef LZ4
#define YB_COMPRESSION_ALGORITHMS (Zlib)(Snappy)(LZ4)

#define YB_CREATE_COMPRESSOR_CASE(r, data, name) \
  case BOOST_PP_CAT(name, Compressor)::data: \
      return std::make_unique<BOOST_PP_CAT(name, Compressor)>(std::move(mem_tracker));
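
// Expands to one case per algorithm in YB_COMPRESSION_ALGORITHMS, keyed by `data`: kId (the wire
// signature byte) in CreateCompressor, kIndex (the flag value) in CreateOutboundCompressor. So,
// for example, running with --stream_compression_algo=3 selects LZ4Compressor (kIndex == 3).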

std::unique_ptr<Compressor> CreateCompressor(char sign, MemTrackerPtr mem_tracker) {
  switch (sign) {
    BOOST_PP_SEQ_FOR_EACH(YB_CREATE_COMPRESSOR_CASE, kId, YB_COMPRESSION_ALGORITHMS)
    default:
      return nullptr;
  }
}

std::unique_ptr<Compressor> CreateOutboundCompressor(MemTrackerPtr mem_tracker) {
  auto algo = FLAGS_stream_compression_algo;
  if (!algo) {
    return nullptr;
  }
  switch (algo) {
    BOOST_PP_SEQ_FOR_EACH(YB_CREATE_COMPRESSOR_CASE, kIndex, YB_COMPRESSION_ALGORITHMS)
    default:
      YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Unknown compression algorithm: " << algo;
      return nullptr;
  }
}

class CompressedRefiner : public StreamRefiner {
 public:
  CompressedRefiner() = default;

 private:
  void Start(RefinedStream* stream) override {
    stream_ = stream;
  }

  CHECKED_STATUS ProcessHeader() override {
    constexpr int kHeaderLen = 3;

    auto data = stream_->ReadBuffer().AppendedVecs();
    if (data.empty() || data[0].iov_len < kHeaderLen) {
      // Did not receive enough bytes to make a decision.
      // So just wait more bytes.
      return Status::OK();
    }

    const auto* bytes = static_cast<const uint8_t*>(data[0].iov_base);
    if (bytes[0] == 'Y' && bytes[1] == 'B') {
      compressor_ = CreateCompressor(bytes[2], stream_->buffer_tracker());
      if (compressor_) {
        RETURN_NOT_OK(compressor_->Init());
        RETURN_NOT_OK(stream_->StartHandshake());
        stream_->ReadBuffer().Consume(kHeaderLen, Slice());
        return Status::OK();
      }
    }

    // Don't use compression on this stream.
    return stream_->Established(RefinedStreamState::kDisabled);
  }
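
  // The 3 header bytes are consumed only when a known compressor id follows "YB"; any other prefix
  // leaves the read buffer untouched and the stream continues uncompressed.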

  CHECKED_STATUS Send(OutboundDataPtr data) override {
    boost::container::small_vector<RefCntBuffer, 10> input;
    data->Serialize(&input);
    return compressor_->Compress(input, stream_, std::move(data));
  }

  CHECKED_STATUS Handshake() override {
    if (stream_->local_side() == LocalSide::kClient) {
      compressor_ = CreateOutboundCompressor(stream_->buffer_tracker());
      if (!compressor_) {
        return stream_->Established(RefinedStreamState::kDisabled);
      }
      RETURN_NOT_OK(compressor_->Init());
      RETURN_NOT_OK(stream_->SendToLower(compressor_->ConnectionHeader()));
    }

    return stream_->Established(RefinedStreamState::kEnabled);
  }

  Result<ReadBufferFull> Read(StreamReadBuffer* out) override {
    VLOG_WITH_PREFIX(4) << __func__;

    return compressor_->Decompress(&stream_->ReadBuffer(), out);
  }

  const Protocol* GetProtocol() override {
    return CompressedStreamProtocol();
  }

  std::string ToString() const override {
    return compressor_ ? compressor_->ToString() : "PLAIN";
  }

  const std::string& LogPrefix() const {
    return stream_->LogPrefix();
  }

  RefinedStream* stream_ = nullptr;
  std::unique_ptr<Compressor> compressor_ = nullptr;
};

} // namespace

const Protocol* CompressedStreamProtocol() {
  static Protocol result("tcpc");
  return &result;
}

StreamFactoryPtr CompressedStreamFactory(
    StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker) {
  return std::make_shared<RefinedStreamFactory>(
      std::move(lower_layer_factory), buffer_tracker, [](const StreamCreateData& data) {
    return std::make_unique<CompressedRefiner>();
  });
}

}  // namespace rpc
}  // namespace yb