/Users/deen/code/yugabyte-db/src/yb/integration-tests/compressed_stream-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/ql-dml-test-base.h" |
15 | | #include "yb/client/schema.h" |
16 | | #include "yb/client/session.h" |
17 | | #include "yb/client/table_handle.h" |
18 | | #include "yb/client/yb_op.h" |
19 | | |
20 | | #include "yb/common/ql_value.h" |
21 | | |
22 | | #include "yb/rpc/compressed_stream.h" |
23 | | #include "yb/rpc/messenger.h" |
24 | | #include "yb/rpc/tcp_stream.h" |
25 | | |
26 | | #include "yb/util/size_literals.h" |
27 | | |
28 | | #include "yb/yql/cql/ql/util/statement_result.h" |
29 | | |
30 | | using namespace std::literals; |
31 | | |
32 | | DECLARE_int32(stream_compression_algo); |
33 | | DECLARE_bool(enable_stream_compression); |
34 | | |
35 | | namespace yb { |
36 | | |
37 | | class CompressedStreamTest : public client::KeyValueTableTest<MiniCluster> { |
38 | | public: |
39 | 2 | void SetUp() override { |
40 | 2 | FLAGS_enable_stream_compression = true; |
41 | 2 | FLAGS_stream_compression_algo = 1; |
42 | 2 | KeyValueTableTest::SetUp(); |
43 | 2 | } |
44 | | |
45 | 0 | CHECKED_STATUS CreateClient() override { |
46 | 0 | auto host = "127.0.0.52"; |
47 | 0 | client_ = VERIFY_RESULT(DoCreateClient(host, host)); |
48 | 0 | return Status::OK(); |
49 | 0 | } |
50 | | |
51 | | Result<std::unique_ptr<client::YBClient>> DoCreateClient( |
52 | 0 | const std::string& name, const std::string& host) { |
53 | 0 | rpc::MessengerBuilder messenger_builder("test_client"); |
54 | 0 | messenger_builder.SetListenProtocol(rpc::CompressedStreamProtocol()); |
55 | 0 | messenger_builder.AddStreamFactory( |
56 | 0 | rpc::CompressedStreamProtocol(), |
57 | 0 | CompressedStreamFactory(rpc::TcpStream::Factory(), MemTracker::GetRootTracker())); |
58 | 0 | auto messenger = VERIFY_RESULT(messenger_builder.Build()); |
59 | 0 | messenger->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress(host))); |
60 | 0 | return cluster_->CreateClient(std::move(messenger)); |
61 | 0 | } |
62 | | |
63 | | void TestSimpleOps(); |
64 | | }; |
65 | | |
66 | 0 | void CompressedStreamTest::TestSimpleOps() { |
67 | 0 | CreateTable(client::Transactional::kFalse); |
68 | | |
69 | 0 | const int32_t kKey = 1; |
70 | 0 | const int32_t kValue = 2; |
71 | | |
72 | 0 | { |
73 | 0 | auto session = NewSession(); |
74 | 0 | auto op = ASSERT_RESULT(WriteRow(session, kKey, kValue)); |
75 | 0 | ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK); |
76 | 0 | } |
77 | | |
78 | 0 | { |
79 | 0 | auto value = ASSERT_RESULT(SelectRow(NewSession(), kKey)); |
80 | 0 | ASSERT_EQ(kValue, value); |
81 | 0 | } |
82 | 0 | } |
83 | | |
84 | 0 | TEST_F(CompressedStreamTest, Simple) { |
85 | 0 | TestSimpleOps(); |
86 | 0 | } |
87 | | |
88 | 0 | TEST_F(CompressedStreamTest, BigWrite) { |
89 | 0 | client::YBSchemaBuilder builder; |
90 | 0 | builder.AddColumn(kKeyColumn)->Type(INT32)->HashPrimaryKey()->NotNull(); |
91 | 0 | builder.AddColumn(kValueColumn)->Type(STRING); |
92 | | |
93 | 0 | ASSERT_OK(table_.Create(client::kTableName, 1, client_.get(), &builder)); |
94 | | |
95 | 0 | const int32_t kKey = 1; |
96 | 0 | const std::string kValue(64_KB, 'X'); |
97 | | |
98 | 0 | auto session = NewSession(); |
99 | 0 | { |
100 | 0 | const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
101 | 0 | auto* const req = op->mutable_request(); |
102 | 0 | QLAddInt32HashValue(req, kKey); |
103 | 0 | table_.AddStringColumnValue(req, kValueColumn, kValue); |
104 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
105 | 0 | ASSERT_OK(CheckOp(op.get())); |
106 | 0 | } |
107 | | |
108 | 0 | { |
109 | 0 | const auto op = table_.NewReadOp(); |
110 | 0 | auto* const req = op->mutable_request(); |
111 | 0 | QLAddInt32HashValue(req, kKey); |
112 | 0 | table_.AddColumns({kValueColumn}, req); |
113 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
114 | 0 | ASSERT_OK(CheckOp(op.get())); |
115 | 0 | auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock(); |
116 | 0 | ASSERT_EQ(rowblock->row_count(), 1); |
117 | 0 | ASSERT_EQ(kValue, rowblock->row(0).column(0).string_value()); |
118 | 0 | } |
119 | 0 | } |
120 | | |
121 | | } // namespace yb |