YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_client.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pggate/pg_client.h"
15
16
#include "yb/client/client-internal.h"
17
#include "yb/client/table.h"
18
#include "yb/client/table_info.h"
19
#include "yb/client/tablet_server.h"
20
#include "yb/client/yb_table_name.h"
21
22
#include "yb/gutil/casts.h"
23
24
#include "yb/rpc/poller.h"
25
#include "yb/rpc/rpc_controller.h"
26
27
#include "yb/tserver/pg_client.pb.h"
28
#include "yb/tserver/pg_client.proxy.h"
29
#include "yb/tserver/tserver_shared_mem.h"
30
31
#include "yb/util/logging.h"
32
#include "yb/util/protobuf_util.h"
33
#include "yb/util/result.h"
34
#include "yb/util/scope_exit.h"
35
#include "yb/util/shared_mem.h"
36
#include "yb/util/status.h"
37
38
#include "yb/yql/pggate/pg_op.h"
39
#include "yb/yql/pggate/pg_tabledesc.h"
40
41
DECLARE_bool(use_node_hostname_for_local_tserver);
42
DECLARE_int32(backfill_index_client_rpc_timeout_ms);
43
DECLARE_int32(yb_client_admin_operation_timeout_sec);
44
45
DEFINE_uint64(pg_client_heartbeat_interval_ms, 10000, "Pg client heartbeat interval in ms.");
46
47
using namespace std::literals;
48
49
namespace yb {
50
namespace pggate {
51
52
namespace {
53
54
// Adding this value to RPC call timeout, so postgres could detect timeout by it's own mechanism
55
// and report it.
56
const auto kExtraTimeout = 2s;
57
58
struct PerformData {
59
  PgsqlOps operations;
60
  tserver::PgPerformResponsePB resp;
61
  rpc::RpcController controller;
62
  PerformCallback callback;
63
64
728k
  CHECKED_STATUS Process() {
65
728k
    auto& responses = *resp.mutable_responses();
66
728k
    SCHECK_EQ(implicit_cast<size_t>(responses.size()), operations.size(), RuntimeError,
67
728k
              Format("Wrong number of responses: $0, while $1 expected",
68
728k
                     responses.size(), operations.size()));
69
4.18M
    for (uint32_t i = 0; i != operations.size(); ++i) {
70
3.45M
      if (responses[i].has_rows_data_sidecar()) {
71
3.45M
        operations[i]->rows_data() = VERIFY_RESULT(
72
3.45M
            controller.GetSidecarPtr(responses[i].rows_data_sidecar()));
73
3.45M
      }
74
3.45M
      operations[i]->response() = std::move(responses[i]);
75
3.45M
    }
76
728k
    return Status::OK();
77
728k
  }
78
};
79
80
0
std::string PrettyFunctionName(const char* name) {
81
0
  std::string result;
82
0
  for (const char* ch = name; *ch; ++ch) {
83
0
    if (!result.empty() && std::isupper(*ch)) {
84
0
      result += ' ';
85
0
    }
86
0
    result += *ch;
87
0
  }
88
0
  return result;
89
0
}
90
91
} // namespace
92
93
class PgClient::Impl {
94
 public:
95
1.65k
  Impl() : heartbeat_poller_(std::bind(&Impl::Heartbeat, this, false)) {
96
1.65k
    tablet_server_count_cache_.fill(0);
97
1.65k
  }
98
99
1.64k
  ~Impl() {
100
1.64k
    CHECK(!proxy_);
101
1.64k
  }
102
103
  CHECKED_STATUS Start(rpc::ProxyCache* proxy_cache,
104
                       rpc::Scheduler* scheduler,
105
1.65k
                       const tserver::TServerSharedObject& tserver_shared_object) {
106
1.65k
    CHECK_NOTNULL(&tserver_shared_object);
107
1.65k
    MonoDelta resolve_cache_timeout;
108
1.65k
    const auto& tserver_shared_data_ = *tserver_shared_object;
109
1.65k
    HostPort host_port(tserver_shared_data_.endpoint());
110
1.65k
    if (FLAGS_use_node_hostname_for_local_tserver) {
111
0
      host_port = HostPort(tserver_shared_data_.host().ToBuffer(),
112
0
                           tserver_shared_data_.endpoint().port());
113
0
      resolve_cache_timeout = MonoDelta::kMax;
114
0
    }
115
1.65k
    LOG(INFO) << "Using TServer host_port: " << host_port;
116
1.65k
    proxy_ = std::make_unique<tserver::PgClientServiceProxy>(
117
1.65k
        proxy_cache, host_port, nullptr /* protocol */, resolve_cache_timeout);
118
119
1.65k
    auto future = create_session_promise_.get_future();
120
1.65k
    Heartbeat(true);
121
1.65k
    session_id_ = VERIFY_RESULT(future.get());
122
1.65k
    LOG_WITH_PREFIX(INFO) << "Session id acquired";
123
1.65k
    heartbeat_poller_.Start(scheduler, FLAGS_pg_client_heartbeat_interval_ms * 1ms);
124
1.65k
    return Status::OK();
125
1.65k
  }
126
127
1.65k
  void Shutdown() {
128
1.65k
    heartbeat_poller_.Shutdown();
129
1.65k
    proxy_ = nullptr;
130
1.65k
  }
131
132
2.74k
  void Heartbeat(bool create) {
133
2.74k
    {
134
2.74k
      bool expected = false;
135
2.74k
      if (!heartbeat_running_.compare_exchange_strong(expected, true)) {
136
0
        LOG_WITH_PREFIX(DFATAL) << "Heartbeat did not complete yet";
137
0
        return;
138
0
      }
139
2.74k
    }
140
2.74k
    tserver::PgHeartbeatRequestPB req;
141
2.74k
    if (!create) {
142
1.09k
      req.set_session_id(session_id_);
143
1.09k
    }
144
2.74k
    proxy_->HeartbeatAsync(
145
2.74k
        req, &heartbeat_resp_, PrepareHeartbeatController(),
146
2.74k
        [this, create] {
147
2.74k
      auto status = ResponseStatus(heartbeat_resp_);
148
2.74k
      if (create) {
149
1.65k
        if (!status.ok()) {
150
0
          create_session_promise_.set_value(status);
151
1.65k
        } else {
152
1.65k
          create_session_promise_.set_value(heartbeat_resp_.session_id());
153
1.65k
        }
154
1.65k
      }
155
2.74k
      heartbeat_running_ = false;
156
2.74k
      if (!status.ok()) {
157
0
        LOG_WITH_PREFIX(WARNING) << "Heartbeat failed: " << status;
158
0
      }
159
2.74k
    });
160
2.74k
  }
161
162
0
  void SetTimeout(MonoDelta timeout) {
163
0
    timeout_ = timeout + kExtraTimeout;
164
0
  }
165
166
  Result<PgTableDescPtr> OpenTable(
167
64.5k
      const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) {
168
64.5k
    tserver::PgOpenTableRequestPB req;
169
64.5k
    req.set_table_id(table_id.GetYBTableId());
170
64.5k
    req.set_reopen(reopen);
171
64.5k
    if (invalidate_cache_time != CoarseTimePoint()) {
172
551
      req.set_invalidate_cache_time_us(ToMicroseconds(invalidate_cache_time.time_since_epoch()));
173
551
    }
174
64.5k
    tserver::PgOpenTableResponsePB resp;
175
176
64.5k
    RETURN_NOT_OK(proxy_->OpenTable(req, &resp, PrepareController()));
177
64.5k
    RETURN_NOT_OK(ResponseStatus(resp));
178
179
64.5k
    client::YBTableInfo info;
180
64.5k
    RETURN_NOT_OK(client::CreateTableInfoFromTableSchemaResp(resp.info(), &info));
181
182
64.5k
    auto partitions = std::make_shared<client::VersionedTablePartitionList>();
183
64.5k
    partitions->version = resp.partitions().version();
184
64.5k
    partitions->keys.assign(resp.partitions().keys().begin(), resp.partitions().keys().end());
185
186
64.5k
    return make_scoped_refptr<PgTableDesc>(
187
64.5k
        table_id, std::make_shared<client::YBTable>(info, std::move(partitions)));
188
64.5k
  }
189
190
82.3k
  CHECKED_STATUS FinishTransaction(Commit commit, DdlMode ddl_mode) {
191
82.3k
    tserver::PgFinishTransactionRequestPB req;
192
82.3k
    req.set_session_id(session_id_);
193
82.3k
    req.set_commit(commit);
194
82.3k
    req.set_ddl_mode(ddl_mode);
195
82.3k
    tserver::PgFinishTransactionResponsePB resp;
196
197
82.3k
    RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController()));
198
82.3k
    return ResponseStatus(resp);
199
82.3k
  }
200
201
1.61k
  Result<master::GetNamespaceInfoResponsePB> GetDatabaseInfo(uint32_t oid) {
202
1.61k
    tserver::PgGetDatabaseInfoRequestPB req;
203
1.61k
    req.set_oid(oid);
204
205
1.61k
    tserver::PgGetDatabaseInfoResponsePB resp;
206
207
1.61k
    RETURN_NOT_OK(proxy_->GetDatabaseInfo(req, &resp, PrepareController()));
208
1.61k
    RETURN_NOT_OK(ResponseStatus(resp));
209
1.61k
    return resp.info();
210
1.61k
  }
211
212
  CHECKED_STATUS SetActiveSubTransaction(
213
48.8k
      SubTransactionId id, tserver::PgPerformOptionsPB* options) {
214
48.8k
    tserver::PgSetActiveSubTransactionRequestPB req;
215
48.8k
    req.set_session_id(session_id_);
216
48.8k
    if (options) {
217
307
      options->Swap(req.mutable_options());
218
307
    }
219
48.8k
    req.set_sub_transaction_id(id);
220
221
48.8k
    tserver::PgSetActiveSubTransactionResponsePB resp;
222
223
48.8k
    RETURN_NOT_OK(proxy_->SetActiveSubTransaction(req, &resp, PrepareController()));
224
48.8k
    return ResponseStatus(resp);
225
48.8k
  }
226
227
23.5k
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) {
228
23.5k
    tserver::PgRollbackSubTransactionRequestPB req;
229
23.5k
    req.set_session_id(session_id_);
230
23.5k
    req.set_sub_transaction_id(id);
231
232
23.5k
    tserver::PgRollbackSubTransactionResponsePB resp;
233
234
23.5k
    RETURN_NOT_OK(proxy_->RollbackSubTransaction(req, &resp, PrepareController()));
235
23.5k
    return ResponseStatus(resp);
236
23.5k
  }
