YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_client.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pggate/pg_client.h"
15
16
#include "yb/client/client-internal.h"
17
#include "yb/client/table.h"
18
#include "yb/client/table_info.h"
19
#include "yb/client/tablet_server.h"
20
#include "yb/client/yb_table_name.h"
21
22
#include "yb/gutil/casts.h"
23
24
#include "yb/rpc/poller.h"
25
#include "yb/rpc/rpc_controller.h"
26
27
#include "yb/tserver/pg_client.pb.h"
28
#include "yb/tserver/pg_client.proxy.h"
29
#include "yb/tserver/tserver_shared_mem.h"
30
31
#include "yb/util/logging.h"
32
#include "yb/util/protobuf_util.h"
33
#include "yb/util/result.h"
34
#include "yb/util/scope_exit.h"
35
#include "yb/util/shared_mem.h"
36
#include "yb/util/status.h"
37
38
#include "yb/yql/pggate/pg_op.h"
39
#include "yb/yql/pggate/pg_tabledesc.h"
40
41
DECLARE_bool(use_node_hostname_for_local_tserver);
42
DECLARE_int32(backfill_index_client_rpc_timeout_ms);
43
DECLARE_int32(yb_client_admin_operation_timeout_sec);
44
45
DEFINE_uint64(pg_client_heartbeat_interval_ms, 10000, "Pg client heartbeat interval in ms.");
46
47
using namespace std::literals;
48
49
namespace yb {
50
namespace pggate {
51
52
namespace {
53
54
// Adding this value to RPC call timeout, so postgres could detect timeout by it's own mechanism
55
// and report it.
56
const auto kExtraTimeout = 2s;
57
58
struct PerformData {
59
  PgsqlOps operations;
60
  tserver::PgPerformResponsePB resp;
61
  rpc::RpcController controller;
62
  PerformCallback callback;
63
64
2.06M
  CHECKED_STATUS Process() {
65
2.06M
    auto& responses = *resp.mutable_responses();
66
2.06M
    SCHECK_EQ(implicit_cast<size_t>(responses.size()), operations.size(), RuntimeError,
67
2.06M
              Format("Wrong number of responses: $0, while $1 expected",
68
2.06M
                     responses.size(), operations.size()));
69
12.3M
    
for (uint32_t i = 0; 2.06M
i != operations.size();
++i10.3M
) {
70
10.3M
      if (
responses[i].has_rows_data_sidecar()10.3M
) {
71
10.3M
        operations[i]->rows_data() = VERIFY_RESULT(
72
10.3M
            controller.GetSidecarPtr(responses[i].rows_data_sidecar()));
73
10.3M
      }
74
10.3M
      operations[i]->response() = std::move(responses[i]);
75
10.3M
    }
76
2.06M
    return Status::OK();
77
2.06M
  }
78
};
79
80
7
std::string PrettyFunctionName(const char* name) {
81
7
  std::string result;
82
90
  for (const char* ch = name; *ch; 
++ch83
) {
83
83
    if (!result.empty() && 
std::isupper(*ch)76
) {
84
7
      result += ' ';
85
7
    }
86
83
    result += *ch;
87
83
  }
88
7
  return result;
89
7
}
90
91
} // namespace
92
93
class PgClient::Impl {
94
 public:
95
6.08k
  Impl() : heartbeat_poller_(std::bind(&Impl::Heartbeat, this, false)) {
96
6.08k
    tablet_server_count_cache_.fill(0);
97
6.08k
  }
98
99
6.06k
  ~Impl() {
100
6.06k
    CHECK(!proxy_);
101
6.06k
  }
102
103
  CHECKED_STATUS Start(rpc::ProxyCache* proxy_cache,
104
                       rpc::Scheduler* scheduler,
105
6.09k
                       const tserver::TServerSharedObject& tserver_shared_object) {
106
6.09k
    CHECK_NOTNULL(&tserver_shared_object);
107
6.09k
    MonoDelta resolve_cache_timeout;
108
6.09k
    const auto& tserver_shared_data_ = *tserver_shared_object;
109
6.09k
    HostPort host_port(tserver_shared_data_.endpoint());
110
6.09k
    if (FLAGS_use_node_hostname_for_local_tserver) {
111
6
      host_port = HostPort(tserver_shared_data_.host().ToBuffer(),
112
6
                           tserver_shared_data_.endpoint().port());
113
6
      resolve_cache_timeout = MonoDelta::kMax;
114
6
    }
115
6.09k
    LOG(INFO) << "Using TServer host_port: " << host_port;
116
6.09k
    proxy_ = std::make_unique<tserver::PgClientServiceProxy>(
117
6.09k
        proxy_cache, host_port, nullptr /* protocol */, resolve_cache_timeout);
118
119
6.09k
    auto future = create_session_promise_.get_future();
120
6.09k
    Heartbeat(true);
121
6.09k
    session_id_ = VERIFY_RESULT(future.get());
122
6.09k
    LOG_WITH_PREFIX(INFO) << "Session id acquired";
123
6.09k
    heartbeat_poller_.Start(scheduler, FLAGS_pg_client_heartbeat_interval_ms * 1ms);
124
6.09k
    return Status::OK();
125
6.09k
  }
126
127
6.07k
  void Shutdown() {
128
6.07k
    heartbeat_poller_.Shutdown();
129
6.07k
    proxy_ = nullptr;
130
6.07k
  }
131
132
10.8k
  void Heartbeat(bool create) {
133
10.8k
    {
134
10.8k
      bool expected = false;
135
10.8k
      if (!heartbeat_running_.compare_exchange_strong(expected, true)) {
136
12
        LOG_WITH_PREFIX(DFATAL) << "Heartbeat did not complete yet";
137
12
        return;
138
12
      }
139
10.8k
    }
140
10.7k
    tserver::PgHeartbeatRequestPB req;
141
10.7k
    if (!create) {
142
4.69k
      req.set_session_id(session_id_);
143
4.69k
    }
144
10.7k
    proxy_->HeartbeatAsync(
145
10.7k
        req, &heartbeat_resp_, PrepareHeartbeatController(),
146
10.7k
        [this, create] {
147
10.7k
      auto status = ResponseStatus(heartbeat_resp_);
148
10.7k
      if (create) {
149
6.09k
        if (!status.ok()) {
150
0
          create_session_promise_.set_value(status);
151
6.09k
        } else {
152
6.09k
          create_session_promise_.set_value(heartbeat_resp_.session_id());
153
6.09k
        }
154
6.09k
      }
155
10.7k
      heartbeat_running_ = false;
156
10.7k
      if (!status.ok()) {
157
0
        LOG_WITH_PREFIX(WARNING) << "Heartbeat failed: " << status;
158
0
      }
159
10.7k
    });
160
10.7k
  }
161
162
2
  void SetTimeout(MonoDelta timeout) {
163
2
    timeout_ = timeout + kExtraTimeout;
164
2
  }
165
166
  Result<PgTableDescPtr> OpenTable(
167
191k
      const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) {
168
191k
    tserver::PgOpenTableRequestPB req;
169
191k
    req.set_table_id(table_id.GetYbTableId());
170
191k
    req.set_reopen(reopen);
171
191k
    if (invalidate_cache_time != CoarseTimePoint()) {
172
1.04k
      req.set_invalidate_cache_time_us(ToMicroseconds(invalidate_cache_time.time_since_epoch()));
173
1.04k
    }
174
191k
    tserver::PgOpenTableResponsePB resp;
175
176
191k
    RETURN_NOT_OK(proxy_->OpenTable(req, &resp, PrepareController()));
177
191k
    RETURN_NOT_OK(ResponseStatus(resp));
178
179
191k
    auto partitions = std::make_shared<client::VersionedTablePartitionList>();
180
191k
    partitions->version = resp.partitions().version();
181
191k
    partitions->keys.assign(resp.partitions().keys().begin(), resp.partitions().keys().end());
182
183
191k
    auto result = make_scoped_refptr<PgTableDesc>(
184
191k
        table_id, resp.info(), std::move(partitions));
185
191k
    RETURN_NOT_OK(result->Init());
186
191k
    return result;
187
191k
  }
188
189
218k
  CHECKED_STATUS FinishTransaction(Commit commit, DdlMode ddl_mode) {
190
218k
    tserver::PgFinishTransactionRequestPB req;
191
218k
    req.set_session_id(session_id_);
192
218k
    req.set_commit(commit);
193
218k
    req.set_ddl_mode(ddl_mode);
194
218k
    tserver::PgFinishTransactionResponsePB resp;
195
196
218k
    RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController()));
197
218k
    return ResponseStatus(resp);
198
218k
  }
199
200
5.71k
  Result<master::GetNamespaceInfoResponsePB> GetDatabaseInfo(uint32_t oid) {
201
5.71k
    tserver::PgGetDatabaseInfoRequestPB req;
202
5.71k
    req.set_oid(oid);
203
204
5.71k
    tserver::PgGetDatabaseInfoResponsePB resp;
205
206
5.71k
    RETURN_NOT_OK(proxy_->GetDatabaseInfo(req, &resp, PrepareController()));
207
5.71k
    RETURN_NOT_OK(ResponseStatus(resp));
208
5.71k
    return resp.info();
209
5.71k
  }
210
211
  CHECKED_STATUS SetActiveSubTransaction(
212
61.7k
      SubTransactionId id, tserver::PgPerformOptionsPB* options) {
213
61.7k
    tserver::PgSetActiveSubTransactionRequestPB req;
214
61.7k
    req.set_session_id(session_id_);
215
61.7k
    if (options) {
216
7.28k
      options->Swap(req.mutable_options());
217
7.28k
    }
218
61.7k
    req.set_sub_transaction_id(id);
219
220
61.7k
    tserver::PgSetActiveSubTransactionResponsePB resp;
221
222
61.7k
    RETURN_NOT_OK(proxy_->SetActiveSubTransaction(req, &resp, PrepareController()));
223
61.7k
    return ResponseStatus(resp);
224
61.7k
  }
225
226
13.5k
  CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) {
227
13.5k
    tserver::PgRollbackSubTransactionRequestPB req;
228
13.5k
    req.set_session_id(session_id_);
229
13.5k
    req.set_sub_transaction_id(id);
230
231
13.5k
    tserver::PgRollbackSubTransactionResponsePB resp;
232
233
13.5k
    RETURN_NOT_OK(proxy_->RollbackSubTransaction(req, &resp, PrepareController()));
234
13.5k
    return ResponseStatus(resp);
235
13.5k
  }
