YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/compressed_stream-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/ql-dml-test-base.h"
15
#include "yb/client/schema.h"
16
#include "yb/client/session.h"
17
#include "yb/client/table_handle.h"
18
#include "yb/client/yb_op.h"
19
20
#include "yb/common/ql_value.h"
21
22
#include "yb/rpc/compressed_stream.h"
23
#include "yb/rpc/messenger.h"
24
#include "yb/rpc/tcp_stream.h"
25
26
#include "yb/util/size_literals.h"
27
28
#include "yb/yql/cql/ql/util/statement_result.h"
29
30
using namespace std::literals;
31
32
DECLARE_int32(stream_compression_algo);
33
DECLARE_bool(enable_stream_compression);
34
35
namespace yb {
36
37
// Test fixture built on client::KeyValueTableTest<MiniCluster> that turns on
// stream compression through gflags and creates its client on top of a
// messenger configured with the compressed stream protocol over TCP.
class CompressedStreamTest : public client::KeyValueTableTest<MiniCluster> {
38
 public:
39
2
  // Sets the stream compression flags before delegating to the base class SetUp.
  // NOTE(review): the meaning of algorithm id 1 is not visible here — confirm
  // against the definition of the stream_compression_algo flag.
  void SetUp() override {
40
2
    FLAGS_enable_stream_compression = true;
41
2
    FLAGS_stream_compression_algo = 1;
42
2
    KeyValueTableTest::SetUp();
43
2
  }
44
45
0
  // Stores in client_ the client produced by DoCreateClient, passing the same
  // address string as both `name` and `host`. Returns a non-OK status if
  // DoCreateClient fails (propagated by VERIFY_RESULT).
  CHECKED_STATUS CreateClient() override {
46
0
    auto host = "127.0.0.52";
47
0
    client_ = VERIFY_RESULT(DoCreateClient(host, host));
48
0
    return Status::OK();
49
0
  }
50
51
  // Builds a messenger that listens with the compressed stream protocol and has
  // a compressed stream factory (wrapping the TCP stream factory) registered for
  // it, passes the address resolved from `host` to TEST_SetOutboundIpBase, and
  // asks the cluster to create a client that takes ownership of the messenger.
  // NOTE(review): `name` is not used in this body; the messenger name is the
  // hard-coded "test_client" — confirm whether that is intentional.
  Result<std::unique_ptr<client::YBClient>> DoCreateClient(
52
0
      const std::string& name, const std::string& host) {
53
0
    rpc::MessengerBuilder messenger_builder("test_client");
54
0
    messenger_builder.SetListenProtocol(rpc::CompressedStreamProtocol());
55
0
    messenger_builder.AddStreamFactory(
56
0
        rpc::CompressedStreamProtocol(),
57
0
        CompressedStreamFactory(rpc::TcpStream::Factory(), MemTracker::GetRootTracker()));
58
0
    auto messenger = VERIFY_RESULT(messenger_builder.Build());
59
0
    messenger->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host)));
60
0
    return cluster_->CreateClient(std::move(messenger));
61
0
  }
62
63
  // Write-then-read round trip of a single row; defined out of line below.
  void TestSimpleOps();
64
};
65
66
0
// Creates the table as non-transactional, writes the row (kKey, kValue) and
// asserts the write response status is YQL_STATUS_OK, then selects the row by
// key through a separate new session and asserts the value read equals kValue.
void CompressedStreamTest::TestSimpleOps() {
67
0
  CreateTable(client::Transactional::kFalse);
68
69
0
  const int32_t kKey = 1;
70
0
  const int32_t kValue = 2;
71
72
0
  {
73
0
    auto session = NewSession();
74
0
    auto op = ASSERT_RESULT(WriteRow(session, kKey, kValue));
75
0
    ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
76
0
  }
77
78
0
  {
79
0
    auto value = ASSERT_RESULT(SelectRow(NewSession(), kKey));
80
0
    ASSERT_EQ(kValue, value);
81
0
  }
82
0
}
83
84
0
// Runs the single-row write/read round trip from TestSimpleOps on the fixture.
TEST_F(CompressedStreamTest, Simple) {
85
0
  TestSimpleOps();
86
0
}
87
88
0
// Round-trips one large string value: creates a table with a non-null INT32
// hash primary key column and a STRING value column, inserts a row whose value
// is a 64_KB-long string of 'X', then reads that row back on the same session
// and asserts that exactly one row is returned and its value matches.
// NOTE(review): the literal 1 passed to table_.Create is presumably the tablet
// count — confirm against the TableHandle::Create signature.
TEST_F(CompressedStreamTest, BigWrite) {
89
0
  client::YBSchemaBuilder builder;
90
0
  builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull();
91
0
  builder.AddColumn(kValueColumn)->Type(STRING);
92
93
0
  ASSERT_OK(table_.Create(client::kTableName, 1, client_.get(), &builder));
94
95
0
  const int32_t kKey = 1;
96
0
  const std::string kValue(64_KB, 'X');
97
98
0
  auto session = NewSession();
99
0
  {
100
0
    const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
101
0
    auto* const req = op->mutable_request();
102
0
    QLAddInt32HashValue(req, kKey);
103
0
    table_.AddStringColumnValue(req, kValueColumn, kValue);
104
0
    ASSERT_OK(session->ApplyAndFlush(op));
105
0
    ASSERT_OK(CheckOp(op.get()));
106
0
  }
107
108
0
  {
109
0
    const auto op = table_.NewReadOp();
110
0
    auto* const req = op->mutable_request();
111
0
    QLAddInt32HashValue(req, kKey);
112
0
    table_.AddColumns({kValueColumn}, req);
113
0
    ASSERT_OK(session->ApplyAndFlush(op));
114
0
    ASSERT_OK(CheckOp(op.get()));
115
0
    auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
116
0
    ASSERT_EQ(rowblock->row_count(), 1);
117
0
    ASSERT_EQ(kValue, rowblock->row(0).column(0).string_value());
118
0
  }
119
0
}
120
121
} // namespace yb