237
238
  void PerformAsync(
239
      tserver::PgPerformOptionsPB* options,
240
      PgsqlOps* operations,
241
774k
      const PerformCallback& callback) {
242
774k
    tserver::PgPerformRequestPB req;
243
774k
    req.set_session_id(session_id_);
244
774k
    *req.mutable_options() = std::move(*options);
245
775k
    auto se = ScopeExit([&req] {
246
4.02M
      for (auto& op : *req.mutable_ops()) {
247
4.02M
        if (!op.release_read()) {
248
2.12M
          op.release_write();
249
2.12M
        }
250
4.02M
      }
251
775k
    });
252
774k
    PrepareOperations(&req, operations);
253
254
774k
    auto data = std::make_shared<PerformData>();
255
774k
    data->operations = std::move(*operations);
256
774k
    data->callback = callback;
257
774k
    data->controller.set_invoke_callback_mode(rpc::InvokeCallbackMode::kReactorThread);
258
259
775k
    proxy_->PerformAsync(req, &data->resp, SetupController(&data->controller), [data] {
260
775k
      PerformResult result;
261
775k
      result.status = data->controller.status();
262
775k
      if (result.status.ok()) {
263
775k
        result.status = ResponseStatus(data->resp);
264
775k
      }
265
775k
      if (result.status.ok()) {
266
728k
        result.status = data->Process();
267
728k
      }
268
775k
      if (result.status.ok() && data->resp.has_catalog_read_time()) {
269
18.5k
        result.catalog_read_time = ReadHybridTime::FromPB(data->resp.catalog_read_time());
270
18.5k
      }
271
775k
      data->callback(result);
272
775k
    });
273
774k
  }