236
237
  CHECKED_STATUS InsertSequenceTuple(int64_t db_oid,
238
                                     int64_t seq_oid,
239
                                     uint64_t ysql_catalog_version,
240
                                     int64_t last_val,
241
295
                                     bool is_called) {
242
295
    tserver::PgInsertSequenceTupleRequestPB req;
243
295
    req.set_session_id(session_id_);
244
295
    req.set_db_oid(db_oid);
245
295
    req.set_seq_oid(seq_oid);
246
295
    req.set_ysql_catalog_version(ysql_catalog_version);
247
295
    req.set_last_val(last_val);
248
295
    req.set_is_called(is_called);
249
250
295
    tserver::PgInsertSequenceTupleResponsePB resp;
251
252
295
    RETURN_NOT_OK(proxy_->InsertSequenceTuple(req, &resp, PrepareController()));
253
295
    return ResponseStatus(resp);
254
295
  }
255
256
  Result<bool> UpdateSequenceTuple(int64_t db_oid,
257
                                   int64_t seq_oid,
258
                                   uint64_t ysql_catalog_version,
259
                                   int64_t last_val,
260
                                   bool is_called,
261
                                   boost::optional<int64_t> expected_last_val,
262
2.97k
                                   boost::optional<bool> expected_is_called) {
263
2.97k
    tserver::PgUpdateSequenceTupleRequestPB req;
264
2.97k
    req.set_session_id(session_id_);
265
2.97k
    req.set_db_oid(db_oid);
266
2.97k
    req.set_seq_oid(seq_oid);
267
2.97k
    req.set_ysql_catalog_version(ysql_catalog_version);
268
2.97k
    req.set_last_val(last_val);
269
2.97k
    req.set_is_called(is_called);
270
2.97k
    if (expected_last_val && 
expected_is_called2.95k
) {
271
2.95k
      req.set_has_expected(true);
272
2.95k
      req.set_expected_last_val(*expected_last_val);
273
2.95k
      req.set_expected_is_called(*expected_is_called);
274
2.95k
    }
275
276
2.97k
    tserver::PgUpdateSequenceTupleResponsePB resp;
277
278
2.97k
    RETURN_NOT_OK(proxy_->UpdateSequenceTuple(req, &resp, PrepareController()));
279
2.97k
    RETURN_NOT_OK(ResponseStatus(resp));
280
2.97k
    return resp.skipped();
281
2.97k
  }
