YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_wrapper-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/client/yb_table_name.h"
14
15
#include "yb/common/wire_protocol.h"
16
17
#include "yb/master/master_admin.proxy.h"
18
#include "yb/master/master_ddl.pb.h"
19
20
#include "yb/rpc/rpc_controller.h"
21
22
#include "yb/util/path_util.h"
23
#include "yb/util/result.h"
24
#include "yb/util/tsan_util.h"
25
26
#include "yb/yql/pgwrapper/pg_wrapper_test_base.h"
27
28
using yb::master::FlushTablesRequestPB;
29
using yb::master::FlushTablesResponsePB;
30
using yb::master::IsFlushTablesDoneRequestPB;
31
using yb::master::IsFlushTablesDoneResponsePB;
32
using yb::StatusFromPB;
33
34
using std::string;
35
using std::vector;
36
using std::unique_ptr;
37
38
using yb::client::YBTableName;
39
using yb::rpc::RpcController;
40
41
using namespace std::literals;
42
43
namespace yb {
44
namespace pgwrapper {
45
namespace {
46
47
// Compile-time description of how a test fixture connects to the cluster.
//
// Auth      - exposed as UseAuth.
// Encrypted - exposed as EncryptConnection.
//
// The members are constexpr so they are usable in constant expressions and, under C++17, are
// implicitly inline: binding one of them to a reference does not require a separate out-of-class
// definition.
template<bool Auth, bool Encrypted>
struct ConnectionStrategy {
  static constexpr bool UseAuth = Auth;
  static constexpr bool EncryptConnection = Encrypted;
};
52
53
template<class Strategy>
54
class PgWrapperTestHelper: public PgCommandTestBase {
55
 protected:
56
0
  PgWrapperTestHelper() : PgCommandTestBase(Strategy::UseAuth, Strategy::EncryptConnection) {}
Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb1ELb0EEEEC2Ev
Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb0ELb1EEEEC2Ev
Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb1ELb1EEEEC2Ev
Unexecuted instantiation: pg_wrapper-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_119PgWrapperTestHelperINS1_18ConnectionStrategyILb0ELb0EEEEC2Ev
57
};
58
59
} // namespace
60
61
62
YB_DEFINE_ENUM(FlushOrCompaction, (kFlush)(kCompaction));
63
64
// Fixture for tests that use ConnectionStrategy<false, false>, i.e. neither the auth flag nor the
// encryption flag is set.
class PgWrapperTest : public PgWrapperTestHelper<ConnectionStrategy<false, false>> {
 protected:
  // Sends a FlushTables request for `table_id` to the master (as a compaction when
  // `flush_or_compaction` is kCompaction) and polls IsFlushTablesDone until the request is
  // reported done, for at most 20 seconds.
  //
  // Failures are reported through gtest fatal assertions, so callers should wrap the call in
  // ASSERT_NO_FATALS.
  void FlushOrCompact(const string& table_id, FlushOrCompaction flush_or_compaction) {
    RpcController rpc;
    auto master_proxy = cluster_->GetMasterProxy<master::MasterAdminProxy>();

    FlushTablesRequestPB flush_tables_req;
    FlushTablesResponsePB flush_tables_resp;
    flush_tables_req.add_tables()->set_table_id(table_id);
    flush_tables_req.set_is_compaction(flush_or_compaction == FlushOrCompaction::kCompaction);
    LOG(INFO) << "Initiating a " << flush_or_compaction << " request for table " << table_id;
    ASSERT_OK(master_proxy.FlushTables(flush_tables_req, &flush_tables_resp, &rpc));
    LOG(INFO) << "Initiated a " << flush_or_compaction << " request for table " << table_id;

    if (flush_tables_resp.has_error()) {
      FAIL() << flush_tables_resp.error().ShortDebugString();
    }

    IsFlushTablesDoneRequestPB wait_req;
    IsFlushTablesDoneResponsePB wait_resp;
    wait_req.set_flush_request_id(flush_tables_resp.flush_request_id());

    // Name the actual operation in the wait description so that a timed-out flush is not
    // reported as a compaction.
    const auto wait_description = Format("$0 of table $1", flush_or_compaction, table_id);

    ASSERT_OK(WaitFor(
        [&]() -> Result<bool> {
          // The controller is reused across polls, so it has to be reset before each call.
          rpc.Reset();
          RETURN_NOT_OK(master_proxy.IsFlushTablesDone(wait_req, &wait_resp, &rpc));

          if (wait_resp.has_error()) {
            if (wait_resp.error().status().code() == AppStatusPB::NOT_FOUND) {
              return STATUS_FORMAT(
                  NotFound, "$0 request was deleted: $1",
                  flush_or_compaction, flush_tables_resp.flush_request_id());
            }

            return StatusFromPB(wait_resp.error().status());
          }

          if (wait_resp.done()) {
            if (!wait_resp.success()) {
              return STATUS_FORMAT(
                  InternalError, "$0 request failed: $1", flush_or_compaction,
                  wait_resp.ShortDebugString());
            }
            return true;
          }
          return false;
        },
        MonoDelta::FromSeconds(20),
        wait_description
    ));
    LOG(INFO) << "Table " << table_id << " " << flush_or_compaction << " finished";
  }
};
117
118
using PgWrapperTestAuth = PgWrapperTestHelper<ConnectionStrategy<true, false>>;
119
using PgWrapperTestSecure = PgWrapperTestHelper<ConnectionStrategy<false, true>>;
120
using PgWrapperTestAuthSecure = PgWrapperTestHelper<ConnectionStrategy<true, true>>;
121
122
0
// With the auth flag set and encryption off, pg_stat_ssl is expected to list no SSL sessions.
TEST_F(PgWrapperTestAuth, YB_DISABLE_TEST_IN_TSAN(TestConnectionAuth)) {
  constexpr char kQuery[] = "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true";
  constexpr char kExpectedOutput[] =
      R"#(
         clientdn
        ----------
        (0 rows)
      )#";