274
275
774k
  void PrepareOperations(tserver::PgPerformRequestPB* req, PgsqlOps* operations) {
276
774k
    auto& ops = *req->mutable_ops();
277
774k
    ops.Reserve(narrow_cast<int>(operations->size()));
278
4.02M
    for (auto& op : *operations) {
279
4.02M
      auto* union_op = ops.Add();
280
4.02M
      if (op->is_read()) {
281
1.89M
        auto& read_op = down_cast<PgsqlReadOp&>(*op);
282
1.89M
        union_op->set_allocated_read(&read_op.read_request());
283
1.89M
        if (read_op.read_from_followers()) {
284
0
          union_op->set_read_from_followers(true);
285
0
        }
286
2.12M
      } else {
287
2.12M
        auto& write_op = down_cast<PgsqlWriteOp&>(*op);
288
2.12M
        if (write_op.write_time()) {
289
103k
          req->set_write_time(write_op.write_time().ToUint64());
290
103k
        }
291
2.12M
        union_op->set_allocated_write(&write_op.write_request());
292
2.12M
      }
293
4.02M
      if (op->read_time()) {
294
24.8k
        op->read_time().AddToPB(req->mutable_options());
295
24.8k
      }
296
4.02M
    }
297
774k
  }
298
299
380
  Result<std::pair<PgOid, PgOid>> ReserveOids(PgOid database_oid, PgOid next_oid, uint32_t count) {
300
380
    tserver::PgReserveOidsRequestPB req;
301
380
    req.set_database_oid(database_oid);
302
380
    req.set_next_oid(next_oid);
303
380
    req.set_count(count);
304
305
380
    tserver::PgReserveOidsResponsePB resp;
306
307
380
    RETURN_NOT_OK(proxy_->ReserveOids(req, &resp, PrepareController()));
308
380
    RETURN_NOT_OK(ResponseStatus(resp));
309
380
    return std::pair<PgOid, PgOid>(resp.begin_oid(), resp.end_oid());
310
380
  }
