YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/libpq_utils.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pgwrapper/libpq_utils.h"
15
16
#include <string>
17
#include <utility>
18
19
#include <boost/preprocessor/seq/for_each.hpp>
20
21
#include "yb/common/pgsql_error.h"
22
23
#include "yb/gutil/casts.h"
24
#include "yb/gutil/endian.h"
25
26
#include "yb/util/enums.h"
27
#include "yb/util/format.h"
28
#include "yb/util/logging.h"
29
#include "yb/util/monotime.h"
30
#include "yb/util/net/net_util.h"
31
#include "yb/util/status_format.h"
32
#include "yb/util/status_log.h"
33
34
using namespace std::literals;
35
36
namespace yb {
37
namespace pgwrapper {
38
39
namespace {
40
41
// Converts the given element of the ExecStatusType enum to a string.
42
3
std::string ExecStatusTypeToStr(ExecStatusType exec_status_type) {
43
3
#define EXEC_STATUS_SWITCH_CASE(r, data, item) case item: return #item;
44
3
#define EXEC_STATUS_TYPE_ENUM_ELEMENTS \
45
3
    (PGRES_EMPTY_QUERY) \
46
3
    (PGRES_COMMAND_OK) \
47
3
    (PGRES_TUPLES_OK) \
48
3
    (PGRES_COPY_OUT) \
49
3
    (PGRES_COPY_IN) \
50
3
    (PGRES_BAD_RESPONSE) \
51
3
    (PGRES_NONFATAL_ERROR) \
52
3
    (PGRES_FATAL_ERROR) \
53
3
    (PGRES_COPY_BOTH) \
54
3
    (PGRES_SINGLE_TUPLE)
55
3
  switch (exec_status_type) {
56
3
    BOOST_PP_SEQ_FOR_EACH(EXEC_STATUS_SWITCH_CASE, ~, EXEC_STATUS_TYPE_ENUM_ELEMENTS)
57
3
  }
58
0
#undef EXEC_STATUS_SWITCH_CASE
59
0
#undef EXEC_STATUS_TYPE_ENUM_ELEMENTS
60
0
  return Format("Unknown ExecStatusType ($0)", exec_status_type);
61
3
}
62
63
4.53k
YBPgErrorCode GetSqlState(PGresult* result) {
64
4.53k
  auto exec_status_type = PQresultStatus(result);
65
4.53k
  if (exec_status_type == ExecStatusType::PGRES_COMMAND_OK ||
66
4.53k
      exec_status_type == ExecStatusType::PGRES_TUPLES_OK) {
67
0
    return YBPgErrorCode::YB_PG_SUCCESSFUL_COMPLETION;
68
0
  }
69
70
4.53k
  const char* sqlstate_str = PQresultErrorField(result, PG_DIAG_SQLSTATE);
71
4.53k
  if (sqlstate_str == nullptr) {
72
33
    auto err_msg = PQresultErrorMessage(result);
73
33
    YB_LOG_EVERY_N_SECS(WARNING, 5)
74
3
        << "SQLSTATE is not defined for result with "
75
3
        << "error message: " << (err_msg ? err_msg : 
"N/A"0
) << ", "
76
3
        << "PQresultStatus: " << ExecStatusTypeToStr(exec_status_type);
77
33
    return YBPgErrorCode::YB_PG_INTERNAL_ERROR;
78
33
  }
79
80
4.49k
  CHECK_EQ(5, strlen(sqlstate_str))
81
0
      << "sqlstate_str: " << sqlstate_str
82
0
      << ", PQresultStatus: " << ExecStatusTypeToStr(exec_status_type);
83
84
4.49k
  uint32_t sqlstate = 0;
85
86
26.9k
  for (int i = 0; i < 5; 
++i22.4k
) {
87
22.4k
    sqlstate |= (sqlstate_str[i] - '0') << (6 * i);
88
22.4k
  }
89
4.49k
  return static_cast<YBPgErrorCode>(sqlstate);
90
4.53k
}
91
92
// Taken from <https://stackoverflow.com/a/24315631> by Gauthier Boaglio.
93
9.78k
inline void ReplaceAll(std::string* str, const std::string& from, const std::string& to) {
94
9.78k
  CHECK(str);
95
9.78k
  size_t start_pos = 0;
96
9.79k
  while ((start_pos = str->find(from, start_pos)) != std::string::npos) {
97
12
    str->replace(start_pos, from.length(), to);
98
12
    start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
99
12
  }
100
9.78k
}
101
102
}  // anonymous namespace
103
104
105
142k
void PGConnClose::operator()(PGconn* conn) const {
106
142k
  PQfinish(conn);
107
142k
}
108
109
struct PGConn::CopyData {
110
  static constexpr size_t kBufferSize = 2048;
111
112
  Status error;
113
  char * pos;
114
  char buffer[kBufferSize];
115
116
500
  void Start() {
117
500
    pos = buffer;
118
500
    error = Status::OK();
119
500
  }
120
121
10.0k
  void WriteUInt16(uint16_t value) {
122
10.0k
    BigEndian::Store16(pos, value);
123
10.0k
    pos += 2;
124
10.0k
  }
125
126
90.0k
  void WriteUInt32(uint32_t value) {
127
90.0k
    BigEndian::Store32(pos, value);
128
90.0k
    pos += 4;
129
90.0k
  }
130
131
0
  void WriteUInt64(uint64_t value) {
132
0
    BigEndian::Store64(pos, value);
133
0
    pos += 8;
134
0
  }
135
136
50.1k
  void Write(const char* value, size_t len) {
137
50.1k
    memcpy(pos, value, len);
138
50.1k
    pos += len;
139
50.1k
  }
140
141
180k
  size_t left() const {
142
180k
    return buffer + kBufferSize - pos;
143
180k
  }
144
};
145
146
Result<PGConn> PGConn::Connect(
147
    const HostPort& host_port,
148
    const std::string& db_name,
149
    const std::string& user,
150
364
    bool simple_query_protocol) {
151
364
  auto conn_info = Format(
152
364
      "host=$0 port=$1 user=$2",
153
364
      host_port.host(),
154
364
      host_port.port(),
155
364
      PqEscapeLiteral(user));
156
364
  if (!db_name.empty()) {
157
64
    conn_info = Format("dbname=$0 $1", PqEscapeLiteral(db_name), conn_info);
158
64
  }
159
364
  return Connect(conn_info, simple_query_protocol);
160
364
}
161
162
Result<PGConn> PGConn::Connect(const std::string& conn_str,
163
                               CoarseTimePoint deadline,
164
395
                               bool simple_query_protocol) {
165
395
  auto start = CoarseMonoClock::now();
166
140k
  for (;;) {
167
140k
    PGConnPtr result(PQconnectdb(conn_str.c_str()));
168
140k
    if (!result) {
169
0
      return STATUS(NetworkError, "Failed to connect to DB");
170
0
    }
171
140k
    auto status = PQstatus(result.get());
172
140k
    if (status == ConnStatusType::CONNECTION_OK) {
173
394
      LOG(INFO) << "Connected to PG (" << conn_str << "), time taken: "
174
394
                << MonoDelta(CoarseMonoClock::Now() - start);
175
394
      return PGConn(std::move(result), simple_query_protocol);
176
394
    }
177
140k
    auto now = CoarseMonoClock::now();
178
140k
    if (now >= deadline) {
179
4
      std::string msg(yb::Format("$0", status));
180
4
      if (status == CONNECTION_BAD) {
181
4
        msg = PQerrorMessage(result.get());
182
        // Avoid double newline (postgres adds a newline after the error message).
183
4
        if (msg.back() == '\n') {
184
4
          msg.resize(msg.size() - 1);
185
4
        }
186
4
      }
187
4
      return STATUS_FORMAT(NetworkError, "Connect failed: $0, passed: $1",
188
4
                           msg, MonoDelta(now - start));
189
4
    }
190
140k
  }
191
395
}
192
193
PGConn::PGConn(PGConnPtr ptr, bool simple_query_protocol)
194
394
    : impl_(std::move(ptr)), simple_query_protocol_(simple_query_protocol) {
195
394
}
196
197
1.37k
PGConn::~PGConn() {
198
1.37k
}
199
200
PGConn::PGConn(PGConn&& rhs)
201
982
    : impl_(std::move(rhs.impl_)), simple_query_protocol_(rhs.simple_query_protocol_) {
202
982
}
203
204
4
PGConn& PGConn::operator=(PGConn&& rhs) {
205
4
  impl_ = std::move(rhs.impl_);
206
4
  simple_query_protocol_ = rhs.simple_query_protocol_;
207
4
  return *this;
208
4
}
209
210
65.7k
void PGResultClear::operator()(PGresult* result) const {
211
65.7k
  PQclear(result);
212
65.7k
}
213
214
60.1k
Status PGConn::Execute(const std::string& command, bool show_query_in_error) {
215
60.1k
  VLOG
(1) << __func__ << " " << command92
;
216
60.1k
  PGResultPtr res(PQexec(impl_.get(), command.c_str()));
217
60.1k
  auto status = PQresultStatus(res.get());
218
60.1k
  if (ExecStatusType::PGRES_COMMAND_OK != status) {
219
4.43k
    if (status == ExecStatusType::PGRES_TUPLES_OK) {
220
0
      return STATUS_FORMAT(IllegalState,
221
0
                           "Tuples received in Execute$0",
222
0
                           show_query_in_error ? Format(" of '$0'", command) : "");
223
0
    }
224
4.43k
    return STATUS(NetworkError,
225
4.43k
                  Format("Execute$0 failed: $1, message: $2",
226
4.43k
                         show_query_in_error ? Format(" of '$0'", command) : "",
227
4.43k
                         status,
228
4.43k
                         PQresultErrorMessage(res.get())),
229
4.43k
                  Slice() /* msg2 */,
230
4.43k
                  PgsqlError(GetSqlState(res.get())));
231
4.43k
  }
232
55.6k
  return Status::OK();
233
60.1k
}
234
235
3.31k
Result<PGResultPtr> CheckResult(PGResultPtr result, const std::string& command) {
236
3.31k
  auto status = PQresultStatus(result.get());
237
3.31k
  if (ExecStatusType::PGRES_TUPLES_OK != status && 
ExecStatusType::PGRES_COPY_IN != status102
) {
238
92
    return STATUS(NetworkError,
239
92
                  Format("Fetch '$0' failed: $1, message: $2",
240
92
                         command, status, PQresultErrorMessage(result.get())),
241
92
                  Slice() /* msg2 */,
242
92
                  PgsqlError(GetSqlState(result.get())));
243
92
  }
244
3.22k
  return result;
245
3.31k
}
246
247
3.30k
Result<PGResultPtr> PGConn::Fetch(const std::string& command) {
248
3.30k
  VLOG
(1) << __func__ << " " << command0
;
249
3.30k
  return CheckResult(
250
3.30k
      PGResultPtr(simple_query_protocol_
251
3.30k
          ? 
PQexec(impl_.get(), command.c_str())22
252
3.30k
          : 
PQexecParams(impl_.get(), command.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 1)3.28k
),
253
3.30k
      command);
254
3.30k
}
255
256
88
Result<PGResultPtr> PGConn::FetchMatrix(const std::string& command, int rows, int columns) {
257
88
  auto res = 
VERIFY_RESULT86
(Fetch(command));86
258
259
0
  auto fetched_columns = PQnfields(res.get());
260
86
  if (fetched_columns != columns) {
261
1
    return STATUS_FORMAT(
262
1
        RuntimeError, "Fetched $0 columns, while $1 expected", fetched_columns, columns);
263
1
  }
264
265
85
  auto fetched_rows = PQntuples(res.get());
266
85
  if (fetched_rows != rows) {
267
0
    return STATUS_FORMAT(
268
0
        RuntimeError, "Fetched $0 rows, while $1 expected", fetched_rows, rows);
269
0
  }
270
271
85
  return res;
272
85
}
273
274
7.59k
CHECKED_STATUS PGConn::StartTransaction(IsolationLevel isolation_level) {
275
7.59k
  switch (isolation_level) {
276
0
    case IsolationLevel::NON_TRANSACTIONAL:
277
0
      return Status::OK();
278
0
    case IsolationLevel::READ_COMMITTED:
279
0
      return Execute("START TRANSACTION ISOLATION LEVEL READ COMMITTED");
280
2.36k
    case IsolationLevel::SNAPSHOT_ISOLATION:
281
2.36k
      return Execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ");
282
5.23k
    case IsolationLevel::SERIALIZABLE_ISOLATION:
283
5.23k
      return Execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
284
7.59k
  }
285
286
0
  FATAL_INVALID_ENUM_VALUE(IsolationLevel, isolation_level);
287
0
}
288
289
0
CHECKED_STATUS PGConn::CommitTransaction() {
290
0
  return Execute("COMMIT");
291
0
}
292
293
0
CHECKED_STATUS PGConn::RollbackTransaction() {
294
0
  return Execute("ROLLBACK");
295
0
}
296
297
27
Result<bool> PGConn::HasIndexScan(const std::string& query) {
298
27
  constexpr int kExpectedColumns = 1;
299
27
  auto res = VERIFY_RESULT(FetchFormat("EXPLAIN $0", query));
300
301
0
  {
302
27
    int fetched_columns = PQnfields(res.get());
303
27
    if (fetched_columns != kExpectedColumns) {
304
0
      return STATUS_FORMAT(
305
0
          InternalError, "Fetched $0 columns, expected $1", fetched_columns, kExpectedColumns);
306
0
    }
307
27
  }
308
309
57
  
for (int line = 0; 27
line < PQntuples(res.get());
++line30
) {
310
52
    std::string value = VERIFY_RESULT(GetString(res.get(), line, 0));
311
52
    if (value.find("Index Scan") != std::string::npos) {
312
11
      return true;
313
41
    } else if (value.find("Index Only Scan") != std::string::npos) {
314
11
      return true;
315
11
    }
316
52
  }
317
5
  return false;
318
27
}
319
320
321
10
Status PGConn::CopyBegin(const std::string& command) {
322
10
  auto result = VERIFY_RESULT(CheckResult(
323
10
      PGResultPtr(
324
10
          PQexecParams(impl_.get(), command.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 0)),
325
10
      command));
326
327
10
  if (!copy_data_) {
328
1
    copy_data_.reset(new CopyData);
329
1
  }
330
10
  copy_data_->Start();
331
332
10
  static const char prefix[] = "PGCOPY\n\xff\r\n\0\0\0\0\0\0\0\0\0";
333
10
  copy_data_->Write(prefix, sizeof(prefix) - 1);
334
335
10
  return Status::OK();
336
10
}
337
338
80.0k
bool PGConn::CopyEnsureBuffer(size_t len) {
339
80.0k
  if (!copy_data_->error.ok()) {
340
0
    return false;
341
0
  }
342
80.0k
  if (copy_data_->left() < len) {
343
320
    return CopyFlushBuffer();
344
320
  }
345
79.6k
  return true;
346
80.0k
}
347
348
10.0k
void PGConn::CopyStartRow(int16_t columns) {
349
10.0k
  if (!CopyEnsureBuffer(2)) {
350
0
    return;
351
0
  }
352
10.0k
  copy_data_->WriteUInt16(columns);
353
10.0k
}
354
355
490
bool PGConn::CopyFlushBuffer() {
356
490
  if (!copy_data_->error.ok()) {
357
0
    return false;
358
0
  }
359
490
  ptrdiff_t len = copy_data_->pos - copy_data_->buffer;
360
490
  if (len) {
361
490
    int res = PQputCopyData(impl_.get(), copy_data_->buffer, narrow_cast<int>(len));
362
490
    if (res < 0) {
363
0
      copy_data_->error = STATUS_FORMAT(NetworkError, "Put copy data failed: $0", res);
364
0
      return false;
365
0
    }
366
490
  }
367
490
  copy_data_->Start();
368
490
  return true;
369
490
}
370
371
0
void PGConn::CopyPutInt16(int16_t value) {
372
0
  if (!CopyEnsureBuffer(6)) {
373
0
    return;
374
0
  }
375
0
  copy_data_->WriteUInt32(2);
376
0
  copy_data_->WriteUInt16(value);
377
0
}
378
379
20.0k
void PGConn::CopyPutInt32(int32_t value) {
380
20.0k
  if (!CopyEnsureBuffer(8)) {
381
0
    return;
382
0
  }
383
20.0k
  copy_data_->WriteUInt32(4);
384
20.0k
  copy_data_->WriteUInt32(value);
385
20.0k
}
386
387
0
void PGConn::CopyPutInt64(int64_t value) {
388
0
  if (!CopyEnsureBuffer(12)) {
389
0
    return;
390
0
  }
391
0
  copy_data_->WriteUInt32(8);
392
0
  copy_data_->WriteUInt64(value);
393
0
}
394
395
50.0k
void PGConn::CopyPut(const char* value, size_t len) {
396
50.0k
  if (!CopyEnsureBuffer(4)) {
397
0
    return;
398
0
  }
399
50.0k
  copy_data_->WriteUInt32(static_cast<uint32_t>(len));
400
50.1k
  for (;;) {
401
50.1k
    size_t left = copy_data_->left();
402
50.1k
    if (copy_data_->left() < len) {
403
160
      copy_data_->Write(value, left);
404
160
      value += left;
405
160
      len -= left;
406
160
      if (!CopyFlushBuffer()) {
407
0
        return;
408
0
      }
409
50.0k
    } else {
410
50.0k
      copy_data_->Write(value, len);
411
50.0k
      break;
412
50.0k
    }
413
50.1k
  }
414
50.0k
}
415
416
10
Result<PGResultPtr> PGConn::CopyEnd() {
417
10
  if (CopyEnsureBuffer(2)) {
418
10
    copy_data_->WriteUInt16(static_cast<uint16_t>(-1));
419
10
  }
420
10
  if (!CopyFlushBuffer()) {
421
0
    return copy_data_->error;
422
0
  }
423
10
  int res = PQputCopyEnd(impl_.get(), 0);
424
10
  if (res <= 0) {
425
0
    return STATUS_FORMAT(NetworkError, "Put copy end failed: $0", res);
426
0
  }
427
428
10
  return PGResultPtr(PQgetResult(impl_.get()));
429
10
}
430
431
7.17k
Result<char*> GetValueWithLength(PGresult* result, int row, int column, size_t size) {
432
7.17k
  size_t len = PQgetlength(result, row, column);
433
7.17k
  if (len != size) {
434
0
    return STATUS_FORMAT(Corruption, "Bad column length: $0, expected: $1, row: $2, column: $3",
435
0
                         len, size, row, column);
436
0
  }
437
7.17k
  return PQgetvalue(result, row, column);
438
7.17k
}
439
440
804
Result<bool> GetBool(PGresult* result, int row, int column) {
441
804
  return *VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(bool)));
442
804
}
443
444
4.64k
Result<int32_t> GetInt32(PGresult* result, int row, int column) {
445
4.64k
  return BigEndian::Load32(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int32_t))));