282
283
  Result<std::pair<int64_t, bool>> ReadSequenceTuple(int64_t db_oid,
284
                                                     int64_t seq_oid,
285
3.23k
                                                     uint64_t ysql_catalog_version) {
286
3.23k
    tserver::PgReadSequenceTupleRequestPB req;
287
3.23k
    req.set_session_id(session_id_);
288
3.23k
    req.set_db_oid(db_oid);
289
3.23k
    req.set_seq_oid(seq_oid);
290
3.23k
    req.set_ysql_catalog_version(ysql_catalog_version);
291
292
3.23k
    tserver::PgReadSequenceTupleResponsePB resp;
293
294
3.23k
    RETURN_NOT_OK(proxy_->ReadSequenceTuple(req, &resp, PrepareController()));
295
3.23k
    RETURN_NOT_OK(ResponseStatus(resp));
296
3.23k
    return std::make_pair(resp.last_val(), resp.is_called());
297
3.23k
  }
298
299
282
  CHECKED_STATUS DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
300
282
    tserver::PgDeleteSequenceTupleRequestPB req;
301
282
    req.set_session_id(session_id_);
302
282
    req.set_db_oid(db_oid);
303
282
    req.set_seq_oid(seq_oid);
304
305
282
    tserver::PgDeleteSequenceTupleResponsePB resp;