311
312
0
  Result<bool> IsInitDbDone() {
313
0
    tserver::PgIsInitDbDoneRequestPB req;
314
0
    tserver::PgIsInitDbDoneResponsePB resp;
315
316
0
    RETURN_NOT_OK(proxy_->IsInitDbDone(req, &resp, PrepareController()));
317
0
    RETURN_NOT_OK(ResponseStatus(resp));
318
0
    return resp.done();
319
0
  }
320
321
0
  Result<uint64_t> GetCatalogMasterVersion() {
322
0
    tserver::PgGetCatalogMasterVersionRequestPB req;
323
0
    tserver::PgGetCatalogMasterVersionResponsePB resp;
324
325
0
    RETURN_NOT_OK(proxy_->GetCatalogMasterVersion(req, &resp, PrepareController()));
326
0
    RETURN_NOT_OK(ResponseStatus(resp));
327
0
    return resp.version();
328
0
  }
329
330
21
  CHECKED_STATUS CreateSequencesDataTable() {
331
21
    tserver::PgCreateSequencesDataTableRequestPB req;
332
21
    tserver::PgCreateSequencesDataTableResponsePB resp;
333
334
21
    RETURN_NOT_OK(proxy_->CreateSequencesDataTable(req, &resp, PrepareController()));
335
21
    return ResponseStatus(resp);
336
21
  }
337
338
  Result<client::YBTableName> DropTable(
339
1.17k
      tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) {
340
1.17k
    req->set_session_id(session_id_);
341
1.17k
    tserver::PgDropTableResponsePB resp;
342
1.17k
    RETURN_NOT_OK(proxy_->DropTable(*req, &resp, PrepareController(deadline)));
343
1.17k
    RETURN_NOT_OK(ResponseStatus(resp));
344
1.17k
    client::YBTableName result;
345
1.17k
    if (resp.has_indexed_table()) {
346
140
      result.GetFromTableIdentifierPB(resp.indexed_table());
347
140
    }
348
1.17k
    return result;
349
1.17k
  }
350
351
  CHECKED_STATUS BackfillIndex(
352
89
      tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) {
353
89
    tserver::PgBackfillIndexResponsePB resp;
354
89
    req->set_session_id(session_id_);
355
356
89
    RETURN_NOT_OK(proxy_->BackfillIndex(*req, &resp, PrepareController(deadline)));
357
89
    return ResponseStatus(resp);
358
89
  }