446
4.64k
}
447
448
1.71k
Result<int64_t> GetInt64(PGresult* result, int row, int column) {
449
1.71k
  return BigEndian::Load64(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int64_t))));
450
1.71k
}
451
452
0
Result<double> GetDouble(PGresult* result, int row, int column) {
453
0
  auto temp =
454
0
      BigEndian::Load64(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int64_t))));
455
0
  return *reinterpret_cast<double*>(&temp);
456
0
}
457
458
2.63k
Result<std::string> GetString(PGresult* result, int row, int column) {
459
2.63k
  auto len = PQgetlength(result, row, column);
460
2.63k
  auto value = PQgetvalue(result, row, column);
461
2.63k
  return std::string(value, len);
462
2.63k
}
463
464
41
Result<std::string> ToString(PGresult* result, int row, int column) {
465
41
  constexpr Oid INT8OID = 20;
466
41
  constexpr Oid INT4OID = 23;
467
41
  constexpr Oid TEXTOID = 25;
468
41
  constexpr Oid FLOAT8OID = 701;
469
41
  constexpr Oid BPCHAROID = 1042;
470
41
  constexpr Oid VARCHAROID = 1043;
471
472
41
  auto type = PQftype(result, column);
473
41
  switch (type) {
474
1
    case INT8OID:
475
1
      return std::to_string(VERIFY_RESULT(GetInt64(result, row, column)));
476
20
    case INT4OID:
477
20
      return std::to_string(VERIFY_RESULT(GetInt32(result, row, column)));
478
0
    case FLOAT8OID:
479
0
      return std::to_string(VERIFY_RESULT(GetDouble(result, row, column)));
480
20
    case TEXTOID: FALLTHROUGH_INTENDED;
481
20
    case BPCHAROID: FALLTHROUGH_INTENDED;
482
20
    case VARCHAROID:
483
20
      return VERIFY_RESULT(GetString(result, row, column));
484
0
    default:
485
0
      return Format("Type not supported: $0", type);
486
41
  }
487
41
}
488
489
2
void LogResult(PGresult* result) {
490
2
  int cols = PQnfields(result);
491
2
  int rows = PQntuples(result);
492
23
  for (int row = 0; row != rows; 
++row21
) {
493
21
    std::string line;
494
62
    for (int col = 0; col != cols; 
++col41
) {
495
41
      if (col) {
496
20
        line += ", ";
497
20
      }
498
41
      line += CHECK_RESULT(ToString(result, row, col));
499
41
    }
500
21
    LOG(INFO) << line;
501
21
  }
502
2
}
503
504
// Escape literals in postgres (e.g. to make a libpq connection to a database named
505
// `this->'\<-this`, use `dbname='this->\'\\<-this'`).
506
//
507
// This should behave like `PQescapeLiteral` except that it doesn't need an existing connection
508
// passed in.
509
4.88k
std::string PqEscapeLiteral(const std::string& input) {
510
4.88k
  std::string output = input;
511
  // Escape certain characters.
512
4.88k
  ReplaceAll(&output, "\\", "\\\\");
513
4.88k
  ReplaceAll(&output, "'", "\\'");
514
  // Quote.
515
4.88k
  output.insert(0, 1, '\'');
516
4.88k
  output.push_back('\'');
517
4.88k
  return output;
518
4.88k
}
519
520
// Escape identifiers in postgres (e.g. to create a database named `this->"\<-this`, use `CREATE
521
// DATABASE "this->""\<-this"`).
522
//
523
// This should behave like `PQescapeIdentifier` except that it doesn't need an existing connection
524
// passed in.
525
14
std::string PqEscapeIdentifier(const std::string& input) {
526
14
  std::string output = input;
527
  // Escape certain characters.
528
14
  ReplaceAll(&output, "\"", "\"\"");
529
  // Quote.
530
14
  output.insert(0, 1, '"');
531
14
  output.push_back('"');
532
14
  return output;
533
14
}
534
535
20
bool HasTryAgain(const Status& status) {
536
20
  return status.ToString().find("Try again:") != std::string::npos;
537
20
}
538
539
} // namespace pgwrapper
540
} // namespace yb