306
307
282
    RETURN_NOT_OK(proxy_->DeleteSequenceTuple(req, &resp, PrepareController()));
308
282
    return ResponseStatus(resp);
309
282
  }
310
311
71
  CHECKED_STATUS DeleteDBSequences(int64_t db_oid) {
312
71
    tserver::PgDeleteDBSequencesRequestPB req;
313
71
    req.set_session_id(session_id_);
314
71
    req.set_db_oid(db_oid);
315
316
71
    tserver::PgDeleteDBSequencesResponsePB resp;
317
318
71
    RETURN_NOT_OK(proxy_->DeleteDBSequences(req, &resp, PrepareController()));
319
71
    return ResponseStatus(resp);
320
71
  }
321
322
  void PerformAsync(
323
      tserver::PgPerformOptionsPB* options,
324
      PgsqlOps* operations,
325
2.16M
      const PerformCallback& callback) {
326
2.16M
    tserver::PgPerformRequestPB req;
327
2.16M
    req.set_session_id(session_id_);
328
2.16M
    *req.mutable_options() = std::move(*options);
329
2.17M
    auto se = ScopeExit([&req] {
330
11.5M
      for (auto& op : *req.mutable_ops()) {
331
11.5M
        if (!op.release_read()) {
332
7.20M
          op.release_write();
333
7.20M
        }
334
11.5M
      }
335
2.17M
    });
336
2.16M
    PrepareOperations(&req, operations);
337
338
2.16M
    auto data = std::make_shared<PerformData>();
339
2.16M
    data->operations = std::move(*operations);
340
2.16M
    data->callback = callback;
341
2.16M
    data->controller.set_invoke_callback_mode(rpc::InvokeCallbackMode::kReactorThread);
342
343
2.17M
    proxy_->PerformAsync(req, &data->resp, SetupController(&data->controller), [data] {
344
2.17M
      PerformResult result;
345
2.17M
      result.status = data->controller.status();
346
2.17M
      if (result.status.ok()) {
347
2.17M
        result.status = ResponseStatus(data->resp);
348
2.17M
      }
349
2.17M
      if (result.status.ok()) {
350
2.06M
        result.status = data->Process();
351
2.06M
      }
352
2.17M
      if (result.status.ok() && 
data->resp.has_catalog_read_time()2.06M
) {
353
77.1k
        result.catalog_read_time = ReadHybridTime::FromPB(data->resp.catalog_read_time());
354
77.1k
      }
355
2.17M
      data->callback(result);
356
2.17M
    });
357
2.16M
  }
358
359
2.16M
  void PrepareOperations(tserver::PgPerformRequestPB* req, PgsqlOps* operations) {
360
2.16M
    auto& ops = *req->mutable_ops();
361
2.16M
    ops.Reserve(narrow_cast<int>(operations->size()));
362
11.5M
    for (auto& op : *operations) {
363
11.5M
      auto* union_op = ops.Add();
364
11.5M
      if (op->is_read()) {
365
4.34M
        auto& read_op = down_cast<PgsqlReadOp&>(*op);
366
4.34M
        union_op->set_allocated_read(&read_op.read_request());
367
4.34M
        if (read_op.read_from_followers()) {
368
93
          union_op->set_read_from_followers(true);
369
93
        }
370
7.20M
      } else {
371
7.20M
        auto& write_op = down_cast<PgsqlWriteOp&>(*op);
372
7.20M
        if (write_op.write_time()) {
373
371k
          req->set_write_time(write_op.write_time().ToUint64());
374
371k
        }
375
7.20M
        union_op->set_allocated_write(&write_op.write_request());
376
7.20M
      }
377
11.5M
      if (op->read_time()) {
378
48.5k
        op->read_time().AddToPB(req->mutable_options());
379
48.5k
      }
380
11.5M
    }
381
2.16M
  }