359
360
178
  Result<int32> TabletServerCount(bool primary_only) {
361
178
    if (tablet_server_count_cache_[primary_only] > 0) {
362
121
      return tablet_server_count_cache_[primary_only];
363
121
    }
364
57
    tserver::PgTabletServerCountRequestPB req;
365
57
    tserver::PgTabletServerCountResponsePB resp;
366
57
    req.set_primary_only(primary_only);
367
368
57
    RETURN_NOT_OK(proxy_->TabletServerCount(req, &resp, PrepareController()));
369
57
    RETURN_NOT_OK(ResponseStatus(resp));
370
57
    tablet_server_count_cache_[primary_only] = resp.count();
371
57
    return resp.count();
372
57
  }
373
374
2
  Result<client::TabletServersInfo> ListLiveTabletServers(bool primary_only) {
375
2
    tserver::PgListLiveTabletServersRequestPB req;
376
2
    tserver::PgListLiveTabletServersResponsePB resp;
377
2
    req.set_primary_only(primary_only);
378
379
2
    RETURN_NOT_OK(proxy_->ListLiveTabletServers(req, &resp, PrepareController()));
380
2
    RETURN_NOT_OK(ResponseStatus(resp));
381
2
    client::TabletServersInfo result;
382
2
    result.reserve(resp.servers().size());
383
6
    for (const auto& server : resp.servers()) {
384
6
      result.push_back(client::YBTabletServerPlacementInfo::FromPB(server));
385
6
    }
386
2
    return result;
387
2
  }
388
389
0
  CHECKED_STATUS ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) {
390
0
    tserver::PgValidatePlacementResponsePB resp;
391
0
    RETURN_NOT_OK(proxy_->ValidatePlacement(*req, &resp, PrepareController()));
392
0
    return ResponseStatus(resp);
393
0
  }
394
395
  #define YB_PG_CLIENT_SIMPLE_METHOD_IMPL(r, data, method) \
396
  CHECKED_STATUS method( \
397
      tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \
398
1.64k
      CoarseTimePoint deadline) { \
399
1.64k
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
1.64k
    req->set_session_id(session_id_); \
401
1.64k
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
1.64k
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
1.64k
    return ResponseStatus(resp); \
409
1.64k
  }
Unexecuted instantiation: _ZN2yb6pggate8PgClient4Impl13AlterDatabaseEPNS_7tserver24PgAlterDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
_ZN2yb6pggate8PgClient4Impl10AlterTableEPNS_7tserver21PgAlterTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
155
      CoarseTimePoint deadline) { \
399
155
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
155
    req->set_session_id(session_id_); \
401
155
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
155
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
155
    return ResponseStatus(resp); \
409
155
  }
_ZN2yb6pggate8PgClient4Impl14CreateDatabaseEPNS_7tserver25PgCreateDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
22
      CoarseTimePoint deadline) { \
399
22
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
22
    req->set_session_id(session_id_); \
401
22
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
22
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
22
    return ResponseStatus(resp); \
409
22
  }
_ZN2yb6pggate8PgClient4Impl11CreateTableEPNS_7tserver22PgCreateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
1.41k
      CoarseTimePoint deadline) { \
399
1.41k
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
1.41k
    req->set_session_id(session_id_); \
401
1.41k
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
1.41k
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
1.41k
    return ResponseStatus(resp); \
409
1.41k
  }
_ZN2yb6pggate8PgClient4Impl16CreateTablegroupEPNS_7tserver27PgCreateTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
1
      CoarseTimePoint deadline) { \
399
1
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
1
    req->set_session_id(session_id_); \
401
1
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
1
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
1
    return ResponseStatus(resp); \
409
1
  }
_ZN2yb6pggate8PgClient4Impl12DropDatabaseEPNS_7tserver23PgDropDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
21
      CoarseTimePoint deadline) { \
399
21
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
21
    req->set_session_id(session_id_); \
401
21
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
21
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
21
    return ResponseStatus(resp); \
409
21
  }
_ZN2yb6pggate8PgClient4Impl14DropTablegroupEPNS_7tserver25PgDropTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
1
      CoarseTimePoint deadline) { \
399
1
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
1
    req->set_session_id(session_id_); \
401
1
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
1
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
1
    return ResponseStatus(resp); \
409
1
  }
