YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/libpq_utils.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pgwrapper/libpq_utils.h"
15
16
#include <string>
17
#include <utility>
18
19
#include <boost/preprocessor/seq/for_each.hpp>
20
21
#include "yb/common/pgsql_error.h"
22
23
#include "yb/gutil/casts.h"
24
#include "yb/gutil/endian.h"
25
26
#include "yb/util/enums.h"
27
#include "yb/util/format.h"
28
#include "yb/util/logging.h"
29
#include "yb/util/monotime.h"
30
#include "yb/util/net/net_util.h"
31
#include "yb/util/status_format.h"
32
#include "yb/util/status_log.h"
33
34
using namespace std::literals;
35
36
namespace yb {
37
namespace pgwrapper {
38
39
namespace {
40
41
// Converts the given element of the ExecStatusType enum to a string.
42
0
std::string ExecStatusTypeToStr(ExecStatusType exec_status_type) {
43
0
#define EXEC_STATUS_SWITCH_CASE(r, data, item) case item: return #item;
44
0
#define EXEC_STATUS_TYPE_ENUM_ELEMENTS \
45
0
    (PGRES_EMPTY_QUERY) \
46
0
    (PGRES_COMMAND_OK) \
47
0
    (PGRES_TUPLES_OK) \
48
0
    (PGRES_COPY_OUT) \
49
0
    (PGRES_COPY_IN) \
50
0
    (PGRES_BAD_RESPONSE) \
51
0
    (PGRES_NONFATAL_ERROR) \
52
0
    (PGRES_FATAL_ERROR) \
53
0
    (PGRES_COPY_BOTH) \
54
0
    (PGRES_SINGLE_TUPLE)
55
0
  switch (exec_status_type) {
56
0
    BOOST_PP_SEQ_FOR_EACH(EXEC_STATUS_SWITCH_CASE, ~, EXEC_STATUS_TYPE_ENUM_ELEMENTS)
57
0
  }
58
0
#undef EXEC_STATUS_SWITCH_CASE
59
0
#undef EXEC_STATUS_TYPE_ENUM_ELEMENTS
60
0
  return Format("Unknown ExecStatusType ($0)", exec_status_type);
61
0
}
62
63
0
YBPgErrorCode GetSqlState(PGresult* result) {
64
0
  auto exec_status_type = PQresultStatus(result);
65
0
  if (exec_status_type == ExecStatusType::PGRES_COMMAND_OK ||
66
0
      exec_status_type == ExecStatusType::PGRES_TUPLES_OK) {
67
0
    return YBPgErrorCode::YB_PG_SUCCESSFUL_COMPLETION;
68
0
  }
69
70
0
  const char* sqlstate_str = PQresultErrorField(result, PG_DIAG_SQLSTATE);
71
0
  if (sqlstate_str == nullptr) {
72
0
    auto err_msg = PQresultErrorMessage(result);
73
0
    YB_LOG_EVERY_N_SECS(WARNING, 5)
74
0
        << "SQLSTATE is not defined for result with "
75
0
        << "error message: " << (err_msg ? err_msg : "N/A") << ", "
76
0
        << "PQresultStatus: " << ExecStatusTypeToStr(exec_status_type);
77
0
    return YBPgErrorCode::YB_PG_INTERNAL_ERROR;
78
0
  }
79
80
0
  CHECK_EQ(5, strlen(sqlstate_str))
81
0
      << "sqlstate_str: " << sqlstate_str
82
0
      << ", PQresultStatus: " << ExecStatusTypeToStr(exec_status_type);
83
84
0
  uint32_t sqlstate = 0;
85
86
0
  for (int i = 0; i < 5; ++i) {
87
0
    sqlstate |= (sqlstate_str[i] - '0') << (6 * i);
88
0
  }
89
0
  return static_cast<YBPgErrorCode>(sqlstate);
90
0
}
91
92
// Taken from <https://stackoverflow.com/a/24315631> by Gauthier Boaglio.
93
940
inline void ReplaceAll(std::string* str, const std::string& from, const std::string& to) {
94
940
  CHECK(str);
95
940
  size_t start_pos = 0;
96
940
  while ((start_pos = str->find(from, start_pos)) != std::string::npos) {
97
0
    str->replace(start_pos, from.length(), to);
98
0
    start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
99
0
  }
100
940
}
101
102
}  // anonymous namespace
103
104
105
235
void PGConnClose::operator()(PGconn* conn) const {
106
235
  PQfinish(conn);
107
235
}
108
109
struct PGConn::CopyData {
110
  static constexpr size_t kBufferSize = 2048;
111
112
  Status error;
113
  char * pos;
114
  char buffer[kBufferSize];
115
116
0
  void Start() {
117
0
    pos = buffer;
118
0
    error = Status::OK();
119
0
  }
120
121
0
  void WriteUInt16(uint16_t value) {
122
0
    BigEndian::Store16(pos, value);
123
0
    pos += 2;
124
0
  }
125
126
0
  void WriteUInt32(uint32_t value) {
127
0
    BigEndian::Store32(pos, value);
128
0
    pos += 4;
129
0
  }
130
131
0
  void WriteUInt64(uint64_t value) {
132
0
    BigEndian::Store64(pos, value);
133
0
    pos += 8;
134
0
  }
135
136
0
  void Write(const char* value, size_t len) {
137
0
    memcpy(pos, value, len);
138
0
    pos += len;
139
0
  }
140
141
0
  size_t left() const {
142
0
    return buffer + kBufferSize - pos;
143
0
  }
144
};
145
146
Result<PGConn> PGConn::Connect(
147
    const HostPort& host_port,
148
    const std::string& db_name,
149
    const std::string& user,
150
0
    bool simple_query_protocol) {
151
0
  auto conn_info = Format(
152
0
      "host=$0 port=$1 user=$2",
153
0
      host_port.host(),
154
0
      host_port.port(),
155
0
      PqEscapeLiteral(user));
156
0
  if (!db_name.empty()) {
157
0
    conn_info = Format("dbname=$0 $1", PqEscapeLiteral(db_name), conn_info);
158
0
  }
159
0
  return Connect(conn_info, simple_query_protocol);
160
0
}
161
162
Result<PGConn> PGConn::Connect(const std::string& conn_str,
163
                               CoarseTimePoint deadline,
164
0
                               bool simple_query_protocol) {
165
0
  auto start = CoarseMonoClock::now();
166
0
  for (;;) {
167
0
    PGConnPtr result(PQconnectdb(conn_str.c_str()));
168
0
    if (!result) {
169
0
      return STATUS(NetworkError, "Failed to connect to DB");
170
0
    }
171
0
    auto status = PQstatus(result.get());
172
0
    if (status == ConnStatusType::CONNECTION_OK) {
173
0
      LOG(INFO) << "Connected to PG (" << conn_str << "), time taken: "
174
0
                << MonoDelta(CoarseMonoClock::Now() - start);
175
0
      return PGConn(std::move(result), simple_query_protocol);
176
0
    }
177
0
    auto now = CoarseMonoClock::now();
178
0
    if (now >= deadline) {
179
0
      std::string msg(yb::Format("$0", status));
180
0
      if (status == CONNECTION_BAD) {
181
0
        msg = PQerrorMessage(result.get());
182
        // Avoid double newline (postgres adds a newline after the error message).
183
0
        if (msg.back() == '\n') {
184
0
          msg.resize(msg.size() - 1);
185
0
        }
186
0
      }
187
0
      return STATUS_FORMAT(NetworkError, "Connect failed: $0, passed: $1",
188
0
                           msg, MonoDelta(now - start));
189
0
    }
190
0
  }
191
0
}
192
193
PGConn::PGConn(PGConnPtr ptr, bool simple_query_protocol)
194
0
    : impl_(std::move(ptr)), simple_query_protocol_(simple_query_protocol) {
195
0
}
196
197
0
PGConn::~PGConn() {
198
0
}
199
200
PGConn::PGConn(PGConn&& rhs)
201
0
    : impl_(std::move(rhs.impl_)), simple_query_protocol_(rhs.simple_query_protocol_) {
202
0
}
203
204
0
PGConn& PGConn::operator=(PGConn&& rhs) {
205
0
  impl_ = std::move(rhs.impl_);
206
0
  simple_query_protocol_ = rhs.simple_query_protocol_;
207
0
  return *this;
208
0
}
209
210
257
void PGResultClear::operator()(PGresult* result) const {
211
257
  PQclear(result);
212
257
}
213
214
0
Status PGConn::Execute(const std::string& command, bool show_query_in_error) {
215
0
  VLOG(1) << __func__ << " " << command;
216
0
  PGResultPtr res(PQexec(impl_.get(), command.c_str()));
217
0
  auto status = PQresultStatus(res.get());
218
0
  if (ExecStatusType::PGRES_COMMAND_OK != status) {
219
0
    if (status == ExecStatusType::PGRES_TUPLES_OK) {
220
0
      return STATUS_FORMAT(IllegalState,
221
0
                           "Tuples received in Execute$0",
222
0
                           show_query_in_error ? Format(" of '$0'", command) : "");
223
0
    }
224
0
    return STATUS(NetworkError,
225
0
                  Format("Execute$0 failed: $1, message: $2",
226
0
                         show_query_in_error ? Format(" of '$0'", command) : "",
227
0
                         status,
228
0
                         PQresultErrorMessage(res.get())),
229
0
                  Slice() /* msg2 */,
230
0
                  PgsqlError(GetSqlState(res.get())));
231
0
  }
232
0
  return Status::OK();
233
0
}
234
235
0
Result<PGResultPtr> CheckResult(PGResultPtr result, const std::string& command) {
236
0
  auto status = PQresultStatus(result.get());
237
0
  if (ExecStatusType::PGRES_TUPLES_OK != status && ExecStatusType::PGRES_COPY_IN != status) {
238
0
    return STATUS(NetworkError,
239
0
                  Format("Fetch '$0' failed: $1, message: $2",
240
0
                         command, status, PQresultErrorMessage(result.get())),
241
0
                  Slice() /* msg2 */,
242
0
                  PgsqlError(GetSqlState(result.get())));
243
0
  }
244
0
  return result;
245
0
}
246
247
0
Result<PGResultPtr> PGConn::Fetch(const std::string& command) {
248
0
  VLOG(1) << __func__ << " " << command;
249
0
  return CheckResult(
250
0
      PGResultPtr(simple_query_protocol_
251
0
          ? PQexec(impl_.get(), command.c_str())
252
0
          : PQexecParams(impl_.get(), command.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 1)),
253
0
      command);
254
0
}
255
256
0
Result<PGResultPtr> PGConn::FetchMatrix(const std::string& command, int rows, int columns) {
257
0
  auto res = VERIFY_RESULT(Fetch(command));
258
259
0
  auto fetched_columns = PQnfields(res.get());
260
0
  if (fetched_columns != columns) {
261
0
    return STATUS_FORMAT(
262
0
        RuntimeError, "Fetched $0 columns, while $1 expected", fetched_columns, columns);
263
0
  }
264
265
0
  auto fetched_rows = PQntuples(res.get());
266
0
  if (fetched_rows != rows) {
267
0
    return STATUS_FORMAT(
268
0
        RuntimeError, "Fetched $0 rows, while $1 expected", fetched_rows, rows);
269
0
  }
270
271
0
  return res;
272
0
}
273
274
0
CHECKED_STATUS PGConn::StartTransaction(IsolationLevel isolation_level) {
275
0
  switch (isolation_level) {
276
0
    case IsolationLevel::NON_TRANSACTIONAL:
277
0
      return Status::OK();
278
0
    case IsolationLevel::READ_COMMITTED:
279
0
      return Execute("START TRANSACTION ISOLATION LEVEL READ COMMITTED");
280
0
    case IsolationLevel::SNAPSHOT_ISOLATION:
281
0
      return Execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ");
282
0
    case IsolationLevel::SERIALIZABLE_ISOLATION:
283
0
      return Execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
284
0
  }
285
286
0
  FATAL_INVALID_ENUM_VALUE(IsolationLevel, isolation_level);
287
0
}
288
289
0
CHECKED_STATUS PGConn::CommitTransaction() {
290
0
  return Execute("COMMIT");
291
0
}
292
293
0
CHECKED_STATUS PGConn::RollbackTransaction() {
294
0
  return Execute("ROLLBACK");
295
0
}
296
297
0
Result<bool> PGConn::HasIndexScan(const std::string& query) {
298
0
  constexpr int kExpectedColumns = 1;
299
0
  auto res = VERIFY_RESULT(FetchFormat("EXPLAIN $0", query));
300
301
0
  {
302
0
    int fetched_columns = PQnfields(res.get());
303
0
    if (fetched_columns != kExpectedColumns) {
304
0
      return STATUS_FORMAT(
305
0
          InternalError, "Fetched $0 columns, expected $1", fetched_columns, kExpectedColumns);
306
0
    }
307
0
  }
308
309
0
  for (int line = 0; line < PQntuples(res.get()); ++line) {
310
0
    std::string value = VERIFY_RESULT(GetString(res.get(), line, 0));
311
0
    if (value.find("Index Scan") != std::string::npos) {
312
0
      return true;
313
0
    } else if (value.find("Index Only Scan") != std::string::npos) {
314
0
      return true;
315
0
    }
316
0
  }
317
0
  return false;
318
0
}
319
320
321
0
Status PGConn::CopyBegin(const std::string& command) {
322
0
  auto result = VERIFY_RESULT(CheckResult(
323
0
      PGResultPtr(
324
0
          PQexecParams(impl_.get(), command.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 0)),
325
0
      command));
326
327
0
  if (!copy_data_) {
328
0
    copy_data_.reset(new CopyData);
329
0
  }
330
0
  copy_data_->Start();
331
332
0
  static const char prefix[] = "PGCOPY\n\xff\r\n\0\0\0\0\0\0\0\0\0";
333
0
  copy_data_->Write(prefix, sizeof(prefix) - 1);
334
335
0
  return Status::OK();
336
0
}
337
338
0
bool PGConn::CopyEnsureBuffer(size_t len) {
339
0
  if (!copy_data_->error.ok()) {
340
0
    return false;
341
0
  }
342
0
  if (copy_data_->left() < len) {
343
0
    return CopyFlushBuffer();
344
0
  }
345
0
  return true;
346
0
}
347
348
0
void PGConn::CopyStartRow(int16_t columns) {
349
0
  if (!CopyEnsureBuffer(2)) {
350
0
    return;
351
0
  }
352
0
  copy_data_->WriteUInt16(columns);
353
0
}
354
355
0
bool PGConn::CopyFlushBuffer() {
356
0
  if (!copy_data_->error.ok()) {
357
0
    return false;
358
0
  }
359
0
  ptrdiff_t len = copy_data_->pos - copy_data_->buffer;
360
0
  if (len) {
361
0
    int res = PQputCopyData(impl_.get(), copy_data_->buffer, narrow_cast<int>(len));
362
0
    if (res < 0) {
363
0
      copy_data_->error = STATUS_FORMAT(NetworkError, "Put copy data failed: $0", res);
364
0
      return false;
365
0
    }
366
0
  }
367
0
  copy_data_->Start();
368
0
  return true;
369
0
}
370
371
0
void PGConn::CopyPutInt16(int16_t value) {
372
0
  if (!CopyEnsureBuffer(6)) {
373
0
    return;
374
0
  }
375
0
  copy_data_->WriteUInt32(2);
376
0
  copy_data_->WriteUInt16(value);
377
0
}
378
379
0
void PGConn::CopyPutInt32(int32_t value) {
380
0
  if (!CopyEnsureBuffer(8)) {
381
0
    return;
382
0
  }
383
0
  copy_data_->WriteUInt32(4);
384
0
  copy_data_->WriteUInt32(value);
385
0
}
386
387
0
void PGConn::CopyPutInt64(int64_t value) {
388
0
  if (!CopyEnsureBuffer(12)) {
389
0
    return;
390
0
  }
391
0
  copy_data_->WriteUInt32(8);
392
0
  copy_data_->WriteUInt64(value);
393
0
}
394
395
0
void PGConn::CopyPut(const char* value, size_t len) {
396
0
  if (!CopyEnsureBuffer(4)) {
397
0
    return;
398
0
  }
399
0
  copy_data_->WriteUInt32(static_cast<uint32_t>(len));
400
0
  for (;;) {
401
0
    size_t left = copy_data_->left();
402
0
    if (copy_data_->left() < len) {
403
0
      copy_data_->Write(value, left);
404
0
      value += left;
405
0
      len -= left;
406
0
      if (!CopyFlushBuffer()) {
407
0
        return;
408
0
      }
409
0
    } else {
410
0
      copy_data_->Write(value, len);
411
0
      break;
412
0
    }
413
0
  }
414
0
}
415
416
0
Result<PGResultPtr> PGConn::CopyEnd() {
417
0
  if (CopyEnsureBuffer(2)) {
418
0
    copy_data_->WriteUInt16(static_cast<uint16_t>(-1));
419
0
  }
420
0
  if (!CopyFlushBuffer()) {
421
0
    return copy_data_->error;
422
0
  }
423
0
  int res = PQputCopyEnd(impl_.get(), 0);
424
0
  if (res <= 0) {
425
0
    return STATUS_FORMAT(NetworkError, "Put copy end failed: $0", res);
426
0
  }
427
428
0
  return PGResultPtr(PQgetResult(impl_.get()));
429
0
}
430
431
0
Result<char*> GetValueWithLength(PGresult* result, int row, int column, size_t size) {
432
0
  size_t len = PQgetlength(result, row, column);
433
0
  if (len != size) {
434
0
    return STATUS_FORMAT(Corruption, "Bad column length: $0, expected: $1, row: $2, column: $3",
435
0
                         len, size, row, column);
436
0
  }
437
0
  return PQgetvalue(result, row, column);
438
0
}
439
440
0
Result<bool> GetBool(PGresult* result, int row, int column) {
441
0
  return *VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(bool)));
442
0
}
443
444
0
Result<int32_t> GetInt32(PGresult* result, int row, int column) {
445
0
  return BigEndian::Load32(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int32_t))));