382
383
805
  Result<std::pair<PgOid, PgOid>> ReserveOids(PgOid database_oid, PgOid next_oid, uint32_t count) {
384
805
    tserver::PgReserveOidsRequestPB req;
385
805
    req.set_database_oid(database_oid);
386
805
    req.set_next_oid(next_oid);
387
805
    req.set_count(count);
388
389
805
    tserver::PgReserveOidsResponsePB resp;
390
391
805
    RETURN_NOT_OK(proxy_->ReserveOids(req, &resp, PrepareController()));
392
805
    RETURN_NOT_OK(ResponseStatus(resp));
393
805
    return std::pair<PgOid, PgOid>(resp.begin_oid(), resp.end_oid());
394
805
  }
395
396
2
  Result<bool> IsInitDbDone() {
397
2
    tserver::PgIsInitDbDoneRequestPB req;
398
2
    tserver::PgIsInitDbDoneResponsePB resp;
399
400
2
    RETURN_NOT_OK(proxy_->IsInitDbDone(req, &resp, PrepareController()));
401
2
    RETURN_NOT_OK(ResponseStatus(resp));
402
2
    return resp.done();
403
2
  }
404
405
22
  Result<uint64_t> GetCatalogMasterVersion() {
406
22
    tserver::PgGetCatalogMasterVersionRequestPB req;
407
22
    tserver::PgGetCatalogMasterVersionResponsePB resp;
408
409
22
    RETURN_NOT_OK(proxy_->GetCatalogMasterVersion(req, &resp, PrepareController()));
410
22
    RETURN_NOT_OK(ResponseStatus(resp));
411
22
    return resp.version();
412
22
  }
413
414
0
  CHECKED_STATUS CreateSequencesDataTable() {
415
0
    tserver::PgCreateSequencesDataTableRequestPB req;
416
0
    tserver::PgCreateSequencesDataTableResponsePB resp;
417
418
0
    RETURN_NOT_OK(proxy_->CreateSequencesDataTable(req, &resp, PrepareController()));
419
0
    return ResponseStatus(resp);
420
0
  }
421
422
  Result<client::YBTableName> DropTable(
423
4.14k
      tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) {
424
4.14k
    req->set_session_id(session_id_);
425
4.14k
    tserver::PgDropTableResponsePB resp;
426
4.14k
    RETURN_NOT_OK(proxy_->DropTable(*req, &resp, PrepareController(deadline)));
427
4.14k
    RETURN_NOT_OK(ResponseStatus(resp));
428
4.14k
    client::YBTableName result;
429
4.14k
    if (resp.has_indexed_table()) {
430
667
      result.GetFromTableIdentifierPB(resp.indexed_table());
431
667
    }
432
4.14k
    return result;
433
4.14k
  }
434
435
  CHECKED_STATUS BackfillIndex(
436
540
      tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) {
437
540
    tserver::PgBackfillIndexResponsePB resp;
438
540
    req->set_session_id(session_id_);
439
440
540
    RETURN_NOT_OK(proxy_->BackfillIndex(*req, &resp, PrepareController(deadline)));
441
536
    return ResponseStatus(resp);
442
540
  }
443
444
6.02k
  Result<int32> TabletServerCount(bool primary_only) {
445
6.02k
    if (tablet_server_count_cache_[primary_only] > 0) {
446
5.84k
      return tablet_server_count_cache_[primary_only];
447
5.84k
    }
448
180
    tserver::PgTabletServerCountRequestPB req;
449
180
    tserver::PgTabletServerCountResponsePB resp;
450
180
    req.set_primary_only(primary_only);
451
452
180
    RETURN_NOT_OK(proxy_->TabletServerCount(req, &resp, PrepareController()));
453
180
    RETURN_NOT_OK(ResponseStatus(resp));
454
180
    tablet_server_count_cache_[primary_only] = resp.count();
455
180
    return resp.count();
456
180
  }
457
458
4
  Result<client::TabletServersInfo> ListLiveTabletServers(bool primary_only) {
459
4
    tserver::PgListLiveTabletServersRequestPB req;
460
4
    tserver::PgListLiveTabletServersResponsePB resp;
461
4
    req.set_primary_only(primary_only);
462
463
4
    RETURN_NOT_OK(proxy_->ListLiveTabletServers(req, &resp, PrepareController()));
464
4
    RETURN_NOT_OK(ResponseStatus(resp));
465
4
    client::TabletServersInfo result;
466
4
    result.reserve(resp.servers().size());
467
12
    for (const auto& server : resp.servers()) {
468
12
      result.push_back(client::YBTabletServerPlacementInfo::FromPB(server));
469
12
    }
470
4
    return result;
471
4
  }