_ZN2yb6pggate8PgClient4Impl13TruncateTableEPNS_7tserver24PgTruncateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
398
31
      CoarseTimePoint deadline) { \
399
31
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
400
31
    req->set_session_id(session_id_); \
401
31
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
402
31
    if (!status.ok()) { \
403
0
      if (status.IsTimedOut()) { \
404
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
405
0
      } \
406
0
      return status; \
407
0
    } \
408
31
    return ResponseStatus(resp); \
409
31
  }
410
411
  BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_IMPL, ~, YB_PG_CLIENT_SIMPLE_METHODS);
412
413
 private:
414
1.65k
  std::string LogPrefix() const {
415
1.65k
    return Format("S $0: ", session_id_);
416
1.65k
  }
417
418
  rpc::RpcController* SetupController(
419
999k
      rpc::RpcController* controller, CoarseTimePoint deadline = CoarseTimePoint()) {
420
999k
    if (deadline != CoarseTimePoint()) {
421
1.71k
      controller->set_deadline(deadline);
422
997k
    } else {
423
997k
      controller->set_timeout(timeout_);
424
997k
    }
425
999k
    return controller;
426
999k
  }
427
428
224k
  rpc::RpcController* PrepareController(CoarseTimePoint deadline = CoarseTimePoint()) {
429
224k
    controller_.Reset();
430
224k
    return SetupController(&controller_, deadline);
431
224k
  }
432
433
2.74k
  rpc::RpcController* PrepareHeartbeatController() {
434
2.74k
    heartbeat_controller_.Reset();
435
2.74k
    heartbeat_controller_.set_timeout(FLAGS_pg_client_heartbeat_interval_ms * 1ms - 1s);
436
2.74k
    return &heartbeat_controller_;
437
2.74k
  }
438
439
  std::unique_ptr<tserver::PgClientServiceProxy> proxy_;
440
  rpc::RpcController controller_;
441
  uint64_t session_id_ = 0;
442
443
  rpc::Poller heartbeat_poller_;
444
  std::atomic<bool> heartbeat_running_{false};
445
  rpc::RpcController heartbeat_controller_;
446
  tserver::PgHeartbeatResponsePB heartbeat_resp_;
447
  std::promise<Result<uint64_t>> create_session_promise_;
448
  std::array<int, 2> tablet_server_count_cache_;
449
  MonoDelta timeout_ = FLAGS_yb_client_admin_operation_timeout_sec * 1s;
