/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_wrapper-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/client/yb_table_name.h" |
14 | | |
15 | | #include "yb/common/wire_protocol.h" |
16 | | |
17 | | #include "yb/master/master_admin.proxy.h" |
18 | | #include "yb/master/master_ddl.pb.h" |
19 | | |
20 | | #include "yb/rpc/rpc_controller.h" |
21 | | |
22 | | #include "yb/util/path_util.h" |
23 | | #include "yb/util/result.h" |
24 | | #include "yb/util/tsan_util.h" |
25 | | |
26 | | #include "yb/yql/pgwrapper/pg_wrapper_test_base.h" |
27 | | |
28 | | using yb::master::FlushTablesRequestPB; |
29 | | using yb::master::FlushTablesResponsePB; |
30 | | using yb::master::IsFlushTablesDoneRequestPB; |
31 | | using yb::master::IsFlushTablesDoneResponsePB; |
32 | | using yb::StatusFromPB; |
33 | | |
34 | | using std::string; |
35 | | using std::vector; |
36 | | using std::unique_ptr; |
37 | | |
38 | | using yb::client::YBTableName; |
39 | | using yb::rpc::RpcController; |
40 | | |
41 | | using namespace std::literals; |
42 | | |
43 | | namespace yb { |
44 | | namespace pgwrapper { |
45 | | namespace { |
46 | | |
47 | | template<bool Auth, bool Encrypted> |
48 | | struct ConnectionStrategy { |
49 | | static const bool UseAuth = Auth; |
50 | | static const bool EncryptConnection = Encrypted; |
51 | | }; |
52 | | |
53 | | template<class Strategy> |
54 | | class PgWrapperTestHelper: public PgCommandTestBase { |
55 | | protected: |
56 | 0 | PgWrapperTestHelper() : PgCommandTestBase(Strategy::UseAuth, Strategy::EncryptConnection) {} Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb1ELb0EEEEC2Ev Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb0ELb1EEEEC2Ev Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb1ELb1EEEEC2Ev Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb0ELb0EEEEC2Ev |
57 | | }; |
58 | | |
59 | | } // namespace |
60 | | |
61 | | |
62 | | YB_DEFINE_ENUM(FlushOrCompaction, (kFlush)(kCompaction)); |
63 | | |
64 | | class PgWrapperTest : public PgWrapperTestHelper<ConnectionStrategy<false, false>> { |
65 | | protected: |
66 | 0 | void FlushOrCompact(string table_id, FlushOrCompaction flush_or_compaction) { |
67 | 0 | RpcController rpc; |
68 | 0 | auto master_proxy = cluster_->GetMasterProxy<master::MasterAdminProxy>(); |
69 | |
|
70 | 0 | FlushTablesResponsePB flush_tables_resp; |
71 | 0 | FlushTablesRequestPB compaction_req; |
72 | 0 | compaction_req.add_tables()->set_table_id(table_id); |
73 | 0 | compaction_req.set_is_compaction(flush_or_compaction == FlushOrCompaction::kCompaction); |
74 | 0 | LOG(INFO) << "Initiating a " << flush_or_compaction << " request for table " << table_id; |
75 | 0 | ASSERT_OK(master_proxy.FlushTables(compaction_req, &flush_tables_resp, &rpc)); |
76 | 0 | LOG(INFO) << "Initiated a " << flush_or_compaction << " request for table " << table_id; |
77 | |
|
78 | 0 | if (flush_tables_resp.has_error()) { |
79 | 0 | FAIL() << flush_tables_resp.error().ShortDebugString(); |
80 | 0 | } |
81 | | |
82 | 0 | IsFlushTablesDoneRequestPB wait_req; |
83 | 0 | IsFlushTablesDoneResponsePB wait_resp; |
84 | 0 | wait_req.set_flush_request_id(flush_tables_resp.flush_request_id()); |
85 | |
|
86 | 0 | ASSERT_OK(WaitFor( |
87 | 0 | [&]() -> Result<bool> { |
88 | 0 | rpc.Reset(); |
89 | 0 | RETURN_NOT_OK(master_proxy.IsFlushTablesDone(wait_req, &wait_resp, &rpc)); |
90 | |
|
91 | 0 | if (wait_resp.has_error()) { |
92 | 0 | if (wait_resp.error().status().code() == AppStatusPB::NOT_FOUND) { |
93 | 0 | return STATUS_FORMAT( |
94 | 0 | NotFound, "$0 request was deleted: $1", |
95 | 0 | flush_or_compaction, flush_tables_resp.flush_request_id()); |
96 | 0 | } |
97 | |
|
98 | 0 | return StatusFromPB(wait_resp.error().status()); |
99 | 0 | } |
100 | |
|
101 | 0 | if (wait_resp.done()) { |
102 | 0 | if (!wait_resp.success()) { |
103 | 0 | return STATUS_FORMAT( |
104 | 0 | InternalError, "$0 request failed: $1", flush_or_compaction, |
105 | 0 | wait_resp.ShortDebugString()); |
106 | 0 | } |
107 | 0 | return true; |
108 | 0 | } |
109 | 0 | return false; |
110 | 0 | }, |
111 | 0 | MonoDelta::FromSeconds(20), |
112 | 0 | "Compaction" |
113 | 0 | )); |
114 | 0 | LOG(INFO) << "Table " << table_id << " " << flush_or_compaction << " finished"; |
115 | 0 | } |
116 | | }; |
117 | | |
118 | | using PgWrapperTestAuth = PgWrapperTestHelper<ConnectionStrategy<true, false>>; |
119 | | using PgWrapperTestSecure = PgWrapperTestHelper<ConnectionStrategy<false, true>>; |
120 | | using PgWrapperTestAuthSecure = PgWrapperTestHelper<ConnectionStrategy<true, true>>; |
121 | | |
122 | 0 | TEST_F(PgWrapperTestAuth, YB_DISABLE_TEST_IN_TSAN(TestConnectionAuth)) { |
123 | 0 | ASSERT_NO_FATALS(RunPsqlCommand( |
124 | 0 | "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true", |
125 | 0 | R"#( |
126 | 0 | clientdn |
127 | 0 | ---------- |
128 | 0 | (0 rows) |
129 | 0 | )#" |
130 | 0 | )); |
131 | 0 | } |
132 | | |
133 | 0 | TEST_F(PgWrapperTestSecure, YB_DISABLE_TEST_IN_TSAN(TestConnectionTLS)) { |
134 | 0 | ASSERT_NO_FATALS(RunPsqlCommand( |
135 | 0 | "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true", |
136 | 0 | R"#( |
137 | 0 | clientdn |
138 | 0 | ------------------------- |
139 | 0 | /O=YugaByte/CN=yugabyte |
140 | 0 | (1 row) |
141 | 0 | )#" |
142 | 0 | )); |
143 | 0 | } |
144 | | |
145 | | |
146 | 0 | TEST_F(PgWrapperTestAuthSecure, YB_DISABLE_TEST_IN_TSAN(TestConnectionAuthTLS)) { |
147 | 0 | ASSERT_NO_FATALS(RunPsqlCommand( |
148 | 0 | "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true", |
149 | 0 | R"#( |
150 | 0 | clientdn |
151 | 0 | ------------------------- |
152 | 0 | /O=YugaByte/CN=yugabyte |
153 | 0 | (1 row) |
154 | 0 | )#" |
155 | 0 | )); |
156 | 0 | } |
157 | | |
158 | | TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(TestStartStop)) { |
159 | | ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT PRIMARY KEY, v TEXT)")); |
160 | | ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (100, 'foo')")); |
161 | | ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (200, 'bar')")); |
162 | | ASSERT_NO_FATALS(RunPsqlCommand( |
163 | | "SELECT k, v FROM mytbl ORDER BY k", |
164 | | R"#( |
165 | | k | v |
166 | | -----+----- |
167 | | 100 | foo |
168 | | 200 | bar |
169 | | (2 rows) |
170 | | )#" |
171 | | )); |
172 | | } |
173 | | |
174 | 0 | TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(TestCompactHistoryWithTxn)) { |
175 | 0 | RpcController rpc; |
176 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(60)); |
177 | 0 | ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT PRIMARY KEY, v TEXT)")); |
178 | |
|
179 | 0 | ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (100, 'value1')")); |
180 | 0 | string table_id; |
181 | 0 | LOG(INFO) << "Preparing to force a compaction on the table we created"; |
182 | 0 | { |
183 | 0 | auto client = ASSERT_RESULT(cluster_->CreateClient()); |
184 | |
|
185 | 0 | const auto tables = ASSERT_RESULT(client->ListTables()); |
186 | 0 | for (const auto& table : tables) { |
187 | 0 | if (table.has_table() && table.table_name() == "mytbl") { |
188 | 0 | table_id = table.table_id(); |
189 | 0 | break; |
190 | 0 | } |
191 | 0 | } |
192 | 0 | } |
193 | 0 | ASSERT_TRUE(!table_id.empty()); |
194 | |
|
195 | 0 | for (int i = 2; i <= 5; ++i) { |
196 | 0 | ASSERT_NO_FATALS(UpdateOneRow( |
197 | 0 | Format("UPDATE mytbl SET v = 'value$0' WHERE k = 100", i))); |
198 | 0 | ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kFlush)); |
199 | 0 | } |
200 | |
|
201 | 0 | ASSERT_NO_FATALS(RunPsqlCommand( |
202 | 0 | "SELECT k, v FROM mytbl WHERE k = 100", |
203 | 0 | R"#( |
204 | 0 | k | v |
205 | 0 | -----+-------- |
206 | 0 | 100 | value5 |
207 | 0 | (1 row) |
208 | 0 | )#" |
209 | 0 | )); |
210 | |
|
211 | 0 | std::thread read_txn_thread([this]{ |
212 | 0 | LOG(INFO) << "Starting transaction to read data and wait"; |
213 | 0 | RunPsqlCommand( |
214 | 0 | "BEGIN; " |
215 | 0 | "SELECT * FROM mytbl WHERE k = 100; " |
216 | 0 | "SELECT pg_sleep(30); " |
217 | 0 | "SELECT * FROM mytbl WHERE k = 100; " |
218 | 0 | "COMMIT; " |
219 | 0 | "SELECT * FROM mytbl WHERE k = 100;", |
220 | 0 | R"#( |
221 | 0 | BEGIN |
222 | 0 | k | v |
223 | 0 | -----+-------- |
224 | 0 | 100 | value5 |
225 | 0 | (1 row) |
226 | 0 |
|
227 | 0 | pg_sleep |
228 | 0 | ---------- |
229 | 0 |
|
230 | 0 | (1 row) |
231 | 0 |
|
232 | 0 | ROLLBACK |
233 | 0 | k | v |
234 | 0 | -----+-------- |
235 | 0 | 100 | value7 |
236 | 0 | (1 row) |
237 | 0 | )#"); |
238 | 0 | LOG(INFO) << "Transaction finished"; |
239 | 0 | }); |
240 | | |
241 | | // Give our transaction a chance to start up the backend and perform the first read. |
242 | 0 | std::this_thread::sleep_for(5s); |
243 | | |
244 | | // Generate a few more files. |
245 | 0 | for (int i = 6; i <= 7; ++i) { |
246 | 0 | ASSERT_NO_FATALS(UpdateOneRow( |
247 | 0 | Format("UPDATE mytbl SET v = 'value$0' WHERE k = 100", i))); |
248 | 0 | ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kFlush)); |
249 | 0 | } |
250 | |
|
251 | 0 | ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kCompaction)); |
252 | |
|
253 | 0 | read_txn_thread.join(); |
254 | 0 | } |
255 | | |
256 | | TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(InsertSelect)) { |
257 | | ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT, v TEXT)")); |
258 | | |
259 | | ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (1, 'abc')")); |
260 | | for (size_t i = 0; i != RegularBuildVsSanitizers(7U, 1U); ++i) { |
261 | | ASSERT_NO_FATALS(InsertRows( |
262 | | "INSERT INTO mytbl SELECT * FROM mytbl", 1 << i /* expected_rows */)); |
263 | | } |
264 | | } |
265 | | |
266 | | class PgWrapperOneNodeClusterTest : public YBMiniClusterTestBase<ExternalMiniCluster> { |
267 | | public: |
268 | 0 | void SetUp() { |
269 | 0 | YBMiniClusterTestBase::SetUp(); |
270 | |
|
271 | 0 | ExternalMiniClusterOptions opts; |
272 | 0 | opts.enable_ysql = true; |
273 | 0 | opts.num_tablet_servers = 1; |
274 | |
|
275 | 0 | cluster_.reset(new ExternalMiniCluster(opts)); |
276 | 0 | ASSERT_OK(cluster_->Start()); |
277 | |
|
278 | 0 | pg_ts_ = cluster_->tablet_server(0); |
279 | | |
280 | | // TODO: fix cluster verification for PostgreSQL tables. |
281 | 0 | DontVerifyClusterBeforeNextTearDown(); |
282 | 0 | } |
283 | | |
284 | | protected: |
285 | | ExternalTabletServer* pg_ts_ = nullptr; |
286 | | |
287 | | }; |
288 | | |
289 | 0 | TEST_F(PgWrapperOneNodeClusterTest, YB_DISABLE_TEST_IN_TSAN(TestPostgresPid)) { |
290 | 0 | MonoDelta timeout = 15s; |
291 | 0 | int tserver_count = 1; |
292 | |
|
293 | 0 | std::string pid_file = JoinPathSegments(pg_ts_->GetRootDir(), "pg_data", "postmaster.pid"); |
294 | | // Wait for postgres server to start and setup postmaster.pid file |
295 | 0 | ASSERT_OK(LoggedWaitFor( |
296 | 0 | [this, &pid_file] { |
297 | 0 | return env_->FileExists(pid_file); |
298 | 0 | }, timeout, "Waiting for postgres server to create postmaster.pid file")); |
299 | 0 | ASSERT_TRUE(env_->FileExists(pid_file)); |
300 | | |
301 | | // Shutdown tserver and wait for postgres server to shut down and delete postmaster.pid file |
302 | 0 | pg_ts_->Shutdown(); |
303 | 0 | ASSERT_OK(LoggedWaitFor( |
304 | 0 | [this, &pid_file] { |
305 | 0 | return !env_->FileExists(pid_file); |
306 | 0 | }, timeout, "Waiting for postgres server to shutdown")); |
307 | 0 | ASSERT_FALSE(env_->FileExists(pid_file)); |
308 | | |
309 | | // Create empty postmaster.pid file and ensure that tserver can start up |
310 | | // Use sync_on_close flag to ensure that the file is flushed to disk when tserver tries to read it |
311 | 0 | std::unique_ptr<RWFile> file; |
312 | 0 | RWFileOptions opts; |
313 | 0 | opts.sync_on_close = true; |
314 | 0 | opts.mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE; |
315 | |
|
316 | 0 | ASSERT_OK(env_->NewRWFile(opts, pid_file, &file)); |
317 | 0 | ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */)); |
318 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout)); |
319 | | |
320 | | // Shutdown tserver and wait for postgres server to shutdown and delete postmaster.pid file |
321 | 0 | pg_ts_->Shutdown(); |
322 | 0 | ASSERT_OK(LoggedWaitFor( |
323 | 0 | [this, &pid_file] { |
324 | 0 | return !env_->FileExists(pid_file); |
325 | 0 | }, timeout, "Waiting for postgres server to shutdown", 100ms)); |
326 | 0 | ASSERT_FALSE(env_->FileExists(pid_file)); |
327 | | |
328 | | // Create postmaster.pid file with string pid (invalid) and ensure that tserver can start up |
329 | 0 | ASSERT_OK(env_->NewRWFile(opts, pid_file, &file)); |
330 | 0 | ASSERT_OK(file->Write(0, "abcde\n" + pid_file)); |
331 | 0 | ASSERT_OK(file->Close()); |
332 | |
|
333 | 0 | ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */)); |
334 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout)); |
335 | | |
336 | | // Shutdown tserver and wait for postgres server to shutdown and delete postmaster.pid file |
337 | 0 | pg_ts_->Shutdown(); |
338 | 0 | ASSERT_OK(LoggedWaitFor( |
339 | 0 | [this, &pid_file] { |
340 | 0 | return !env_->FileExists(pid_file); |
341 | 0 | }, timeout, "Waiting for postgres server to shutdown", 100ms)); |
342 | 0 | ASSERT_FALSE(env_->FileExists(pid_file)); |
343 | | |
344 | | // Create postgres pid file with integer pid (valid) and ensure that tserver can start up |
345 | 0 | ASSERT_OK(env_->NewRWFile(opts, pid_file, &file)); |
346 | 0 | ASSERT_OK(file->Write(0, "1002\n" + pid_file)); |
347 | 0 | ASSERT_OK(file->Close()); |
348 | |
|
349 | 0 | ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */)); |
350 | 0 | ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout)); |
351 | 0 | } |
352 | | |
353 | | } // namespace pgwrapper |
354 | | } // namespace yb |