472
473
1
  CHECKED_STATUS ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) {
474
1
    tserver::PgValidatePlacementResponsePB resp;
475
1
    RETURN_NOT_OK(proxy_->ValidatePlacement(*req, &resp, PrepareController()));
476
1
    return ResponseStatus(resp);
477
1
  }
478
479
  #define YB_PG_CLIENT_SIMPLE_METHOD_IMPL(r, data, method) \
480
  CHECKED_STATUS method( \
481
      tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \
482
6.50k
      CoarseTimePoint deadline) { \
483
6.50k
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
6.50k
    req->set_session_id(session_id_); \
485
6.50k
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
6.50k
    if (!status.ok()) { \
487
9
      if (status.IsTimedOut()) { \
488
7
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
7
      } \
490
9
      
return status2
; \
491
9
    } \
492
6.50k
    
return ResponseStatus(resp)6.49k
; \
493
6.50k
  }
yb::pggate::PgClient::Impl::AlterDatabase(yb::tserver::PgAlterDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
3
      CoarseTimePoint deadline) { \
483
3
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
3
    req->set_session_id(session_id_); \
485
3
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
3
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
3
    return ResponseStatus(resp); \
493
3
  }
yb::pggate::PgClient::Impl::AlterTable(yb::tserver::PgAlterTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
522
      CoarseTimePoint deadline) { \
483
522
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
522
    req->set_session_id(session_id_); \
485
522
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
522
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
522
    return ResponseStatus(resp); \
493
522
  }
yb::pggate::PgClient::Impl::CreateDatabase(yb::tserver::PgCreateDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
134
      CoarseTimePoint deadline) { \
483
134
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
134
    req->set_session_id(session_id_); \
485
134
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
134
    if (!status.ok()) { \
487
2
      if (status.IsTimedOut()) { \
488
2
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
2
      } \
490
2
      
return status0
; \
491
2
    } \
492
134
    
return ResponseStatus(resp)132
; \
493
134
  }
yb::pggate::PgClient::Impl::CreateTable(yb::tserver::PgCreateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
5.05k
      CoarseTimePoint deadline) { \
483
5.05k
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
5.05k
    req->set_session_id(session_id_); \
485
5.05k
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
5.05k
    if (!status.ok()) { \
487
7
      if (status.IsTimedOut()) { \
488
5
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
5
      } \
490
7
      
return status2
; \
491
7
    } \
492
5.05k
    
return ResponseStatus(resp)5.05k
; \
493
5.05k
  }
yb::pggate::PgClient::Impl::CreateTablegroup(yb::tserver::PgCreateTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
54
      CoarseTimePoint deadline) { \
483
54
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
54
    req->set_session_id(session_id_); \
485
54
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
54
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
54
    return ResponseStatus(resp); \
493
54
  }
yb::pggate::PgClient::Impl::DropDatabase(yb::tserver::PgDropDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
72
      CoarseTimePoint deadline) { \
483
72
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
72
    req->set_session_id(session_id_); \
485
72
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
72
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
72
    return ResponseStatus(resp); \
493
72
  }
yb::pggate::PgClient::Impl::DropTablegroup(yb::tserver::PgDropTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
39
      CoarseTimePoint deadline) { \
483
39
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
39
    req->set_session_id(session_id_); \
485
39
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
39
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
39
    return ResponseStatus(resp); \
493
39
  }
yb::pggate::PgClient::Impl::TruncateTable(yb::tserver::PgTruncateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
482
624
      CoarseTimePoint deadline) { \
483
624
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
484
624
    req->set_session_id(session_id_); \
485
624
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
486
624
    if (!status.ok()) { \
487
0
      if (status.IsTimedOut()) { \
488
0
        return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
489
0
      } \
490
0
      return status; \
491
0
    } \
492
624
    return ResponseStatus(resp); \
493
624
  }
494
495
  BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_IMPL, ~, YB_PG_CLIENT_SIMPLE_METHODS);
496
497
 private:
498
6.10k
  std::string LogPrefix() const {
499
6.10k
    return Format("S $0: ", session_id_);
500
6.10k
  }