450
};
451
452
1.65k
PgClient::PgClient() : impl_(new Impl) {
453
1.65k
}
454
455
1.65k
PgClient::~PgClient() {
456
1.65k
}
457
458
Status PgClient::Start(
459
    rpc::ProxyCache* proxy_cache, rpc::Scheduler* scheduler,
460
1.65k
    const tserver::TServerSharedObject& tserver_shared_object) {
461
1.65k
  return impl_->Start(proxy_cache, scheduler, tserver_shared_object);
462
1.65k
}
463
464
1.65k
void PgClient::Shutdown() {
465
1.65k
  impl_->Shutdown();
466
1.65k
}
467
468
0
void PgClient::SetTimeout(MonoDelta timeout) {
469
0
  impl_->SetTimeout(timeout);
470
0
}
471
472
Result<PgTableDescPtr> PgClient::OpenTable(
473
64.6k
    const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) {
474
64.6k
  return impl_->OpenTable(table_id, reopen, invalidate_cache_time);
475
64.6k
}
476
477
82.4k
Status PgClient::FinishTransaction(Commit commit, DdlMode ddl_mode) {
478
82.4k
  return impl_->FinishTransaction(commit, ddl_mode);
479
82.4k
}
480
481
1.61k
Result<master::GetNamespaceInfoResponsePB> PgClient::GetDatabaseInfo(uint32_t oid) {
482
1.61k
  return impl_->GetDatabaseInfo(oid);
483
1.61k
}
484
485
Result<std::pair<PgOid, PgOid>> PgClient::ReserveOids(
486
380
    PgOid database_oid, PgOid next_oid, uint32_t count) {
487
380
  return impl_->ReserveOids(database_oid, next_oid, count);
488
380
}
489
490
0
Result<bool> PgClient::IsInitDbDone() {
491
0
  return impl_->IsInitDbDone();
492
0
}
493
494
0
Result<uint64_t> PgClient::GetCatalogMasterVersion() {
495
0
  return impl_->GetCatalogMasterVersion();
496
0
}
497
498
21
Status PgClient::CreateSequencesDataTable() {
499
21
  return impl_->CreateSequencesDataTable();
500
21
}
501
502
Result<client::YBTableName> PgClient::DropTable(
503
1.17k
    tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) {
504
1.17k
  return impl_->DropTable(req, deadline);
505
1.17k
}
506
507
Status PgClient::BackfillIndex(
508
89
    tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) {
509
89
  return impl_->BackfillIndex(req, deadline);
510
89
}
511
512
178
Result<int32> PgClient::TabletServerCount(bool primary_only) {
513
178
  return impl_->TabletServerCount(primary_only);
514
178
}
515
516
2
Result<client::TabletServersInfo> PgClient::ListLiveTabletServers(bool primary_only) {
517
2
  return impl_->ListLiveTabletServers(primary_only);
518
2
}
519
520
Status PgClient::SetActiveSubTransaction(
521
48.8k
    SubTransactionId id, tserver::PgPerformOptionsPB* options) {
522
48.8k
  return impl_->SetActiveSubTransaction(id, options);
523
48.8k
}
524
525
23.5k
Status PgClient::RollbackSubTransaction(SubTransactionId id) {
526
23.5k
  return impl_->RollbackSubTransaction(id);
527
23.5k
}
528
529
0
Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) {
530
0
  return impl_->ValidatePlacement(req);
531
0
}
532
533
void PgClient::PerformAsync(
534
    tserver::PgPerformOptionsPB* options,
535
    PgsqlOps* operations,
536
775k
    const PerformCallback& callback) {
537
775k
  impl_->PerformAsync(options, operations, callback);
538
775k
}
539
540
#define YB_PG_CLIENT_SIMPLE_METHOD_DEFINE(r, data, method) \
541
Status PgClient::method( \
542
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \
543
1.64k
    CoarseTimePoint deadline) { \
544
1.64k
  return impl_->method(req, deadline); \
545
1.64k
}
Unexecuted instantiation: _ZN2yb6pggate8PgClient13AlterDatabaseEPNS_7tserver24PgAlterDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
_ZN2yb6pggate8PgClient10AlterTableEPNS_7tserver21PgAlterTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
155
    CoarseTimePoint deadline) { \
544
155
  return impl_->method(req, deadline); \
545
155
}
_ZN2yb6pggate8PgClient14CreateDatabaseEPNS_7tserver25PgCreateDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
22
    CoarseTimePoint deadline) { \
544
22
  return impl_->method(req, deadline); \
545
22
}
_ZN2yb6pggate8PgClient11CreateTableEPNS_7tserver22PgCreateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
1.41k
    CoarseTimePoint deadline) { \
544
1.41k
  return impl_->method(req, deadline); \
545
1.41k
}
_ZN2yb6pggate8PgClient16CreateTablegroupEPNS_7tserver27PgCreateTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
1
    CoarseTimePoint deadline) { \
544
1
  return impl_->method(req, deadline); \
545
1
}
_ZN2yb6pggate8PgClient12DropDatabaseEPNS_7tserver23PgDropDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
21
    CoarseTimePoint deadline) { \
544
21
  return impl_->method(req, deadline); \
545
21
}
_ZN2yb6pggate8PgClient14DropTablegroupEPNS_7tserver25PgDropTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
1
    CoarseTimePoint deadline) { \
544
1
  return impl_->method(req, deadline); \
545
1
}
_ZN2yb6pggate8PgClient13TruncateTableEPNS_7tserver24PgTruncateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE
Line
Count
Source
543
31
    CoarseTimePoint deadline) { \
544
31
  return impl_->method(req, deadline); \
545
31
}
546
547
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_DEFINE, ~, YB_PG_CLIENT_SIMPLE_METHODS);
548
549
}  // namespace pggate
550
}  // namespace yb