446
0
}
447
448
0
Result<int64_t> GetInt64(PGresult* result, int row, int column) {
449
0
  return BigEndian::Load64(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int64_t))));
450
0
}
451
452
0
Result<double> GetDouble(PGresult* result, int row, int column) {
453
0
  auto temp =
454
0
      BigEndian::Load64(VERIFY_RESULT(GetValueWithLength(result, row, column, sizeof(int64_t))));
455
0
  return *reinterpret_cast<double*>(&temp);
456
0
}
457
458
256
Result<std::string> GetString(PGresult* result, int row, int column) {
459
256
  auto len = PQgetlength(result, row, column);
460
256
  auto value = PQgetvalue(result, row, column);
461
256
  return std::string(value, len);
462
256
}
463
464
0
Result<std::string> ToString(PGresult* result, int row, int column) {
465
0
  constexpr Oid INT8OID = 20;
466
0
  constexpr Oid INT4OID = 23;
467
0
  constexpr Oid TEXTOID = 25;
468
0
  constexpr Oid FLOAT8OID = 701;
469
0
  constexpr Oid BPCHAROID = 1042;
470
0
  constexpr Oid VARCHAROID = 1043;
471
472
0
  auto type = PQftype(result, column);
473
0
  switch (type) {
474
0
    case INT8OID:
475
0
      return std::to_string(VERIFY_RESULT(GetInt64(result, row, column)));
476
0
    case INT4OID:
477
0
      return std::to_string(VERIFY_RESULT(GetInt32(result, row, column)));
478
0
    case FLOAT8OID:
479
0
      return std::to_string(VERIFY_RESULT(GetDouble(result, row, column)));
480
0
    case TEXTOID: FALLTHROUGH_INTENDED;
481
0
    case BPCHAROID: FALLTHROUGH_INTENDED;
482
0
    case VARCHAROID:
483
0
      return VERIFY_RESULT(GetString(result, row, column));
484
0
    default:
485
0
      return Format("Type not supported: $0", type);
486
0
  }
487
0
}
488
489
0
void LogResult(PGresult* result) {
490
0
  int cols = PQnfields(result);
491
0
  int rows = PQntuples(result);
492
0
  for (int row = 0; row != rows; ++row) {
493
0
    std::string line;
494
0
    for (int col = 0; col != cols; ++col) {
495
0
      if (col) {
496
0
        line += ", ";
497
0
      }
498
0
      line += CHECK_RESULT(ToString(result, row, col));
499
0
    }
500
0
    LOG(INFO) << line;
501
0
  }
502
0
}
503
504
// Escape literals in postgres (e.g. to make a libpq connection to a database named
505
// `this->'\<-this`, use `dbname='this->\'\\<-this'`).
506
//
507
// This should behave like `PQescapeLiteral` except that it doesn't need an existing connection
508
// passed in.
509
470
std::string PqEscapeLiteral(const std::string& input) {
510
470
  std::string output = input;
511
  // Escape certain characters.
512
470
  ReplaceAll(&output, "\\", "\\\\");
513
470
  ReplaceAll(&output, "'", "\\'");
514
  // Quote.
515
470
  output.insert(0, 1, '\'');
516
470
  output.push_back('\'');
517
470
  return output;
518
470
}
519
520
// Escape identifiers in postgres (e.g. to create a database named `this->"\<-this`, use `CREATE
521
// DATABASE "this->""\<-this"`).
522
//
523
// This should behave like `PQescapeIdentifier` except that it doesn't need an existing connection
524
// passed in.
525
0
std::string PqEscapeIdentifier(const std::string& input) {
526
0
  std::string output = input;
527
  // Escape certain characters.
528
0
  ReplaceAll(&output, "\"", "\"\"");
529
  // Quote.
530
0
  output.insert(0, 1, '"');
531
0
  output.push_back('"');
532
0
  return output;
533
0
}
534
535
0
bool HasTryAgain(const Status& status) {
536
0
  return status.ToString().find("Try again:") != std::string::npos;
537
0
}
538
539
} // namespace pgwrapper
540
} // namespace yb