501
502
  rpc::RpcController* SetupController(
503
2.68M
      rpc::RpcController* controller, CoarseTimePoint deadline = CoarseTimePoint()) {
504
2.68M
    if (deadline != CoarseTimePoint()) {
505
6.97k
      controller->set_deadline(deadline);
506
2.67M
    } else {
507
2.67M
      controller->set_timeout(timeout_);
508
2.67M
    }
509
2.68M
    return controller;
510
2.68M
  }
511
512
509k
  rpc::RpcController* PrepareController(CoarseTimePoint deadline = CoarseTimePoint()) {
513
509k
    controller_.Reset();
514
509k
    return SetupController(&controller_, deadline);
515
509k
  }
516
517
10.7k
  rpc::RpcController* PrepareHeartbeatController() {
518
10.7k
    heartbeat_controller_.Reset();
519
10.7k
    heartbeat_controller_.set_timeout(FLAGS_pg_client_heartbeat_interval_ms * 1ms - 1s);
520
10.7k
    return &heartbeat_controller_;
521
10.7k
  }
522
523
  std::unique_ptr<tserver::PgClientServiceProxy> proxy_;
524
  rpc::RpcController controller_;
525
  uint64_t session_id_ = 0;
526
527
  rpc::Poller heartbeat_poller_;
528
  std::atomic<bool> heartbeat_running_{false};
529
  rpc::RpcController heartbeat_controller_;
530
  tserver::PgHeartbeatResponsePB heartbeat_resp_;
531
  std::promise<Result<uint64_t>> create_session_promise_;
532
  std::array<int, 2> tablet_server_count_cache_;
533
  MonoDelta timeout_ = FLAGS_yb_client_admin_operation_timeout_sec * 1s;
534
};
535
536
6.09k
PgClient::PgClient() : impl_(new Impl) {
537
6.09k
}
538
539
6.07k
PgClient::~PgClient() {
540
6.07k
}
541
542
Status PgClient::Start(
543
    rpc::ProxyCache* proxy_cache, rpc::Scheduler* scheduler,
544
6.09k
    const tserver::TServerSharedObject& tserver_shared_object) {
545
6.09k
  return impl_->Start(proxy_cache, scheduler, tserver_shared_object);
546
6.09k
}
547
548
6.07k
void PgClient::Shutdown() {
549
6.07k
  impl_->Shutdown();
550
6.07k
}
551
552
2
void PgClient::SetTimeout(MonoDelta timeout) {
553
2
  impl_->SetTimeout(timeout);
554
2
}
555
556
Result<PgTableDescPtr> PgClient::OpenTable(
557
191k
    const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) {
558
191k
  return impl_->OpenTable(table_id, reopen, invalidate_cache_time);
559
191k
}
560
561
218k
Status PgClient::FinishTransaction(Commit commit, DdlMode ddl_mode) {
562
218k
  return impl_->FinishTransaction(commit, ddl_mode);
563
218k
}
564
565
5.72k
Result<master::GetNamespaceInfoResponsePB> PgClient::GetDatabaseInfo(uint32_t oid) {
566
5.72k
  return impl_->GetDatabaseInfo(oid);
567
5.72k
}
568
569
Result<std::pair<PgOid, PgOid>> PgClient::ReserveOids(
570
805
    PgOid database_oid, PgOid next_oid, uint32_t count) {
571
805
  return impl_->ReserveOids(database_oid, next_oid, count);
572
805
}
573
574
2
Result<bool> PgClient::IsInitDbDone() {
575
2
  return impl_->IsInitDbDone();
576
2
}
577
578
22
Result<uint64_t> PgClient::GetCatalogMasterVersion() {
579
22
  return impl_->GetCatalogMasterVersion();
580
22
}
581
582
0
Status PgClient::CreateSequencesDataTable() {
583
0
  return impl_->CreateSequencesDataTable();
584
0
}
585
586
Result<client::YBTableName> PgClient::DropTable(
587
4.14k
    tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) {
588
4.14k
  return impl_->DropTable(req, deadline);
589
4.14k
}
590
591
Status PgClient::BackfillIndex(
592
540
    tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) {
593
540
  return impl_->BackfillIndex(req, deadline);
594
540
}
595
596
6.04k
Result<int32> PgClient::TabletServerCount(bool primary_only) {
597
6.04k
  return impl_->TabletServerCount(primary_only);
598
6.04k
}
599
600
4
Result<client::TabletServersInfo> PgClient::ListLiveTabletServers(bool primary_only) {
601
4
  return impl_->ListLiveTabletServers(primary_only);
602
4
}
603
604
Status PgClient::SetActiveSubTransaction(
605
61.7k
    SubTransactionId id, tserver::PgPerformOptionsPB* options) {
606
61.7k
  return impl_->SetActiveSubTransaction(id, options);
607
61.7k
}
608
609
13.5k
Status PgClient::RollbackSubTransaction(SubTransactionId id) {
610
13.5k
  return impl_->RollbackSubTransaction(id);
611
13.5k
}
612
613
1
Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) {
614
1
  return impl_->ValidatePlacement(req);
615
1
}
616
617
Status PgClient::InsertSequenceTuple(int64_t db_oid,
618
                                     int64_t seq_oid,
619
                                     uint64_t ysql_catalog_version,
620
                                     int64_t last_val,
621
295
                                     bool is_called) {
622
295
  return impl_->InsertSequenceTuple(db_oid, seq_oid, ysql_catalog_version, last_val, is_called);
623
295
}
624
625
Result<bool> PgClient::UpdateSequenceTuple(int64_t db_oid,
626
                                           int64_t seq_oid,
627
                                           uint64_t ysql_catalog_version,
628
                                           int64_t last_val,
629
                                           bool is_called,
630
                                           boost::optional<int64_t> expected_last_val,
631
2.97k
                                           boost::optional<bool> expected_is_called) {
632
2.97k
  return impl_->UpdateSequenceTuple(
633
2.97k
      db_oid, seq_oid, ysql_catalog_version, last_val, is_called, expected_last_val,
634
2.97k
      expected_is_called);
635
2.97k
}
636
637
Result<std::pair<int64_t, bool>> PgClient::ReadSequenceTuple(int64_t db_oid,
638
                                                             int64_t seq_oid,
639
3.23k
                                                             uint64_t ysql_catalog_version) {
640
3.23k
  return impl_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version);