  ASSERT_NO_FATALS(RunPsqlCommand(kQuery, kExpectedOutput));
}
132
133
0
// With only the encryption flag set, pg_stat_ssl is expected to list exactly one SSL session
// whose client DN is /O=YugaByte/CN=yugabyte.
TEST_F(PgWrapperTestSecure, YB_DISABLE_TEST_IN_TSAN(TestConnectionTLS)) {
  constexpr char kQuery[] = "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true";
  constexpr char kExpectedOutput[] =
      R"#(
                clientdn
        -------------------------
         /O=YugaByte/CN=yugabyte
        (1 row)
      )#";

  ASSERT_NO_FATALS(RunPsqlCommand(kQuery, kExpectedOutput));
}
144
145
146
0
// With both the auth and the encryption flags set, pg_stat_ssl is expected to list exactly one
// SSL session whose client DN is /O=YugaByte/CN=yugabyte.
TEST_F(PgWrapperTestAuthSecure, YB_DISABLE_TEST_IN_TSAN(TestConnectionAuthTLS)) {
  constexpr char kQuery[] = "SELECT clientdn FROM pg_stat_ssl WHERE ssl=true";
  constexpr char kExpectedOutput[] =
      R"#(
                clientdn
        -------------------------
         /O=YugaByte/CN=yugabyte
        (1 row)
      )#";

  ASSERT_NO_FATALS(RunPsqlCommand(kQuery, kExpectedOutput));
}
157
158
// Smoke test: create a table, insert two rows one at a time, and read them back ordered by key.
TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(TestStartStop)) {
  ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT PRIMARY KEY, v TEXT)"));

  constexpr char kInsertFoo[] = "INSERT INTO mytbl (k, v) VALUES (100, 'foo')";
  constexpr char kInsertBar[] = "INSERT INTO mytbl (k, v) VALUES (200, 'bar')";
  ASSERT_NO_FATALS(InsertOneRow(kInsertFoo));
  ASSERT_NO_FATALS(InsertOneRow(kInsertBar));

  constexpr char kSelectAll[] = "SELECT k, v FROM mytbl ORDER BY k";
  constexpr char kExpectedRows[] =
      R"#(
          k  |  v
        -----+-----
         100 | foo
         200 | bar
        (2 rows)
      )#";
  ASSERT_NO_FATALS(RunPsqlCommand(kSelectAll, kExpectedRows));
}
173
174
0
// Runs a long-lived psql transaction on a row while the main thread keeps updating the row,
// flushing after every update and finally compacting the table.
//
// The expected psql output below encodes the outcome being checked: the first read inside the
// transaction returns value5, COMMIT is reported as ROLLBACK, and the read issued after the
// transaction returns value7.
TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(TestCompactHistoryWithTxn)) {
  ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT PRIMARY KEY, v TEXT)"));

  ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (100, 'value1')"));
  string table_id;
  LOG(INFO) << "Preparing to force a compaction on the table we created";
  {
    auto client = ASSERT_RESULT(cluster_->CreateClient());

    // Look up the id of the table created above; FlushOrCompact addresses tables by id.
    const auto tables = ASSERT_RESULT(client->ListTables());
    for (const auto& table : tables) {
      if (table.has_table() && table.table_name() == "mytbl") {
        table_id = table.table_id();
        break;
      }
    }
  }
  ASSERT_FALSE(table_id.empty());

  // Each update is followed by a flush so that every version of the row is flushed separately.
  for (int i = 2; i <= 5; ++i) {
    ASSERT_NO_FATALS(UpdateOneRow(
        Format("UPDATE mytbl SET v = 'value$0' WHERE k = 100", i)));
    ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kFlush));
  }

  ASSERT_NO_FATALS(RunPsqlCommand(
      "SELECT k, v FROM mytbl WHERE k = 100",
      R"#(
          k  |   v
        -----+--------
         100 | value5
        (1 row)
      )#"
  ));

  std::thread read_txn_thread([this]{
    LOG(INFO) << "Starting transaction to read data and wait";
    RunPsqlCommand(
        "BEGIN; "
        "SELECT * FROM mytbl WHERE k = 100; "
        "SELECT pg_sleep(30); "
        "SELECT * FROM mytbl WHERE k = 100; "
        "COMMIT; "
        "SELECT * FROM mytbl WHERE k = 100;",
        R"#(
          BEGIN
            k  |   v
          -----+--------
           100 | value5
          (1 row)

           pg_sleep
          ----------

          (1 row)

          ROLLBACK
            k  |   v
          -----+--------
           100 | value7
          (1 row)
        )#");
    LOG(INFO) << "Transaction finished";
  });

  // The ASSERT_* macros below return from this function on failure. Destroying a joinable
  // std::thread calls std::terminate, so make sure the thread is joined on every exit path.
  struct JoinOnExit {
    std::thread& thread;

    ~JoinOnExit() {
      if (thread.joinable()) {
        thread.join();
      }
    }
  } join_read_txn_thread{read_txn_thread};

  // Give our transaction a chance to start up the backend and perform the first read.
  std::this_thread::sleep_for(5s);

  // Generate a few more files.
  for (int i = 6; i <= 7; ++i) {
    ASSERT_NO_FATALS(UpdateOneRow(
        Format("UPDATE mytbl SET v = 'value$0' WHERE k = 100", i)));
    ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kFlush));
  }

  ASSERT_NO_FATALS(FlushOrCompact(table_id, FlushOrCompaction::kCompaction));

  read_txn_thread.join();
}
255
256
// Seeds a table with one row and then repeatedly inserts the table into itself. Round `round`
// is expected to insert 1 << round rows.
TEST_F(PgWrapperTest, YB_DISABLE_TEST_IN_TSAN(InsertSelect)) {
  ASSERT_NO_FATALS(CreateTable("CREATE TABLE mytbl (k INT, v TEXT)"));
  ASSERT_NO_FATALS(InsertOneRow("INSERT INTO mytbl (k, v) VALUES (1, 'abc')"));

  // Round count is chosen by RegularBuildVsSanitizers from the pair (7, 1).
  const auto num_rounds = RegularBuildVsSanitizers(7U, 1U);
  for (size_t round = 0; round != num_rounds; ++round) {
    const auto expected_rows = 1 << round;
    ASSERT_NO_FATALS(InsertRows("INSERT INTO mytbl SELECT * FROM mytbl", expected_rows));
  }
}
265
266
// Fixture that starts an ExternalMiniCluster with YSQL enabled and a single tablet server, and
// keeps a pointer to that tablet server in pg_ts_.
class PgWrapperOneNodeClusterTest : public YBMiniClusterTestBase<ExternalMiniCluster> {
 public:
  // Marked override so the compiler verifies this really replaces the fixture's SetUp hook.
  void SetUp() override {
    YBMiniClusterTestBase::SetUp();

    ExternalMiniClusterOptions opts;
    opts.enable_ysql = true;
    opts.num_tablet_servers = 1;

    cluster_.reset(new ExternalMiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    pg_ts_ = cluster_->tablet_server(0);

    // TODO: fix cluster verification for PostgreSQL tables.
    DontVerifyClusterBeforeNextTearDown();
  }

 protected:
  // Tablet server 0 of cluster_; set by SetUp() once the cluster has started.
  ExternalTabletServer* pg_ts_ = nullptr;
};
288
289
0
// Checks that the tablet server can be restarted when a stale postmaster.pid file is left in
// its pg_data directory. Three variants of the stale file are tried: empty, containing a
// non-numeric pid, and containing a numeric pid.
TEST_F(PgWrapperOneNodeClusterTest, YB_DISABLE_TEST_IN_TSAN(TestPostgresPid)) {
  const MonoDelta timeout = 15s;
  const int tserver_count = 1;

  const std::string pid_file =
      JoinPathSegments(pg_ts_->GetRootDir(), "pg_data", "postmaster.pid");

  // Shared predicate for the three "wait until postgres has removed its pid file" steps.
  const auto pid_file_gone = [this, &pid_file] {
    return !env_->FileExists(pid_file);
  };

  // Wait for postgres server to start and setup postmaster.pid file
  ASSERT_OK(LoggedWaitFor(
      [this, &pid_file] {
        return env_->FileExists(pid_file);
      }, timeout, "Waiting for postgres server to create postmaster.pid file"));
  ASSERT_TRUE(env_->FileExists(pid_file));

  // Shutdown tserver and wait for postgres server to shut down and delete postmaster.pid file
  pg_ts_->Shutdown();
  ASSERT_OK(LoggedWaitFor(pid_file_gone, timeout, "Waiting for postgres server to shutdown"));
  ASSERT_FALSE(env_->FileExists(pid_file));

  // Create empty postmaster.pid file and ensure that tserver can start up
  // Use sync_on_close flag to ensure that the file is flushed to disk when tserver tries to read it
  std::unique_ptr<RWFile> file;
  RWFileOptions opts;
  opts.sync_on_close = true;
  opts.mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE;

  ASSERT_OK(env_->NewRWFile(opts, pid_file, &file));
  // sync_on_close only takes effect when the file is closed, so close it before the tserver
  // is started, exactly as the two cases below do.
  ASSERT_OK(file->Close());
  ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */));
  ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout));

  // Shutdown tserver and wait for postgres server to shutdown and delete postmaster.pid file
  pg_ts_->Shutdown();
  ASSERT_OK(LoggedWaitFor(
      pid_file_gone, timeout, "Waiting for postgres server to shutdown", 100ms));
  ASSERT_FALSE(env_->FileExists(pid_file));

  // Create postmaster.pid file with string pid (invalid) and ensure that tserver can start up
  ASSERT_OK(env_->NewRWFile(opts, pid_file, &file));
  ASSERT_OK(file->Write(0, "abcde\n" + pid_file));
  ASSERT_OK(file->Close());

  ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */));
  ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout));

  // Shutdown tserver and wait for postgres server to shutdown and delete postmaster.pid file
  pg_ts_->Shutdown();
  ASSERT_OK(LoggedWaitFor(
      pid_file_gone, timeout, "Waiting for postgres server to shutdown", 100ms));
  ASSERT_FALSE(env_->FileExists(pid_file));

  // Create postgres pid file with integer pid (valid) and ensure that tserver can start up
  ASSERT_OK(env_->NewRWFile(opts, pid_file, &file));
  ASSERT_OK(file->Write(0, "1002\n" + pid_file));
  ASSERT_OK(file->Close());

  ASSERT_OK(pg_ts_->Start(false /* start_cql_proxy */));
  ASSERT_OK(cluster_->WaitForTabletServerCount(tserver_count, timeout));
}
352
353
}  // namespace pgwrapper
354
}  // namespace yb