641
3.23k
}
642
643
282
Status PgClient::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) {
644
282
  return impl_->DeleteSequenceTuple(db_oid, seq_oid);
645
282
}
646
647
71
Status PgClient::DeleteDBSequences(int64_t db_oid) {
648
71
  return impl_->DeleteDBSequences(db_oid);
649
71
}
650
651
void PgClient::PerformAsync(
652
    tserver::PgPerformOptionsPB* options,
653
    PgsqlOps* operations,
654
2.17M
    const PerformCallback& callback) {
655
2.17M
  impl_->PerformAsync(options, operations, callback);
656
2.17M
}
657
658
#define YB_PG_CLIENT_SIMPLE_METHOD_DEFINE(r, data, method) \
659
Status PgClient::method( \
660
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \
661
6.50k
    CoarseTimePoint deadline) { \
662
6.50k
  return impl_->method(req, deadline); \
663
6.50k
}
yb::pggate::PgClient::AlterDatabase(yb::tserver::PgAlterDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
3
    CoarseTimePoint deadline) { \
662
3
  return impl_->method(req, deadline); \
663
3
}
yb::pggate::PgClient::AlterTable(yb::tserver::PgAlterTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
522
    CoarseTimePoint deadline) { \
662
522
  return impl_->method(req, deadline); \
663
522
}
yb::pggate::PgClient::CreateDatabase(yb::tserver::PgCreateDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
134
    CoarseTimePoint deadline) { \
662
134
  return impl_->method(req, deadline); \
663
134
}
yb::pggate::PgClient::CreateTable(yb::tserver::PgCreateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
5.05k
    CoarseTimePoint deadline) { \
662
5.05k
  return impl_->method(req, deadline); \
663
5.05k
}
yb::pggate::PgClient::CreateTablegroup(yb::tserver::PgCreateTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
54
    CoarseTimePoint deadline) { \
662
54
  return impl_->method(req, deadline); \
663
54
}
yb::pggate::PgClient::DropDatabase(yb::tserver::PgDropDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
72
    CoarseTimePoint deadline) { \
662
72
  return impl_->method(req, deadline); \
663
72
}
yb::pggate::PgClient::DropTablegroup(yb::tserver::PgDropTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
39
    CoarseTimePoint deadline) { \
662
39
  return impl_->method(req, deadline); \
663
39
}
yb::pggate::PgClient::TruncateTable(yb::tserver::PgTruncateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >)
Line
Count
Source
661
624
    CoarseTimePoint deadline) { \
662
624
  return impl_->method(req, deadline); \
663
624
}
664
665
BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_DEFINE, ~, YB_PG_CLIENT_SIMPLE_METHODS);
666
667
}  // namespace pggate
668
}  // namespace yb