YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/client.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/client.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <set>
41
#include <string>
42
#include <unordered_map>
43
#include <unordered_set>
44
#include <vector>
45
46
#include <boost/container/small_vector.hpp>
47
#include <boost/preprocessor/cat.hpp>
48
#include <boost/preprocessor/stringize.hpp>
49
50
#include "yb/cdc/cdc_service.h"
51
#include "yb/client/client_fwd.h"
52
#include "yb/client/callbacks.h"
53
#include "yb/client/client-internal.h"
54
#include "yb/client/client_builder-internal.h"
55
#include "yb/client/client_utils.h"
56
#include "yb/client/meta_cache.h"
57
#include "yb/client/namespace_alterer.h"
58
#include "yb/client/permissions.h"
59
#include "yb/client/session.h"
60
#include "yb/client/table.h"
61
#include "yb/client/table_alterer.h"
62
#include "yb/client/table_creator.h"
63
#include "yb/client/table_info.h"
64
#include "yb/client/tablet_server.h"
65
#include "yb/client/yb_table_name.h"
66
67
#include "yb/common/common.pb.h"
68
#include "yb/common/common_flags.h"
69
#include "yb/common/entity_ids.h"
70
#include "yb/common/partition.h"
71
#include "yb/common/ql_type.h"
72
#include "yb/common/roles_permissions.h"
73
#include "yb/common/schema.h"
74
#include "yb/common/transaction.h"
75
#include "yb/common/wire_protocol.h"
76
77
#include "yb/gutil/bind.h"
78
#include "yb/gutil/map-util.h"
79
#include "yb/gutil/strings/substitute.h"
80
81
#include "yb/master/master_admin.proxy.h"
82
#include "yb/master/master_client.proxy.h"
83
#include "yb/master/master_cluster.proxy.h"
84
#include "yb/master/master_dcl.proxy.h"
85
#include "yb/master/master_ddl.proxy.h"
86
#include "yb/master/master_replication.proxy.h"
87
#include "yb/master/master_error.h"
88
#include "yb/master/master_util.h"
89
90
#include "yb/rpc/messenger.h"
91
#include "yb/rpc/proxy.h"
92
#include "yb/rpc/rpc.h"
93
94
#include "yb/util/atomic.h"
95
#include "yb/util/flag_tags.h"
96
#include "yb/util/format.h"
97
#include "yb/util/init.h"
98
#include "yb/util/logging.h"
99
#include "yb/util/logging_callback.h"
100
#include "yb/util/mem_tracker.h"
101
#include "yb/util/metric_entity.h"
102
#include "yb/util/monotime.h"
103
#include "yb/util/net/net_util.h"
104
#include "yb/util/result.h"
105
#include "yb/util/scope_exit.h"
106
#include "yb/util/size_literals.h"
107
#include "yb/util/slice.h"
108
#include "yb/util/status.h"
109
#include "yb/util/status_format.h"
110
#include "yb/util/status_log.h"
111
#include "yb/util/strongly_typed_bool.h"
112
113
#include "yb/yql/cql/ql/ptree/pt_option.h"
114
115
using namespace std::literals;
116
117
using yb::master::AlterTableRequestPB;
118
using yb::master::CreateTablegroupRequestPB;
119
using yb::master::CreateTablegroupResponsePB;
120
using yb::master::DeleteTablegroupRequestPB;
121
using yb::master::DeleteTablegroupResponsePB;
122
using yb::master::ListTablegroupsRequestPB;
123
using yb::master::ListTablegroupsResponsePB;
124
using yb::master::GetNamespaceInfoRequestPB;
125
using yb::master::GetNamespaceInfoResponsePB;
126
using yb::master::GetTableLocationsRequestPB;
127
using yb::master::GetTableLocationsResponsePB;
128
using yb::master::GetTabletLocationsRequestPB;
129
using yb::master::GetTabletLocationsResponsePB;
130
using yb::master::GetTransactionStatusTabletsRequestPB;
131
using yb::master::GetTransactionStatusTabletsResponsePB;
132
using yb::master::IsLoadBalancedRequestPB;
133
using yb::master::IsLoadBalancedResponsePB;
134
using yb::master::IsLoadBalancerIdleRequestPB;
135
using yb::master::IsLoadBalancerIdleResponsePB;
136
using yb::master::ListMastersRequestPB;
137
using yb::master::ListMastersResponsePB;
138
using yb::master::ListTablesRequestPB;
139
using yb::master::ListTablesResponsePB;
140
using yb::master::ListTablesResponsePB_TableInfo;
141
using yb::master::ListTabletServersRequestPB;
142
using yb::master::ListTabletServersResponsePB;
143
using yb::master::ListTabletServersResponsePB_Entry;
144
using yb::master::ListLiveTabletServersRequestPB;
145
using yb::master::ListLiveTabletServersResponsePB;
146
using yb::master::ListLiveTabletServersResponsePB_Entry;
147
using yb::master::CreateNamespaceRequestPB;
148
using yb::master::CreateNamespaceResponsePB;
149
using yb::master::DeleteNamespaceRequestPB;
150
using yb::master::DeleteNamespaceResponsePB;
151
using yb::master::ListNamespacesRequestPB;
152
using yb::master::ListNamespacesResponsePB;
153
using yb::master::ReservePgsqlOidsRequestPB;
154
using yb::master::ReservePgsqlOidsResponsePB;
155
using yb::master::GetYsqlCatalogConfigRequestPB;
156
using yb::master::GetYsqlCatalogConfigResponsePB;
157
using yb::master::CreateUDTypeRequestPB;
158
using yb::master::CreateUDTypeResponsePB;
159
using yb::master::AlterRoleRequestPB;
160
using yb::master::AlterRoleResponsePB;
161
using yb::master::CreateRoleRequestPB;
162
using yb::master::CreateRoleResponsePB;
163
using yb::master::DeleteUDTypeRequestPB;
164
using yb::master::DeleteUDTypeResponsePB;
165
using yb::master::DeleteRoleRequestPB;
166
using yb::master::DeleteRoleResponsePB;
167
using yb::master::GetPermissionsRequestPB;
168
using yb::master::GetPermissionsResponsePB;
169
using yb::master::GrantRevokeRoleRequestPB;
170
using yb::master::GrantRevokeRoleResponsePB;
171
using yb::master::GetUDTypeInfoRequestPB;
172
using yb::master::GetUDTypeInfoResponsePB;
173
using yb::master::GrantRevokePermissionResponsePB;
174
using yb::master::GrantRevokePermissionRequestPB;
175
using yb::master::MasterDdlProxy;
176
using yb::master::ReplicationInfoPB;
177
using yb::master::TabletLocationsPB;
178
using yb::master::RedisConfigSetRequestPB;
179
using yb::master::RedisConfigSetResponsePB;
180
using yb::master::RedisConfigGetRequestPB;
181
using yb::master::RedisConfigGetResponsePB;
182
using yb::master::CreateCDCStreamRequestPB;
183
using yb::master::CreateCDCStreamResponsePB;
184
using yb::master::DeleteCDCStreamRequestPB;
185
using yb::master::DeleteCDCStreamResponsePB;
186
using yb::master::GetCDCDBStreamInfoRequestPB;
187
using yb::master::GetCDCDBStreamInfoResponsePB;
188
using yb::master::GetCDCStreamRequestPB;
189
using yb::master::GetCDCStreamResponsePB;
190
using yb::master::UpdateCDCStreamRequestPB;
191
using yb::master::UpdateCDCStreamResponsePB;
192
using yb::master::GetMasterClusterConfigRequestPB;
193
using yb::master::GetMasterClusterConfigResponsePB;
194
using yb::master::CreateTransactionStatusTableRequestPB;
195
using yb::master::CreateTransactionStatusTableResponsePB;
196
using yb::master::UpdateConsumerOnProducerSplitRequestPB;
197
using yb::master::UpdateConsumerOnProducerSplitResponsePB;
198
using yb::master::PlacementInfoPB;
199
using yb::rpc::Messenger;
200
using std::string;
201
using std::vector;
202
using google::protobuf::RepeatedPtrField;
203
204
using namespace yb::size_literals;  // NOLINT.
205
206
DEFINE_bool(client_suppress_created_logs, false,
207
            "Suppress 'Created table ...' messages");
208
TAG_FLAG(client_suppress_created_logs, advanced);
209
TAG_FLAG(client_suppress_created_logs, hidden);
210
211
DEFINE_int32(backfill_index_client_rpc_timeout_ms, 60 * 60 * 1000, // 60 min.
212
             "Timeout for BackfillIndex RPCs from client to master.");
213
TAG_FLAG(backfill_index_client_rpc_timeout_ms, advanced);
214
215
DEFINE_int32(ycql_num_tablets, -1,
216
             "The number of tablets per YCQL table. Default value is -1. "
217
             "Colocated tables are not affected. "
218
             "If it's value is not set then the value of yb_num_shards_per_tserver is used "
219
             "in conjunction with the number of tservers to determine the tablet count. "
220
             "If the user explicitly specifies a value of the tablet count in the Create Table "
221
             "DDL statement (with tablets = x syntax) then it takes precedence over the value "
222
             "of this flag. Needs to be set at tserver.");
223
TAG_FLAG(ycql_num_tablets, runtime);
224
225
DEFINE_int32(ysql_num_tablets, -1,
226
             "The number of tablets per YSQL table. Default value is -1. "
227
             "If it's value is not set then the value of ysql_num_shards_per_tserver is used "
228
             "in conjunction with the number of tservers to determine the tablet count. "
229
             "If the user explicitly specifies a value of the tablet count in the Create Table "
230
             "DDL statement (split into x tablets syntax) then it takes precedence over the "
231
             "value of this flag. Needs to be set at tserver.");
232
TAG_FLAG(ysql_num_tablets, runtime);
233
234
namespace yb {
235
namespace client {
236
237
using internal::MetaCache;
238
using ql::ObjectType;
239
using std::shared_ptr;
240
241
namespace {
242
243
void FillFromRepeatedTabletLocations(
244
    const RepeatedPtrField<TabletLocationsPB>& tablets,
245
    vector<TabletId>* tablet_uuids,
246
    vector<string>* ranges,
247
72
    vector<TabletLocationsPB>* locations) {
248
72
  tablet_uuids->reserve(tablets.size());
249
72
  if (ranges) {
250
72
    ranges->reserve(tablets.size());
251
72
  }
252
72
  if (locations) {
253
57
    locations->reserve(tablets.size());
254
57
  }
255
588
  for (const auto& tablet : tablets) {
256
588
    if (locations) {
257
513
      locations->push_back(tablet);
258
513
    }
259
588
    tablet_uuids->push_back(tablet.tablet_id());
260
588
    if (ranges) {
261
588
      const auto& partition = tablet.partition();
262
588
      ranges->push_back(partition.ShortDebugString());
263
588
    }
264
588
  }
265
72
}
266
267
std::future<FetchPartitionsResult> FetchPartitionsFuture(
268
85.4k
    YBClient* client, const TableId& table_id) {
269
85.4k
  return MakeFuture<FetchPartitionsResult>(
270
85.4k
      [&](const auto& callback) { YBTable::FetchPartitions(client, table_id, callback); });
271
85.4k
}
272
273
} // namespace
274
275
#define CALL_SYNC_LEADER_MASTER_RPC_EX(service, req, resp, method) \
276
140k
  do { \
277
140k
    auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); \
278
140k
    CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(service, req, resp, deadline, method); \
279
138k
  } while(0);
280
281
#define CALL_SYNC_LEADER_MASTER_RPC(req, resp, method) \
282
6.50k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Ddl, req, resp, method)
283
284
#define CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(service, req, resp, deadline, method) \
285
140k
  do { \
286
140k
    RETURN_NOT_OK(data_->SyncLeaderMasterRpc( \
287
140k
        deadline, req, &resp, BOOST_PP_STRINGIZE(method), \
288
140k
        &master::BOOST_PP_CAT(BOOST_PP_CAT(Master, service), Proxy)::            \
289
140k
            BOOST_PP_CAT(method, Async))); \
290
138k
  } while(0);
291
292
// Adapts between the internal LogSeverity and the client's YBLogSeverity.
293
static void LoggingAdapterCB(YBLoggingCallback* user_cb,
294
                             LogSeverity severity,
295
                             const char* filename,
296
                             int line_number,
297
                             const struct ::tm* time,
298
                             const char* message,
299
0
                             size_t message_len) {
300
0
  YBLogSeverity client_severity;
301
0
  switch (severity) {
302
0
    case yb::SEVERITY_INFO:
303
0
      client_severity = SEVERITY_INFO;
304
0
      break;
305
0
    case yb::SEVERITY_WARNING:
306
0
      client_severity = SEVERITY_WARNING;
307
0
      break;
308
0
    case yb::SEVERITY_ERROR:
309
0
      client_severity = SEVERITY_ERROR;
310
0
      break;
311
0
    case yb::SEVERITY_FATAL:
312
0
      client_severity = SEVERITY_FATAL;
313
0
      break;
314
0
    default:
315
0
      LOG(FATAL) << "Unknown YB log severity: " << severity;
316
0
  }
317
0
  user_cb->Run(client_severity, filename, line_number, time,
318
0
               message, message_len);
319
0
}
320
321
0
void InitLogging() {
322
0
  InitGoogleLoggingSafeBasic("yb_client");
323
0
}
324
325
0
void InstallLoggingCallback(YBLoggingCallback* cb) {
326
0
  RegisterLoggingCallback(Bind(&LoggingAdapterCB, Unretained(cb)));
327
0
}
328
329
0
void UninstallLoggingCallback() {
330
0
  UnregisterLoggingCallback();
331
0
}
332
333
0
void SetVerboseLogLevel(int level) {
334
0
  FLAGS_v = level;
335
0
}
336
337
0
Status SetInternalSignalNumber(int signum) {
338
0
  return SetStackTraceSignal(signum);
339
0
}
340
341
YBClientBuilder::YBClientBuilder()
342
24.4k
  : data_(new YBClientBuilder::Data()) {
343
24.4k
}
344
345
2.24k
YBClientBuilder::~YBClientBuilder() {
346
2.24k
}
347
348
258
YBClientBuilder& YBClientBuilder::clear_master_server_addrs() {
349
258
  data_->master_server_addrs_.clear();
350
258
  return *this;
351
258
}
352
353
0
YBClientBuilder& YBClientBuilder::master_server_addrs(const vector<string>& addrs) {
354
0
  for (const string& addr : addrs) {
355
0
    data_->master_server_addrs_.push_back(addr);
356
0
  }
357
0
  return *this;
358
0
}
359
360
24.5k
YBClientBuilder& YBClientBuilder::add_master_server_addr(const string& addr) {
361
24.5k
  data_->master_server_addrs_.push_back(addr);
362
24.5k
  return *this;
363
24.5k
}
364
365
0
YBClientBuilder& YBClientBuilder::skip_master_flagfile(bool should_skip) {
366
0
  data_->skip_master_flagfile_ = should_skip;
367
0
  return *this;
368
0
}
369
370
27
YBClientBuilder& YBClientBuilder::wait_for_leader_election_on_init(bool should_wait) {
371
27
  data_->wait_for_leader_election_on_init_ = should_wait;
372
27
  return *this;
373
27
}
374
375
10.9k
YBClientBuilder& YBClientBuilder::default_admin_operation_timeout(const MonoDelta& timeout) {
376
10.9k
  data_->default_admin_operation_timeout_ = timeout;
377
10.9k
  return *this;
378
10.9k
}
379
380
24.2k
YBClientBuilder& YBClientBuilder::default_rpc_timeout(const MonoDelta& timeout) {
381
24.2k
  data_->default_rpc_timeout_ = timeout;
382
24.2k
  return *this;
383
24.2k
}
384
385
7.46k
YBClientBuilder& YBClientBuilder::set_num_reactors(int32_t num_reactors) {
386
7.46k
  CHECK_GT(num_reactors, 0);
387
7.46k
  data_->num_reactors_ = num_reactors;
388
7.46k
  return *this;
389
7.46k
}
390
391
24.1k
YBClientBuilder& YBClientBuilder::set_cloud_info_pb(const CloudInfoPB& cloud_info_pb) {
392
24.1k
  data_->cloud_info_pb_ = cloud_info_pb;
393
24.1k
  return *this;
394
24.1k
}
395
396
YBClientBuilder& YBClientBuilder::set_metric_entity(
397
24.1k
    const scoped_refptr<MetricEntity>& metric_entity) {
398
24.1k
  data_->metric_entity_ = metric_entity;
399
24.1k
  return *this;
400
24.1k
}
401
402
24.1k
YBClientBuilder& YBClientBuilder::set_client_name(const std::string& name) {
403
24.1k
  data_->client_name_ = name;
404
24.1k
  return *this;
405
24.1k
}
406
407
0
YBClientBuilder& YBClientBuilder::set_callback_threadpool_size(size_t size) {
408
0
  data_->threadpool_size_ = size;
409
0
  return *this;
410
0
}
411
412
11.6k
YBClientBuilder& YBClientBuilder::set_tserver_uuid(const TabletServerId& uuid) {
413
11.6k
  data_->uuid_ = uuid;
414
11.6k
  return *this;
415
11.6k
}
416
417
24.1k
YBClientBuilder& YBClientBuilder::set_parent_mem_tracker(const MemTrackerPtr& mem_tracker) {
418
24.1k
  data_->parent_mem_tracker_ = mem_tracker;
419
24.1k
  return *this;
420
24.1k
}
421
422
10.8k
YBClientBuilder& YBClientBuilder::set_master_address_flag_name(const std::string& value) {
423
10.8k
  data_->master_address_flag_name_ = value;
424
10.8k
  return *this;
425
10.8k
}
426
427
24.1k
YBClientBuilder& YBClientBuilder::set_skip_master_leader_resolution(bool value) {
428
24.1k
  data_->skip_master_leader_resolution_ = value;
429
24.1k
  return *this;
430
24.1k
}
431
432
10.8k
YBClientBuilder& YBClientBuilder::AddMasterAddressSource(const MasterAddressSource& source) {
433
10.8k
  data_->master_address_sources_.push_back(source);
434
10.8k
  return *this;
435
10.8k
}
436
437
32.0k
Status YBClientBuilder::DoBuild(rpc::Messenger* messenger, std::unique_ptr<YBClient>* client) {
438
32.0k
  RETURN_NOT_OK(CheckCPUFlags());
439
440
32.0k
  std::unique_ptr<YBClient> c(new YBClient());
441
442
  // Init messenger.
443
32.0k
  if (messenger) {
444
31.5k
    c->data_->messenger_holder_ = nullptr;
445
31.5k
    c->data_->messenger_ = messenger;
446
486
  } else {
447
486
    c->data_->messenger_holder_ = VERIFY_RESULT(client::CreateClientMessenger(
448
486
        data_->client_name_, data_->num_reactors_,
449
486
        data_->metric_entity_, data_->parent_mem_tracker_));
450
486
    c->data_->messenger_ = c->data_->messenger_holder_.get();
451
486
  }
452
32.0k
  c->data_->proxy_cache_ = std::make_unique<rpc::ProxyCache>(c->data_->messenger_);
453
32.0k
  c->data_->metric_entity_ = data_->metric_entity_;
454
455
32.0k
  c->data_->master_address_flag_name_ = data_->master_address_flag_name_;
456
32.0k
  c->data_->master_address_sources_ = data_->master_address_sources_;
457
32.0k
  c->data_->master_server_addrs_ = data_->master_server_addrs_;
458
32.0k
  c->data_->skip_master_flagfile_ = data_->skip_master_flagfile_;
459
32.0k
  c->data_->default_admin_operation_timeout_ = data_->default_admin_operation_timeout_;
460
32.0k
  c->data_->default_rpc_timeout_ = data_->default_rpc_timeout_;
461
32.0k
  c->data_->wait_for_leader_election_on_init_ = data_->wait_for_leader_election_on_init_;
462
463
32.0k
  auto callback_threadpool_size = data_->threadpool_size_;
464
32.0k
  if (callback_threadpool_size == YBClientBuilder::Data::kUseNumReactorsAsNumThreads) {
465
0
    callback_threadpool_size = c->data_->messenger_->num_reactors();
466
0
  }
467
32.0k
  c->data_->use_threadpool_for_callbacks_ = callback_threadpool_size != 0;
468
32.0k
  if (callback_threadpool_size == 0) {
469
31.2k
    callback_threadpool_size = 1;
470
31.2k
  }
471
472
  // Not using an underscore because we sometimes get shortened thread names like "master_c" and it
473
  // is clearer to see "mastercb" instead.
474
32.0k
  ThreadPoolBuilder tpb(data_->client_name_ + "cb");
475
32.0k
  tpb.set_max_threads(narrow_cast<int>(callback_threadpool_size));
476
32.0k
  std::unique_ptr<ThreadPool> tp;
477
32.0k
  RETURN_NOT_OK_PREPEND(
478
32.0k
      tpb.Build(&tp),
479
32.0k
      Format("Could not create callback threadpool with $0 max threads",
480
32.0k
             callback_threadpool_size));
481
32.0k
  c->data_->threadpool_ = std::move(tp);
482
483
  // Let's allow for plenty of time for discovering the master the first
484
  // time around.
485
32.0k
  auto deadline = CoarseMonoClock::Now() + c->default_admin_operation_timeout();
486
30.5k
  for (;;) {
487
30.5k
    auto status = c->data_->SetMasterServerProxy(deadline,
488
30.5k
            data_->skip_master_leader_resolution_,
489
30.5k
            data_->wait_for_leader_election_on_init_);
490
30.5k
    if (status.ok()) {
491
22.9k
      break;
492
22.9k
    }
493
7.61k
    if (!status.IsNotFound() || CoarseMonoClock::Now() >= deadline) {
494
7.61k
      RETURN_NOT_OK_PREPEND(status, "Could not locate the leader master")
495
7.61k
    }
496
7.58k
  }
497
498
24.4k
  c->data_->meta_cache_.reset(new MetaCache(c.get()));
499
500
  // Init local host names used for locality decisions.
501
24.4k
  RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(),
502
24.4k
                        "Could not determine local host names");
503
24.4k
  c->data_->cloud_info_pb_ = data_->cloud_info_pb_;
504
24.4k
  c->data_->uuid_ = data_->uuid_;
505
506
24.4k
  client->swap(c);
507
24.4k
  return Status::OK();
508
24.4k
}
509
510
32.0k
Result<std::unique_ptr<YBClient>> YBClientBuilder::Build(rpc::Messenger* messenger) {
511
32.0k
  std::unique_ptr<YBClient> client;
512
32.0k
  RETURN_NOT_OK(DoBuild(messenger, &client));
513
24.4k
  return client;
514
32.0k
}
515
516
Result<std::unique_ptr<YBClient>> YBClientBuilder::Build(
517
0
    std::unique_ptr<rpc::Messenger>&& messenger) {
518
0
  std::unique_ptr<YBClient> client;
519
0
  auto ok = false;
520
0
  auto scope_exit = ScopeExit([&ok, &messenger] {
521
0
    if (!ok) {
522
0
      messenger->Shutdown();
523
0
    }
524
0
  });
525
0
  RETURN_NOT_OK(DoBuild(messenger.get(), &client));
526
0
  ok = true;
527
0
  client->data_->messenger_holder_ = std::move(messenger);
528
0
  return client;
529
0
}
530
531
32.0k
YBClient::YBClient() : data_(new YBClient::Data()) {
532
32.0k
  yb::InitCommonFlags();
533
32.0k
}
534
535
9.76k
YBClient::~YBClient() {
536
9.76k
  Shutdown();
537
9.76k
}
538
539
11.4k
void YBClient::Shutdown() {
540
11.4k
  data_->StartShutdown();
541
11.4k
  if (data_->messenger_holder_) {
542
154
    data_->messenger_holder_->Shutdown();
543
154
  }
544
11.4k
  if (data_->threadpool_) {
545
11.3k
    data_->threadpool_->Shutdown();
546
11.3k
  }
547
11.4k
  data_->CompleteShutdown();
548
11.4k
}
549
550
3.20k
std::unique_ptr<YBTableCreator> YBClient::NewTableCreator() {
551
3.20k
  return std::unique_ptr<YBTableCreator>(new YBTableCreator(this));
552
3.20k
}
553
554
Status YBClient::IsCreateTableInProgress(const YBTableName& table_name,
555
35
                                         bool *create_in_progress) {
556
35
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
557
35
  return data_->IsCreateTableInProgress(this, table_name, "" /* table_id */, deadline,
558
35
                                        create_in_progress);
559
35
}
560
561
0
Status YBClient::WaitForCreateTableToFinish(const YBTableName& table_name) {
562
0
  const auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
563
0
  return WaitForCreateTableToFinish(table_name, deadline);
564
0
}
565
566
Status YBClient::WaitForCreateTableToFinish(
567
0
    const YBTableName& table_name, const CoarseTimePoint& deadline) {
568
0
  return data_->WaitForCreateTableToFinish(this, table_name, "" /* table_id */, deadline);
569
0
}
570
571
0
Status YBClient::WaitForCreateTableToFinish(const string& table_id) {
572
0
  const auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
573
0
  return WaitForCreateTableToFinish(table_id, deadline);
574
0
}
575
576
Status YBClient::WaitForCreateTableToFinish(
577
0
    const string& table_id, const CoarseTimePoint& deadline) {
578
0
  const YBTableName empty_table_name;
579
0
  return data_->WaitForCreateTableToFinish(this, empty_table_name, table_id, deadline);
580
0
}
581
582
3.05k
Status YBClient::TruncateTable(const string& table_id, bool wait) {
583
3.05k
  return TruncateTables({table_id}, wait);
584
3.05k
}
585
586
3.05k
Status YBClient::TruncateTables(const vector<string>& table_ids, bool wait) {
587
3.05k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
588
3.05k
  return data_->TruncateTables(this, table_ids, deadline, wait);
589
3.05k
}
590
591
89
Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline) {
592
89
  if (deadline == CoarseTimePoint()) {
593
0
    deadline = CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms;
594
0
  }
595
89
  return data_->BackfillIndex(this, YBTableName(), table_id, deadline, wait);
596
89
}
597
598
1.13k
Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) {
599
1.13k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
600
1.13k
  return data_->DeleteTable(this,
601
1.13k
                            table_name,
602
1.13k
                            "" /* table_id */,
603
1.13k
                            false /* is_index_table */,
604
1.13k
                            deadline,
605
1.13k
                            nullptr /* indexed_table_name */,
606
1.13k
                            wait);
607
1.13k
}
608
609
1.03k
Status YBClient::DeleteTable(const string& table_id, bool wait, CoarseTimePoint deadline) {
610
1.03k
  return data_->DeleteTable(this,
611
1.03k
                            YBTableName(),
612
1.03k
                            table_id,
613
1.03k
                            false /* is_index_table */,
614
1.03k
                            PatchAdminDeadline(deadline),
615
1.03k
                            nullptr /* indexed_table_name */,
616
1.03k
                            wait);
617
1.03k
}
618
619
Status YBClient::DeleteIndexTable(const YBTableName& table_name,
620
                                  YBTableName* indexed_table_name,
621
123
                                  bool wait) {
622
123
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
623
123
  return data_->DeleteTable(this,
624
123
                            table_name,
625
123
                            "" /* table_id */,
626
123
                            true /* is_index_table */,
627
123
                            deadline,
628
123
                            indexed_table_name,
629
123
                            wait);
630
123
}
631
632
Status YBClient::DeleteIndexTable(const string& table_id,
633
                                  YBTableName* indexed_table_name,
634
                                  bool wait,
635
141
                                  CoarseTimePoint deadline) {
636
141
  return data_->DeleteTable(this,
637
141
                            YBTableName(),
638
141
                            table_id,
639
141
                            true /* is_index_table */,
640
141
                            PatchAdminDeadline(deadline),
641
141
                            indexed_table_name,
642
141
                            wait);
643
141
}
644
645
Status YBClient::FlushTables(const std::vector<TableId>& table_ids,
646
                             bool add_indexes,
647
                             int timeout_secs,
648
7
                             bool is_compaction) {
649
7
  auto deadline = CoarseMonoClock::Now() + MonoDelta::FromSeconds(timeout_secs);
650
7
  return data_->FlushTables(this,
651
7
                            table_ids,
652
7
                            add_indexes,
653
7
                            deadline,
654
7
                            is_compaction);
655
7
}
656
657
Status YBClient::FlushTables(const std::vector<YBTableName>& table_names,
658
                             bool add_indexes,
659
                             int timeout_secs,
660
0
                             bool is_compaction) {
661
0
  auto deadline = CoarseMonoClock::Now() + MonoDelta::FromSeconds(timeout_secs);
662
0
  return data_->FlushTables(this,
663
0
                            table_names,
664
0
                            add_indexes,
665
0
                            deadline,
666
0
                            is_compaction);
667
0
}
668
669
167
std::unique_ptr<YBTableAlterer> YBClient::NewTableAlterer(const YBTableName& name) {
670
167
  return std::unique_ptr<YBTableAlterer>(new YBTableAlterer(this, name));
671
167
}
672
673
155
std::unique_ptr<YBTableAlterer> YBClient::NewTableAlterer(const string id) {
674
155
  return std::unique_ptr<YBTableAlterer>(new YBTableAlterer(this, id));
675
155
}
676
677
Status YBClient::IsAlterTableInProgress(const YBTableName& table_name,
678
                                        const string& table_id,
679
0
                                        bool *alter_in_progress) {
680
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
681
0
  return data_->IsAlterTableInProgress(this, table_name, table_id, deadline, alter_in_progress);
682
0
}
683
684
47
Result<YBTableInfo> YBClient::GetYBTableInfo(const YBTableName& table_name) {
685
47
  YBTableInfo info;
686
47
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
687
47
  RETURN_NOT_OK(data_->GetTableSchema(this, table_name, deadline, &info));
688
44
  return info;
689
47
}
690
691
Status YBClient::GetTableSchema(const YBTableName& table_name,
692
                                YBSchema* schema,
693
0
                                PartitionSchema* partition_schema) {
694
0
  Result<YBTableInfo> info = GetYBTableInfo(table_name);
695
0
  if (!info.ok()) {
696
0
    return info.status();
697
0
  }
698
  // Verify it is not an index table.
699
0
  if (info->index_info) {
700
0
    return STATUS(NotFound, "The table does not exist");
701
0
  }
702
703
0
  *schema = std::move(info->schema);
704
0
  *partition_schema = std::move(info->partition_schema);
705
0
  return Status::OK();
706
0
}
707
708
Status YBClient::GetTableSchemaById(const TableId& table_id, std::shared_ptr<YBTableInfo> info,
709
0
                                    StatusCallback callback) {
710
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
711
0
  return data_->GetTableSchemaById(this, table_id, deadline, info, callback);
712
0
}
713
714
Status YBClient::GetTablegroupSchemaById(const TablegroupId& parent_tablegroup_table_id,
715
                                         std::shared_ptr<std::vector<YBTableInfo>> info,
716
0
                                         StatusCallback callback) {
717
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
718
0
  return data_->GetTablegroupSchemaById(this,
719
0
                                        parent_tablegroup_table_id,
720
0
                                        deadline,
721
0
                                        info,
722
0
                                        callback);
723
0
}
724
725
Status YBClient::GetColocatedTabletSchemaById(const TableId& parent_colocated_table_id,
726
                                              std::shared_ptr<std::vector<YBTableInfo>> info,
727
0
                                              StatusCallback callback) {
728
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
729
0
  return data_->GetColocatedTabletSchemaById(this,
730
0
                                             parent_colocated_table_id,
731
0
                                             deadline,
732
0
                                             info,
733
0
                                             callback);
734
0
}
735
736
Result<IndexPermissions> YBClient::GetIndexPermissions(
737
    const TableId& table_id,
738
4
    const TableId& index_id) {
739
4
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
740
4
  return data_->GetIndexPermissions(
741
4
      this,
742
4
      table_id,
743
4
      index_id,
744
4
      deadline);
745
4
}
746
747
Result<IndexPermissions> YBClient::GetIndexPermissions(
748
    const YBTableName& table_name,
749
4
    const YBTableName& index_name) {
750
4
  YBTableInfo table_info = VERIFY_RESULT(GetYBTableInfo(table_name));
751
4
  YBTableInfo index_info = VERIFY_RESULT(GetYBTableInfo(index_name));
752
4
  return GetIndexPermissions(table_info.table_id, index_info.table_id);
753
4
}
754
755
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
756
    const TableId& table_id,
757
    const TableId& index_id,
758
    const IndexPermissions& target_index_permissions,
759
    const CoarseTimePoint deadline,
760
0
    const CoarseDuration max_wait) {
761
0
  return data_->WaitUntilIndexPermissionsAtLeast(
762
0
      this,
763
0
      table_id,
764
0
      index_id,
765
0
      target_index_permissions,
766
0
      deadline,
767
0
      max_wait);
768
0
}
769
770
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
771
    const TableId& table_id,
772
    const TableId& index_id,
773
    const IndexPermissions& target_index_permissions,
774
0
    const CoarseDuration max_wait) {
775
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
776
0
  return WaitUntilIndexPermissionsAtLeast(
777
0
      table_id,
778
0
      index_id,
779
0
      target_index_permissions,
780
0
      deadline,
781
0
      max_wait);
782
0
}
783
784
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
785
    const YBTableName& table_name,
786
    const YBTableName& index_name,
787
    const IndexPermissions& target_index_permissions,
788
    const CoarseTimePoint deadline,
789
65
    const CoarseDuration max_wait) {
790
65
  return data_->WaitUntilIndexPermissionsAtLeast(
791
65
      this,
792
65
      table_name,
793
65
      index_name,
794
65
      target_index_permissions,
795
65
      deadline,
796
65
      max_wait);
797
65
}
798
799
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
800
    const YBTableName& table_name,
801
    const YBTableName& index_name,
802
    const IndexPermissions& target_index_permissions,
803
41
    const CoarseDuration max_wait) {
804
41
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
805
41
  return WaitUntilIndexPermissionsAtLeast(
806
41
      table_name,
807
41
      index_name,
808
41
      target_index_permissions,
809
41
      deadline,
810
41
      max_wait);
811
41
}
812
813
Status YBClient::CreateNamespace(const std::string& namespace_name,
814
                                 const boost::optional<YQLDatabase>& database_type,
815
                                 const std::string& creator_role_name,
816
                                 const std::string& namespace_id,
817
                                 const std::string& source_namespace_id,
818
                                 const boost::optional<uint32_t>& next_pg_oid,
819
                                 const TransactionMetadata* txn,
820
                                 const bool colocated,
821
1.80k
                                 CoarseTimePoint deadline) {
822
1.80k
  CreateNamespaceRequestPB req;
823
1.80k
  CreateNamespaceResponsePB resp;
824
1.80k
  req.set_name(namespace_name);
825
1.80k
  if (!creator_role_name.empty()) {
826
904
    req.set_creator_role_name(creator_role_name);
827
904
  }
828
1.80k
  if (database_type) {
829
1.79k
    req.set_database_type(*database_type);
830
1.79k
  }
831
1.80k
  if (!namespace_id.empty()) {
832
46
    req.set_namespace_id(namespace_id);
833
46
  }
834
1.80k
  if (!source_namespace_id.empty()) {
835
22
    req.set_source_namespace_id(source_namespace_id);
836
22
  }
837
1.80k
  if (next_pg_oid) {
838
22
    req.set_next_pg_oid(*next_pg_oid);
839
22
  }
840
1.80k
  if (txn) {
841
22
    txn->ToPB(req.mutable_transaction());
842
22
  }
843
1.80k
  req.set_colocated(colocated);
844
1.80k
  deadline = PatchAdminDeadline(deadline);
845
1.80k
  RETURN_NOT_OK(data_->SyncLeaderMasterRpc(
846
1.80k
      deadline, req, &resp, "CreateNamespace", &MasterDdlProxy::CreateNamespaceAsync));
847
1.80k
  std::string cur_id = resp.has_id() ? resp.id() : namespace_id;
848
849
  // Verify that the namespace we found is running so that, once this request returns,
850
  // the client can send operations without receiving a "namespace not found" error.
851
1.80k
  RETURN_NOT_OK(data_->WaitForCreateNamespaceToFinish(
852
1.80k
      this, namespace_name, database_type, cur_id, deadline));
853
854
1.80k
  return Status::OK();
855
1.80k
}
856
857
Status YBClient::CreateNamespaceIfNotExists(const std::string& namespace_name,
858
                                            const boost::optional<YQLDatabase>& database_type,
859
                                            const std::string& creator_role_name,
860
                                            const std::string& namespace_id,
861
                                            const std::string& source_namespace_id,
862
                                            const boost::optional<uint32_t>& next_pg_oid,
863
200
                                            const bool colocated) {
864
200
  const auto namespace_exists = VERIFY_RESULT(
865
200
      !namespace_id.empty() ? NamespaceIdExists(namespace_id)
866
200
                            : NamespaceExists(namespace_name));
867
200
  if (namespace_exists) {
868
    // Verify that the namespace we found is running so that, once this request returns,
869
    // the client can send operations without receiving a "namespace not found" error.
870
27
    return data_->WaitForCreateNamespaceToFinish(this, namespace_name, database_type, namespace_id,
871
27
        CoarseMonoClock::Now() + default_admin_operation_timeout());
872
27
  }
873
874
173
  Status s = CreateNamespace(namespace_name, database_type, creator_role_name, namespace_id,
875
173
                             source_namespace_id, next_pg_oid, nullptr /* txn */, colocated);
876
173
  if (s.IsAlreadyPresent() && database_type && *database_type == YQLDatabase::YQL_DATABASE_CQL) {
877
0
    return Status::OK();
878
0
  }
879
173
  return s;
880
173
}
881
882
Status YBClient::IsCreateNamespaceInProgress(const std::string& namespace_name,
883
                                             const boost::optional<YQLDatabase>& database_type,
884
                                             const std::string& namespace_id,
885
0
                                             bool *create_in_progress) {
886
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
887
0
  return data_->IsCreateNamespaceInProgress(this, namespace_name, database_type, namespace_id,
888
0
                                            deadline, create_in_progress);
889
0
}
890
891
Status YBClient::DeleteNamespace(const std::string& namespace_name,
892
                                 const boost::optional<YQLDatabase>& database_type,
893
                                 const std::string& namespace_id,
894
1.51k
                                 CoarseTimePoint deadline) {
895
1.51k
  DeleteNamespaceRequestPB req;
896
1.51k
  DeleteNamespaceResponsePB resp;
897
1.51k
  req.mutable_namespace_()->set_name(namespace_name);
898
1.51k
  if (!namespace_id.empty()) {
899
21
    req.mutable_namespace_()->set_id(namespace_id);
900
21
  }
901
1.51k
  if (database_type) {
902
22
    req.set_database_type(*database_type);
903
22
    req.mutable_namespace_()->set_database_type(*database_type);
904
22
  }
905
1.51k
  deadline = PatchAdminDeadline(deadline);
906
1.51k
  RETURN_NOT_OK(data_->SyncLeaderMasterRpc(
907
1.51k
      deadline, req, &resp, "DeleteNamespace", &MasterDdlProxy::DeleteNamespaceAsync));
908
909
  // Verify that, once this request returns, the namespace has been successfully marked as deleted.
910
1.51k
  RETURN_NOT_OK(data_->WaitForDeleteNamespaceToFinish(this, namespace_name, database_type,
911
1.51k
      namespace_id, CoarseMonoClock::Now() + default_admin_operation_timeout()));
912
913
1.51k
  return Status::OK();
914
1.51k
}
915
916
Status YBClient::IsDeleteNamespaceInProgress(const std::string& namespace_name,
917
                                             const boost::optional<YQLDatabase>& database_type,
918
                                             const std::string& namespace_id,
919
0
                                             bool *delete_in_progress) {
920
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
921
0
  return data_->IsDeleteNamespaceInProgress(this, namespace_name, database_type, namespace_id,
922
0
                                            deadline, delete_in_progress);
923
0
}
924
925
YBNamespaceAlterer* YBClient::NewNamespaceAlterer(
926
0
    const string& namespace_name, const std::string& namespace_id) {
927
0
  return new YBNamespaceAlterer(this, namespace_name, namespace_id);
928
0
}
929
930
Result<vector<master::NamespaceIdentifierPB>> YBClient::ListNamespaces(
931
4.40k
    const boost::optional<YQLDatabase>& database_type) {
932
4.40k
  ListNamespacesRequestPB req;
933
4.40k
  ListNamespacesResponsePB resp;
934
4.40k
  if (database_type) {
935
0
    req.set_database_type(*database_type);
936
0
  }
937
4.40k
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListNamespaces);
938
4.40k
  auto* namespaces = resp.mutable_namespaces();
939
4.40k
  vector<master::NamespaceIdentifierPB> result;
940
4.40k
  result.reserve(namespaces->size());
941
19.4k
  for (auto& ns : *namespaces) {
942
19.4k
    result.push_back(std::move(ns));
943
19.4k
  }
944
4.40k
  return result;
945
4.40k
}
946
947
Status YBClient::GetNamespaceInfo(const std::string& namespace_id,
948
                                  const std::string& namespace_name,
949
                                  const boost::optional<YQLDatabase>& database_type,
950
1.77k
                                  master::GetNamespaceInfoResponsePB* ret) {
951
1.77k
  GetNamespaceInfoRequestPB req;
952
1.77k
  GetNamespaceInfoResponsePB resp;
953
954
1.77k
  if (!namespace_id.empty()) {
955
1.61k
    req.mutable_namespace_()->set_id(namespace_id);
956
1.61k
  }
957
1.77k
  if (!namespace_name.empty()) {
958
155
    req.mutable_namespace_()->set_name(namespace_name);
959
155
  }
960
1.77k
  if (database_type) {
961
1.77k
    req.mutable_namespace_()->set_database_type(*database_type);
962
1.77k
  }
963
964
1.77k
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, GetNamespaceInfo);
965
1.77k
  ret->Swap(&resp);
966
1.77k
  return Status::OK();
967
1.77k
}
968
969
Status YBClient::ReservePgsqlOids(const std::string& namespace_id,
970
                                  const uint32_t next_oid, const uint32_t count,
971
380
                                  uint32_t* begin_oid, uint32_t* end_oid) {
972
380
  ReservePgsqlOidsRequestPB req;
973
380
  ReservePgsqlOidsResponsePB resp;
974
380
  req.set_namespace_id(namespace_id);
975
380
  req.set_next_oid(next_oid);
976
380
  req.set_count(count);
977
380
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, ReservePgsqlOids);
978
380
  *begin_oid = resp.begin_oid();
979
380
  *end_oid = resp.end_oid();
980
380
  return Status::OK();
981
380
}
982
983
0
Status YBClient::GetYsqlCatalogMasterVersion(uint64_t *ysql_catalog_version) {
984
0
  GetYsqlCatalogConfigRequestPB req;
985
0
  GetYsqlCatalogConfigResponsePB resp;
986
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetYsqlCatalogConfig);
987
0
  *ysql_catalog_version = resp.version();
988
0
  return Status::OK();
989
0
}
990
991
Status YBClient::GrantRevokePermission(GrantRevokeStatementType statement_type,
992
                                       const PermissionType& permission,
993
                                       const ResourceType& resource_type,
994
                                       const std::string& canonical_resource,
995
                                       const char* resource_name,
996
                                       const char* namespace_name,
997
721
                                       const std::string& role_name) {
998
  // Setting up request.
999
721
  GrantRevokePermissionRequestPB req;
1000
721
  req.set_role_name(role_name);
1001
721
  req.set_canonical_resource(canonical_resource);
1002
721
  if (resource_name != nullptr) {
1003
520
    req.set_resource_name(resource_name);
1004
520
  }
1005
721
  if (namespace_name != nullptr) {
1006
435
    req.mutable_namespace_()->set_name(namespace_name);
1007
435
  }
1008
721
  req.set_resource_type(resource_type);
1009
721
  req.set_permission(permission);
1010
1011
721
  req.set_revoke(statement_type == GrantRevokeStatementType::REVOKE);
1012
1013
721
  GrantRevokePermissionResponsePB resp;
1014
721
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GrantRevokePermission);
1015
721
  return Status::OK();
1016
721
}
1017
1018
Result<bool> YBClient::NamespaceExists(const std::string& namespace_name,
1019
4.37k
                                       const boost::optional<YQLDatabase>& database_type) {
1020
12.7k
  for (const auto& ns : VERIFY_RESULT(ListNamespaces(database_type))) {
1021
12.7k
    if (ns.name() == namespace_name) {
1022
4.22k
      return true;
1023
4.22k
    }
1024
12.7k
  }
1025
151
  return false;
1026
4.37k
}
1027
1028
Result<bool> YBClient::NamespaceIdExists(const std::string& namespace_id,
1029
24
                                         const boost::optional<YQLDatabase>& database_type) {
1030
199
  for (const auto& ns : VERIFY_RESULT(ListNamespaces(database_type))) {
1031
199
    if (ns.id() == namespace_id) {
1032
0
      return true;
1033
0
    }
1034
199
  }
1035
24
  return false;
1036
24
}
1037
1038
Status YBClient::CreateTablegroup(const std::string& namespace_name,
1039
                                  const std::string& namespace_id,
1040
                                  const std::string& tablegroup_id,
1041
2
                                  const std::string& tablespace_id) {
1042
2
  CreateTablegroupRequestPB req;
1043
2
  CreateTablegroupResponsePB resp;
1044
2
  req.set_id(tablegroup_id);
1045
2
  req.set_namespace_id(namespace_id);
1046
2
  req.set_namespace_name(namespace_name);
1047
1048
2
  if (!tablespace_id.empty()) {
1049
0
    req.set_tablespace_id(tablespace_id);
1050
0
  }
1051
1052
2
  int attempts = 0;
1053
2
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1054
1055
2
  Status s = data_->SyncLeaderMasterRpc(
1056
2
      deadline, req, &resp, "CreateTablegroup", &MasterDdlProxy::CreateTablegroupAsync,
1057
2
      &attempts);
1058
1059
  // This case should not happen but need to validate contents since fields are optional in PB.
1060
2
  if (!resp.has_parent_table_id() || !resp.has_parent_table_name()) {
1061
0
    return STATUS(NotFound, "Parent table information not found in CREATE TABLEGROUP response.");
1062
0
  }
1063
1064
2
  const YBTableName table_name(YQL_DATABASE_PGSQL, namespace_name, resp.parent_table_name());
1065
1066
  // Handle special cases based on resp.error().
1067
2
  if (resp.has_error()) {
1068
0
    LOG_IF(DFATAL, s.ok()) << "Expecting error status if response has error: " <<
1069
0
        resp.error().code() << " Status: " << resp.error().status().ShortDebugString();
1070
1071
0
    if (resp.error().code() == master::MasterErrorPB::OBJECT_ALREADY_PRESENT && attempts > 1) {
1072
      // If the table already exists and the number of attempts is >
1073
      // 1, then it means we may have succeeded in creating the
1074
      // table, but client didn't receive the successful
1075
      // response (e.g., due to failure before the successful
1076
      // response could be sent back, or due to a I/O pause or a
1077
      // network blip leading to a timeout, etc...)
1078
0
      YBTableInfo info;
1079
1080
      // A fix for https://yugabyte.atlassian.net/browse/ENG-529:
1081
      // If we've been retrying table creation, and the table is now in the process is being
1082
      // created, we can sometimes see an empty schema. Wait until the table is fully created
1083
      // before we compare the schema.
1084
0
      RETURN_NOT_OK_PREPEND(
1085
0
          data_->WaitForCreateTableToFinish(this, table_name, resp.parent_table_id(), deadline),
1086
0
          strings::Substitute("Failed waiting for table $0 to finish being created",
1087
0
                              table_name.ToString()));
1088
1089
0
      RETURN_NOT_OK_PREPEND(
1090
0
          data_->GetTableSchema(this, table_name, deadline, &info),
1091
0
          strings::Substitute("Unable to check the schema of table $0", table_name.ToString()));
1092
1093
0
      YBSchemaBuilder schemaBuilder;
1094
0
      schemaBuilder.AddColumn("parent_column")->Type(BINARY)->PrimaryKey()->NotNull();
1095
0
      YBSchema ybschema;
1096
0
      CHECK_OK(schemaBuilder.Build(&ybschema));
1097
1098
0
      if (!ybschema.Equals(info.schema)) {
1099
0
         string msg = Format("Table $0 already exists with a different "
1100
0
                             "schema. Requested schema was: $1, actual schema is: $2",
1101
0
                             table_name,
1102
0
                             internal::GetSchema(ybschema),
1103
0
                             internal::GetSchema(info.schema));
1104
0
        LOG(ERROR) << msg;
1105
0
        return STATUS(AlreadyPresent, msg);
1106
0
      }
1107
1108
0
      return Status::OK();
1109
0
    }
1110
1111
0
    return StatusFromPB(resp.error().status());
1112
0
  }
1113
1114
  // Wait for create table to finish.
1115
2
  RETURN_NOT_OK_PREPEND(
1116
2
      data_->WaitForCreateTableToFinish(this, table_name, resp.parent_table_id(), deadline),
1117
2
      strings::Substitute("Failed waiting for parent table $0 to finish being created",
1118
2
                          table_name.ToString()));
1119
1120
2
  return Status::OK();
1121
2
}
1122
1123
Status YBClient::DeleteTablegroup(const std::string& namespace_id,
1124
1
                                  const std::string& tablegroup_id) {
1125
1
  DeleteTablegroupRequestPB req;
1126
1
  DeleteTablegroupResponsePB resp;
1127
1
  req.set_id(tablegroup_id);
1128
1
  req.set_namespace_id(namespace_id);
1129
1130
1
  int attempts = 0;
1131
1
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1132
1133
1
  Status s = data_->SyncLeaderMasterRpc(
1134
1
      deadline, req, &resp, "DeleteTablegroup", &MasterDdlProxy::DeleteTablegroupAsync,
1135
1
      &attempts);
1136
1137
  // This case should not happen but need to validate contents since fields are optional in PB.
1138
1
  if (!resp.has_parent_table_id()) {
1139
0
    return STATUS(NotFound, "Parent table information not found in DELETE TABLEGROUP response.");
1140
0
  }
1141
1142
  // Handle special cases based on resp.error().
1143
1
  if (resp.has_error()) {
1144
0
    LOG_IF(DFATAL, s.ok()) << "Expecting error status if response has error: " <<
1145
0
        resp.error().code() << " Status: " << resp.error().status().ShortDebugString();
1146
1147
0
    if (resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND && attempts > 1) {
1148
      // A prior attempt to delete the table has succeeded, but
1149
      // appeared as a failure to the client due to, e.g., an I/O or
1150
      // network issue.
1151
0
      LOG(INFO) << "Parent table for tablegroup with ID " << tablegroup_id << " already deleted.";
1152
0
      return Status::OK();
1153
0
    } else {
1154
0
      return StatusFromPB(resp.error().status());
1155
0
    }
1156
1
  } else {
1157
    // Check the status only if the response has no error.
1158
1
    RETURN_NOT_OK(s);
1159
1
  }
1160
1161
  // Spin until the table is deleted. Currently only waits till the table reaches DELETING state
1162
  // See github issue #5290
1163
1
  RETURN_NOT_OK_PREPEND(data_->WaitForDeleteTableToFinish(this,
1164
1
                                                          resp.parent_table_id(),
1165
1
                                                          deadline),
1166
1
      strings::Substitute("Failed waiting for parent table with id $0 to finish being deleted",
1167
1
                          resp.parent_table_id()));
1168
1169
1
  LOG(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id;
1170
1
  return Status::OK();
1171
1
}
1172
1173
Result<vector<master::TablegroupIdentifierPB>>
1174
1
YBClient::ListTablegroups(const std::string& namespace_name) {
1175
1
  GetNamespaceInfoResponsePB ret;
1176
1
  Status s = GetNamespaceInfo("", namespace_name, YQL_DATABASE_PGSQL, &ret);
1177
1
  if (!s.ok()) {
1178
0
    return s;
1179
0
  }
1180
1181
1
  ListTablegroupsRequestPB req;
1182
1
  ListTablegroupsResponsePB resp;
1183
1184
1
  req.set_namespace_id(ret.namespace_().id());
1185
1
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTablegroups);
1186
1
  auto* tablegroups = resp.mutable_tablegroups();
1187
1
  vector<master::TablegroupIdentifierPB> result;
1188
1
  result.reserve(tablegroups->size());
1189
1
  for (auto& tg : *tablegroups) {
1190
1
    result.push_back(std::move(tg));
1191
1
  }
1192
1
  return result;
1193
1
}
1194
1195
Result<bool> YBClient::TablegroupExists(const std::string& namespace_name,
1196
1
                                        const std::string& tablegroup_id) {
1197
1198
1
  for (const auto& tg : VERIFY_RESULT(ListTablegroups(namespace_name))) {
1199
1
    if (tg.id().compare(tablegroup_id) == 0) {
1200
1
      return true;
1201
1
    }
1202
1
  }
1203
0
  return false;
1204
1
}
1205
1206
Status YBClient::GetUDType(const std::string& namespace_name,
1207
                           const std::string& type_name,
1208
55
                           std::shared_ptr<QLType>* ql_type) {
1209
  // Setting up request.
1210
55
  GetUDTypeInfoRequestPB req;
1211
55
  req.mutable_type()->mutable_namespace_()->set_name(namespace_name);
1212
55
  req.mutable_type()->set_type_name(type_name);
1213
1214
  // Sending request.
1215
55
  GetUDTypeInfoResponsePB resp;
1216
55
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, GetUDTypeInfo);
1217
1218
  // Filling in return values.
1219
48
  std::vector<string> field_names;
1220
92
  for (const auto& field_name : resp.udtype().field_names()) {
1221
92
    field_names.push_back(field_name);
1222
92
  }
1223
1224
48
  std::vector<shared_ptr<QLType>> field_types;
1225
92
  for (const auto& field_type : resp.udtype().field_types()) {
1226
92
    field_types.push_back(QLType::FromQLTypePB(field_type));
1227
92
  }
1228
1229
48
  (*ql_type)->SetUDTypeFields(resp.udtype().id(), field_names, field_types);
1230
1231
48
  return Status::OK();
1232
55
}
1233
1234
Status YBClient::CreateRole(const RoleName& role_name,
1235
                            const std::string& salted_hash,
1236
                            const bool login, const bool superuser,
1237
757
                            const RoleName& creator_role_name) {
1238
1239
  // Setting up request.
1240
757
  CreateRoleRequestPB req;
1241
757
  req.set_salted_hash(salted_hash);
1242
757
  req.set_name(role_name);
1243
757
  req.set_login(login);
1244
757
  req.set_superuser(superuser);
1245
1246
757
  if (!creator_role_name.empty()) {
1247
757
    req.set_creator_role_name(creator_role_name);
1248
757
  }
1249
1250
757
  CreateRoleResponsePB resp;
1251
757
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, CreateRole);
1252
751
  return Status::OK();
1253
757
}
1254
1255
Status YBClient::AlterRole(const RoleName& role_name,
1256
                           const boost::optional<std::string>& salted_hash,
1257
                           const boost::optional<bool> login,
1258
                           const boost::optional<bool> superuser,
1259
58
                           const RoleName& current_role_name) {
1260
  // Setting up request.
1261
58
  AlterRoleRequestPB req;
1262
58
  req.set_name(role_name);
1263
58
  if (salted_hash) {
1264
9
    req.set_salted_hash(*salted_hash);
1265
9
  }
1266
58
  if (login) {
1267
13
    req.set_login(*login);
1268
13
  }
1269
58
  if (superuser) {
1270
46
    req.set_superuser(*superuser);
1271
46
  }
1272
58
  req.set_current_role(current_role_name);
1273
1274
58
  AlterRoleResponsePB resp;
1275
58
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, AlterRole);
1276
45
  return Status::OK();
1277
58
}
1278
1279
Status YBClient::DeleteRole(const std::string& role_name,
1280
730
                            const std::string& current_role_name) {
1281
  // Setting up request.
1282
730
  DeleteRoleRequestPB req;
1283
730
  req.set_name(role_name);
1284
730
  req.set_current_role(current_role_name);
1285
1286
730
  DeleteRoleResponsePB resp;
1287
730
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, DeleteRole);
1288
724
  return Status::OK();
1289
730
}
1290
1291
static const string kRequirePass = "requirepass";
1292
0
Status YBClient::SetRedisPasswords(const std::vector<string>& passwords) {
1293
  // TODO: Store hash instead of the password?
1294
0
  return SetRedisConfig(kRequirePass, passwords);
1295
0
}
1296
1297
291
Status YBClient::GetRedisPasswords(vector<string>* passwords) {
1298
291
  Status s = GetRedisConfig(kRequirePass, passwords);
1299
291
  if (s.IsNotFound()) {
1300
    // If the redis config has no kRequirePass key.
1301
291
    passwords->clear();
1302
291
    s = Status::OK();
1303
291
  }
1304
291
  return s;
1305
291
}
1306
1307
0
Status YBClient::SetRedisConfig(const string& key, const vector<string>& values) {
1308
  // Setting up request.
1309
0
  RedisConfigSetRequestPB req;
1310
0
  req.set_keyword(key);
1311
0
  for (const auto& value : values) {
1312
0
    req.add_args(value);
1313
0
  }
1314
0
  RedisConfigSetResponsePB resp;
1315
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, RedisConfigSet);
1316
0
  return Status::OK();
1317
0
}
1318
1319
291
Status YBClient::GetRedisConfig(const string& key, vector<string>* values) {
1320
  // Setting up request.
1321
291
  RedisConfigGetRequestPB req;
1322
291
  RedisConfigGetResponsePB resp;
1323
291
  req.set_keyword(key);
1324
291
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, RedisConfigGet);
1325
0
  values->clear();
1326
0
  for (const auto& arg : resp.args())
1327
0
    values->push_back(arg);
1328
0
  return Status::OK();
1329
291
}
1330
1331
Status YBClient::GrantRevokeRole(GrantRevokeStatementType statement_type,
1332
                                 const std::string& granted_role_name,
1333
52
                                 const std::string& recipient_role_name) {
1334
  // Setting up request.
1335
52
  GrantRevokeRoleRequestPB req;
1336
52
  req.set_revoke(statement_type == GrantRevokeStatementType::REVOKE);
1337
52
  req.set_granted_role(granted_role_name);
1338
52
  req.set_recipient_role(recipient_role_name);
1339
1340
52
  GrantRevokeRoleResponsePB resp;
1341
52
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GrantRevokeRole);
1342
49
  return Status::OK();
1343
52
}
1344
1345
118k
Status YBClient::GetPermissions(client::internal::PermissionsCache* permissions_cache) {
1346
118k
  if (!permissions_cache) {
1347
0
    DFATAL_OR_RETURN_NOT_OK(STATUS(InvalidArgument, "Invalid null permissions_cache"));
1348
0
  }
1349
1350
118k
  boost::optional<uint64_t> version = permissions_cache->version();
1351
1352
  // Setting up request.
1353
118k
  GetPermissionsRequestPB req;
1354
118k
  if (version) {
1355
116k
    req.set_if_version_greater_than(*version);
1356
116k
  }
1357
1358
118k
  GetPermissionsResponsePB resp;
1359
118k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GetPermissions);
1360
1361
498
  VLOG(1) << "Got permissions cache: " << resp.ShortDebugString();
1362
1363
  // The first request is a special case. We always replace the cache since we don't have anything.
1364
118k
  if (!version) {
1365
    // We should at least receive cassandra's permissions.
1366
1.45k
    if (resp.role_permissions_size() == 0) {
1367
0
      DFATAL_OR_RETURN_NOT_OK(
1368
0
          STATUS(IllegalState, "Received invalid empty permissions cache from master"));
1369
1370
0
    }
1371
116k
  } else if (resp.version() == *version) {
1372
      // No roles should have been received if both versions match.
1373
108k
      if (resp.role_permissions_size() != 0) {
1374
0
        DFATAL_OR_RETURN_NOT_OK(STATUS(IllegalState,
1375
0
            "Received permissions cache when none was expected because the master's "
1376
0
            "permissions versions is equal to the client's version"));
1377
0
      }
1378
      // Nothing to update.
1379
108k
      return Status::OK();
1380
8.49k
  } else if (resp.version() < *version) {
1381
    // If the versions don't match, then the master's version has to be greater than ours.
1382
0
    DFATAL_OR_RETURN_NOT_OK(STATUS_SUBSTITUTE(IllegalState,
1383
0
        "Client's permissions version $0 can't be greater than the master's permissions version $1",
1384
0
        *version, resp.version()));
1385
0
  }
1386
1387
9.94k
  permissions_cache->UpdateRolesPermissions(resp);
1388
9.94k
  return Status::OK();
1389
118k
}
1390
1391
Status YBClient::CreateUDType(const std::string& namespace_name,
1392
                              const std::string& type_name,
1393
                              const std::vector<std::string>& field_names,
1394
46
                              const std::vector<std::shared_ptr<QLType>>& field_types) {
1395
  // Setting up request.
1396
46
  CreateUDTypeRequestPB req;
1397
46
  req.mutable_namespace_()->set_name(namespace_name);
1398
46
  req.set_name(type_name);
1399
86
  for (const string& field_name : field_names) {
1400
86
    req.add_field_names(field_name);
1401
86
  }
1402
86
  for (const std::shared_ptr<QLType>& field_type : field_types) {
1403
86
    field_type->ToQLTypePB(req.add_field_types());
1404
86
  }
1405
1406
46
  CreateUDTypeResponsePB resp;
1407
46
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, CreateUDType);
1408
45
  return Status::OK();
1409
46
}
1410
1411
Status YBClient::DeleteUDType(const std::string& namespace_name,
1412
53
                              const std::string& type_name) {
1413
  // Setting up request.
1414
53
  DeleteUDTypeRequestPB req;
1415
53
  req.mutable_type()->mutable_namespace_()->set_name(namespace_name);
1416
53
  req.mutable_type()->set_type_name(type_name);
1417
1418
53
  DeleteUDTypeResponsePB resp;
1419
53
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, DeleteUDType);
1420
45
  return Status::OK();
1421
53
}
1422
1423
Result<CDCStreamId> YBClient::CreateCDCStream(
1424
    const TableId& table_id,
1425
    const std::unordered_map<std::string, std::string>& options,
1426
    bool active,
1427
2.69k
    const CDCStreamId& db_stream_id) {
1428
  // Setting up request.
1429
2.69k
  CreateCDCStreamRequestPB req;
1430
2.69k
  req.set_table_id(table_id);
1431
2.69k
  if (!db_stream_id.empty()) {
1432
2.53k
    req.set_db_stream_id(db_stream_id);
1433
2.53k
  }
1434
2.69k
  req.mutable_options()->Reserve(narrow_cast<int>(options.size()));
1435
10.9k
  for (const auto& option : options) {
1436
10.9k
    auto new_option = req.add_options();
1437
10.9k
    new_option->set_key(option.first);
1438
10.9k
    new_option->set_value(option.second);
1439
10.9k
  }
1440
2.69k
  req.set_initial_state(active ? master::SysCDCStreamEntryPB::ACTIVE
1441
0
                               : master::SysCDCStreamEntryPB::INITIATED);
1442
1443
2.69k
  CreateCDCStreamResponsePB resp;
1444
2.69k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, CreateCDCStream);
1445
2.69k
  return resp.stream_id();
1446
2.69k
}
1447
1448
void YBClient::CreateCDCStream(const TableId& table_id,
1449
                               const std::unordered_map<std::string, std::string>& options,
1450
0
                               CreateCDCStreamCallback callback) {
1451
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1452
0
  data_->CreateCDCStream(this, table_id, options, deadline, callback);
1453
0
}
1454
1455
Status YBClient::GetCDCStream(const CDCStreamId& stream_id,
1456
                              NamespaceId* ns_id,
1457
                              std::vector<ObjectId>* object_ids,
1458
1
                              std::unordered_map<std::string, std::string>* options) {
1459
  // Setting up request.
1460
1
  GetCDCStreamRequestPB req;
1461
1
  req.set_stream_id(stream_id);
1462
1463
  // Sending request.
1464
1
  GetCDCStreamResponsePB resp;
1465
1
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, GetCDCStream);
1466
1467
  // Filling in return values.
1468
0
  if (resp.stream().has_namespace_id()) {
1469
0
    *ns_id = resp.stream().namespace_id();
1470
0
  }
1471
1472
0
  for (auto id : resp.stream().table_id()) {
1473
0
    object_ids->push_back(id);
1474
0
  }
1475
1476
0
  options->clear();
1477
0
  options->reserve(resp.stream().options_size());
1478
0
  for (const auto& option : resp.stream().options()) {
1479
0
    options->emplace(option.key(), option.value());
1480
0
  }
1481
1482
0
  if (!resp.stream().has_namespace_id()) {
1483
0
    options->emplace(cdc::kIdType, cdc::kTableId);
1484
0
  }
1485
1486
0
  return Status::OK();
1487
1
}
1488
1489
void YBClient::GetCDCStream(const CDCStreamId& stream_id,
1490
                            std::shared_ptr<TableId> table_id,
1491
                            std::shared_ptr<std::unordered_map<std::string, std::string>> options,
1492
0
                            StdStatusCallback callback) {
1493
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1494
0
  data_->GetCDCStream(this, stream_id, table_id, options, deadline, callback);
1495
0
}
1496
1497
Status YBClient::DeleteCDCStream(const vector<CDCStreamId>& streams,
1498
                                 bool force_delete,
1499
                                 bool ignore_errors,
1500
0
                                 master::DeleteCDCStreamResponsePB* ret) {
1501
0
  if (streams.empty()) {
1502
0
    return STATUS(InvalidArgument, "At least one stream id should be provided");
1503
0
  }
1504
1505
  // Setting up request.
1506
0
  DeleteCDCStreamRequestPB req;
1507
0
  req.mutable_stream_id()->Reserve(narrow_cast<int>(streams.size()));
1508
0
  for (const auto& stream : streams) {
1509
0
    req.add_stream_id(stream);
1510
0
  }
1511
0
  req.set_force_delete(force_delete);
1512
0
  req.set_ignore_errors(ignore_errors);
1513
1514
0
  if (ret) {
1515
0
    CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, (*ret), DeleteCDCStream);
1516
0
  } else {
1517
0
    DeleteCDCStreamResponsePB resp;
1518
0
    CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, DeleteCDCStream);
1519
0
  }
1520
1521
0
  return Status::OK();
1522
0
}
1523
1524
Status YBClient::DeleteCDCStream(const CDCStreamId& stream_id, bool force_delete,
1525
0
                                 bool ignore_errors) {
1526
  // Setting up request.
1527
0
  DeleteCDCStreamRequestPB req;
1528
0
  req.add_stream_id(stream_id);
1529
0
  req.set_force_delete(force_delete);
1530
0
  req.set_ignore_errors(ignore_errors);
1531
1532
0
  DeleteCDCStreamResponsePB resp;
1533
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, DeleteCDCStream);
1534
0
  return Status::OK();
1535
0
}
1536
1537
0
void YBClient::DeleteCDCStream(const CDCStreamId& stream_id, StatusCallback callback) {
1538
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1539
0
  data_->DeleteCDCStream(this, stream_id, deadline, callback);
1540
0
}
1541
1542
CHECKED_STATUS YBClient::GetCDCDBStreamInfo(
1543
  const std::string &db_stream_id,
1544
0
  std::vector<pair<std::string, std::string>>* db_stream_info) {
1545
  // Setting up request.
1546
0
  GetCDCDBStreamInfoRequestPB req;
1547
0
  req.set_db_stream_id(db_stream_id);
1548
1549
0
  GetCDCDBStreamInfoResponsePB resp;
1550
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, GetCDCDBStreamInfo);
1551
0
  db_stream_info->clear();
1552
0
  db_stream_info->reserve(resp.table_info_size());
1553
0
  for (const auto& tabinfo : resp.table_info()) {
1554
0
    std::string stream_id = tabinfo.stream_id();
1555
0
    std::string table_id = tabinfo.table_id();
1556
1557
0
    db_stream_info->push_back(std::make_pair(stream_id, table_id));
1558
0
  }
1559
1560
0
  return Status::OK();
1561
0
}
1562
1563
void YBClient::GetCDCDBStreamInfo(
1564
    const std::string& db_stream_id,
1565
    const std::shared_ptr<std::vector<pair<std::string, std::string>>>& db_stream_info,
1566
0
    const StdStatusCallback& callback) {
1567
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1568
0
  data_->GetCDCDBStreamInfo(this, db_stream_id, db_stream_info, deadline, callback);
1569
0
}
1570
1571
Status YBClient::UpdateCDCStream(const CDCStreamId& stream_id,
1572
0
                                 const master::SysCDCStreamEntryPB& new_entry) {
1573
0
  if (stream_id.empty()) {
1574
0
    return STATUS(InvalidArgument, "Stream id is required.");
1575
0
  }
1576
1577
  // Setting up request.
1578
0
  UpdateCDCStreamRequestPB req;
1579
0
  req.set_stream_id(stream_id);
1580
0
  req.mutable_entry()->CopyFrom(new_entry);
1581
1582
0
  UpdateCDCStreamResponsePB resp;
1583
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, UpdateCDCStream);
1584
0
  return Status::OK();
1585
0
}
1586
1587
Status YBClient::UpdateConsumerOnProducerSplit(
1588
    const string& producer_id,
1589
    const CDCStreamId& stream_id,
1590
0
    const master::ProducerSplitTabletInfoPB& split_info) {
1591
0
  if (producer_id.empty()) {
1592
0
    return STATUS(InvalidArgument, "Producer id is required.");
1593
0
  }
1594
0
  if (stream_id.empty()) {
1595
0
    return STATUS(InvalidArgument, "Stream id is required.");
1596
0
  }
1597
1598
0
  UpdateConsumerOnProducerSplitRequestPB req;
1599
0
  req.set_producer_id(producer_id);
1600
0
  req.set_stream_id(stream_id);
1601
0
  req.mutable_producer_split_tablet_info()->CopyFrom(split_info);
1602
1603
0
  UpdateConsumerOnProducerSplitResponsePB resp;
1604
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, UpdateConsumerOnProducerSplit);
1605
0
  return Status::OK();
1606
0
}
1607
1608
8
void YBClient::DeleteNotServingTablet(const TabletId& tablet_id, StdStatusCallback callback) {
1609
8
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1610
8
  data_->DeleteNotServingTablet(this, tablet_id, deadline, callback);
1611
8
}
1612
1613
void YBClient::GetTableLocations(
1614
    const TableId& table_id, int32_t max_tablets, RequireTabletsRunning require_tablets_running,
1615
85.5k
    GetTableLocationsCallback callback) {
1616
85.5k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1617
85.5k
  data_->GetTableLocations(
1618
85.5k
      this, table_id, max_tablets, require_tablets_running, deadline, std::move(callback));
1619
85.5k
}
1620
1621
2.91k
Status YBClient::TabletServerCount(int *tserver_count, bool primary_only, bool use_cache) {
1622
2.91k
  int tserver_count_cached = data_->tserver_count_cached_[primary_only].load(
1623
2.91k
      std::memory_order_acquire);
1624
2.91k
  if (use_cache && tserver_count_cached > 0) {
1625
54
    *tserver_count = tserver_count_cached;
1626
54
    return Status::OK();
1627
54
  }
1628
1629
2.86k
  ListTabletServersRequestPB req;
1630
2.86k
  ListTabletServersResponsePB resp;
1631
2.86k
  req.set_primary_only(primary_only);
1632
2.86k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListTabletServers);
1633
2.86k
  data_->tserver_count_cached_[primary_only].store(resp.servers_size(), std::memory_order_release);
1634
2.86k
  *tserver_count = resp.servers_size();
1635
2.86k
  return Status::OK();
1636
2.86k
}
1637
1638
2
Result<std::vector<YBTabletServer>> YBClient::ListTabletServers() {
1639
2
  ListTabletServersRequestPB req;
1640
2
  ListTabletServersResponsePB resp;
1641
2
  std::vector<YBTabletServer> result;
1642
2
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListTabletServers);
1643
2
  result.reserve(resp.servers_size());
1644
8
  for (int i = 0; i < resp.servers_size(); i++) {
1645
6
    const ListTabletServersResponsePB_Entry& e = resp.servers(i);
1646
6
    result.push_back(YBTabletServer::FromPB(e, data_->cloud_info_pb_));
1647
6
  }
1648
2
  return result;
1649
2
}
1650
1651
2
Result<TabletServersInfo> YBClient::ListLiveTabletServers(bool primary_only) {
1652
2
  ListLiveTabletServersRequestPB req;
1653
2
  if (primary_only) {
1654
0
    req.set_primary_only(true);
1655
0
  }
1656
2
  ListLiveTabletServersResponsePB resp;
1657
2
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListLiveTabletServers);
1658
1659
2
  TabletServersInfo result;
1660
2
  result.resize(resp.servers_size());
1661
8
  for (int i = 0; i < resp.servers_size(); i++) {
1662
6
    const ListLiveTabletServersResponsePB_Entry& entry = resp.servers(i);
1663
6
    auto& out = result[i];
1664
6
    out.server = YBTabletServer::FromPB(entry, data_->cloud_info_pb_);
1665
6
    const CloudInfoPB& cloud_info = entry.registration().common().cloud_info();
1666
1667
6
    const auto& private_addresses = entry.registration().common().private_rpc_addresses();
1668
6
    if (!private_addresses.empty()) {
1669
6
      out.server.hostname = private_addresses.Get(0).host();
1670
6
    }
1671
1672
6
    const auto& broadcast_addresses = entry.registration().common().broadcast_addresses();
1673
6
    if (!broadcast_addresses.empty()) {
1674
0
      out.public_ip = broadcast_addresses.Get(0).host();
1675
0
    }
1676
1677
6
    out.is_primary = !entry.isfromreadreplica();
1678
6
    if (cloud_info.has_placement_cloud()) {
1679
6
      out.cloud = cloud_info.placement_cloud();
1680
6
      if (cloud_info.has_placement_region()) {
1681
6
        out.region = cloud_info.placement_region();
1682
6
      }
1683
6
      if (cloud_info.has_placement_zone()) {
1684
6
        out.zone = cloud_info.placement_zone();
1685
6
      }
1686
6
    }
1687
6
    out.pg_port = entry.registration().common().pg_port();
1688
6
  }
1689
1690
2
  return result;
1691
2
}
1692
1693
void YBClient::SetLocalTabletServer(const string& ts_uuid,
1694
                                    const shared_ptr<tserver::TabletServerServiceProxy>& proxy,
1695
5.48k
                                    const tserver::LocalTabletServer* local_tserver) {
1696
5.48k
  data_->meta_cache_->SetLocalTabletServer(ts_uuid, proxy, local_tserver);
1697
5.48k
}
1698
1699
0
internal::RemoteTabletServer* YBClient::GetLocalTabletServer() {
1700
0
  return data_->meta_cache_->local_tserver();
1701
0
}
1702
1703
void YBClient::SetNodeLocalForwardProxy(
1704
0
  const shared_ptr<tserver::TabletServerForwardServiceProxy>& proxy) {
1705
0
  data_->node_local_forward_proxy_ = proxy;
1706
0
}
1707
1708
0
std::shared_ptr<tserver::TabletServerForwardServiceProxy>& YBClient::GetNodeLocalForwardProxy() {
1709
0
  return data_->node_local_forward_proxy_;
1710
0
}
1711
1712
0
void YBClient::SetNodeLocalTServerHostPort(const ::yb::HostPort& hostport) {
1713
0
  data_->node_local_tserver_host_port_ = hostport;
1714
0
}
1715
1716
0
const ::yb::HostPort& YBClient::GetNodeLocalTServerHostPort() {
1717
0
  return data_->node_local_tserver_host_port_;
1718
0
}
1719
1720
178
Result<bool> YBClient::IsLoadBalanced(uint32_t num_servers) {
1721
178
  IsLoadBalancedRequestPB req;
1722
178
  IsLoadBalancedResponsePB resp;
1723
1724
178
  req.set_expected_num_servers(num_servers);
1725
  // Cannot use CALL_SYNC_LEADER_MASTER_RPC directly since this is substituted with RETURN_NOT_OK
1726
  // and we want to capture the status to check if load is balanced.
1727
178
  Status s = [&, this]() -> Status {
1728
178
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, IsLoadBalanced);
1729
2
    return Status::OK();
1730
178
  }();
1731
178
  return s.ok();
1732
178
}
1733
1734
2.27k
Result<bool> YBClient::IsLoadBalancerIdle() {
1735
2.27k
  IsLoadBalancerIdleRequestPB req;
1736
2.27k
  IsLoadBalancerIdleResponsePB resp;
1737
1738
2.27k
  Status s = [&]() -> Status {
1739
2.27k
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, IsLoadBalancerIdle);
1740
539
    return Status::OK();
1741
2.27k
  }();
1742
1743
2.27k
  if (s.ok()) {
1744
539
    return true;
1745
1.73k
  } else if (master::MasterError(s) == master::MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE) {
1746
1.73k
    return false;
1747
0
  } else {
1748
0
    return s;
1749
0
  }
1750
2.27k
}
1751
1752
Status YBClient::ModifyTablePlacementInfo(const YBTableName& table_name,
1753
4
                                          master::PlacementInfoPB* replicas) {
1754
4
  master::ReplicationInfoPB replication_info;
1755
  // Merge the obtained info with the existing table replication info.
1756
4
  std::shared_ptr<client::YBTable> table;
1757
4
  RETURN_NOT_OK_PREPEND(OpenTable(table_name, &table), "Fetching table schema failed!");
1758
1759
  // If it does not exist, fetch the cluster replication info.
1760
4
  if (!table->replication_info()) {
1761
2
    GetMasterClusterConfigRequestPB req;
1762
2
    GetMasterClusterConfigResponsePB resp;
1763
2
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, GetMasterClusterConfig);
1764
2
    master::SysClusterConfigEntryPB* sys_cluster_config_entry = resp.mutable_cluster_config();
1765
2
    replication_info.CopyFrom(sys_cluster_config_entry->replication_info());
1766
    // TODO(bogdan): Figure out how to handle read replias and leader affinity.
1767
2
    replication_info.clear_read_replicas();
1768
2
    replication_info.clear_affinitized_leaders();
1769
2
  } else {
1770
    // Table replication info exists, copy it over.
1771
2
    replication_info.CopyFrom(table->replication_info().get());
1772
2
  }
1773
1774
  // Put in the new live placement info.
1775
4
  replication_info.set_allocated_live_replicas(replicas);
1776
1777
4
  std::unique_ptr<yb::client::YBTableAlterer> table_alterer(NewTableAlterer(table_name));
1778
4
  return table_alterer->replication_info(replication_info)->Alter();
1779
4
}
1780
1781
0
Status YBClient::CreateTransactionsStatusTable(const string& table_name) {
1782
0
  if (table_name.rfind(kTransactionTablePrefix, 0) != 0) {
1783
0
    return STATUS_FORMAT(
1784
0
        InvalidArgument, "Name '$0' for transaction table does not start with '$1'", table_name,
1785
0
        kTransactionTablePrefix);
1786
0
  }
1787
0
  master::CreateTransactionStatusTableRequestPB req;
1788
0
  master::CreateTransactionStatusTableResponsePB resp;
1789
0
  req.set_table_name(table_name);
1790
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Admin, req, resp, CreateTransactionStatusTable);
1791
0
  return Status::OK();
1792
0
}
1793
1794
Status YBClient::GetTabletsFromTableId(const string& table_id,
1795
                                       const int32_t max_tablets,
1796
2.54k
                                       RepeatedPtrField<TabletLocationsPB>* tablets) {
1797
2.54k
  GetTableLocationsRequestPB req;
1798
2.54k
  GetTableLocationsResponsePB resp;
1799
2.54k
  req.mutable_table()->set_table_id(table_id);
1800
1801
2.54k
  if (max_tablets == 0) {
1802
2.54k
    req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
1803
0
  } else if (max_tablets > 0) {
1804
0
    req.set_max_returned_locations(max_tablets);
1805
0
  }
1806
2.54k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTableLocations);
1807
2.54k
  *tablets = resp.tablet_locations();
1808
2.54k
  return Status::OK();
1809
2.54k
}
1810
1811
Status YBClient::GetTablets(const YBTableName& table_name,
1812
                            const int32_t max_tablets,
1813
                            RepeatedPtrField<TabletLocationsPB>* tablets,
1814
                            PartitionListVersion* partition_list_version,
1815
                            const RequireTabletsRunning require_tablets_running,
1816
268
                            const master::IncludeInactive include_inactive) {
1817
268
  GetTableLocationsRequestPB req;
1818
268
  GetTableLocationsResponsePB resp;
1819
268
  if (table_name.has_table()) {
1820
268
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
1821
0
  } else if (table_name.has_table_id()) {
1822
0
    req.mutable_table()->set_table_id(table_name.table_id());
1823
0
  }
1824
1825
268
  if (max_tablets == 0) {
1826
264
    req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
1827
4
  } else if (max_tablets > 0) {
1828
0
    req.set_max_returned_locations(max_tablets);
1829
0
  }
1830
268
  req.set_require_tablets_running(require_tablets_running);
1831
268
  req.set_include_inactive(include_inactive);
1832
268
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTableLocations);
1833
268
  *tablets = resp.tablet_locations();
1834
268
  if (partition_list_version) {
1835
57
    *partition_list_version = resp.partition_list_version();
1836
57
  }
1837
268
  return Status::OK();
1838
268
}
1839
1840
Status YBClient::GetTabletLocation(const TabletId& tablet_id,
1841
0
                                   master::TabletLocationsPB* tablet_location) {
1842
0
  GetTabletLocationsRequestPB req;
1843
0
  GetTabletLocationsResponsePB resp;
1844
0
  req.add_tablet_ids(tablet_id);
1845
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTabletLocations);
1846
1847
0
  if (resp.tablet_locations_size() != 1) {
1848
0
    return STATUS_SUBSTITUTE(IllegalState, "Expected single tablet for $0, received $1",
1849
0
                             tablet_id, resp.tablet_locations_size());
1850
0
  }
1851
1852
0
  *tablet_location = resp.tablet_locations(0);
1853
0
  return Status::OK();
1854
0
}
1855
1856
Result<TransactionStatusTablets> YBClient::GetTransactionStatusTablets(
1857
2.01k
    const CloudInfoPB& placement) {
1858
2.01k
  GetTransactionStatusTabletsRequestPB req;
1859
2.01k
  GetTransactionStatusTabletsResponsePB resp;
1860
1861
2.01k
  req.mutable_placement()->CopyFrom(placement);
1862
1863
2.01k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTransactionStatusTablets);
1864
1865
2.01k
  TransactionStatusTablets tablets;
1866
1867
2.01k
  MoveCollection(&resp.global_tablet_id(), &tablets.global_tablets);
1868
2.01k
  MoveCollection(&resp.placement_local_tablet_id(), &tablets.placement_local_tablets);
1869
1870
2.01k
  return tablets;
1871
2.01k
}
1872
1873
Status YBClient::GetTablets(const YBTableName& table_name,
1874
                            const int32_t max_tablets,
1875
                            vector<TabletId>* tablet_uuids,
1876
                            vector<string>* ranges,
1877
                            std::vector<master::TabletLocationsPB>* locations,
1878
                            const RequireTabletsRunning require_tablets_running,
1879
15
                            master::IncludeInactive include_inactive) {
1880
15
  RepeatedPtrField<TabletLocationsPB> tablets;
1881
15
  RETURN_NOT_OK(GetTablets(
1882
15
      table_name, max_tablets, &tablets, /* partition_list_version =*/ nullptr,
1883
15
      require_tablets_running, include_inactive));
1884
15
  FillFromRepeatedTabletLocations(tablets, tablet_uuids, ranges, locations);
1885
15
  return Status::OK();
1886
15
}
1887
1888
Status YBClient::GetTabletsAndUpdateCache(
1889
    const YBTableName& table_name,
1890
    const int32_t max_tablets,
1891
    vector<TabletId>* tablet_uuids,
1892
    vector<string>* ranges,
1893
57
    std::vector<master::TabletLocationsPB>* locations) {
1894
57
  RepeatedPtrField<TabletLocationsPB> tablets;
1895
57
  PartitionListVersion partition_list_version;
1896
57
  RETURN_NOT_OK(GetTablets(
1897
57
      table_name, max_tablets, &tablets, &partition_list_version, RequireTabletsRunning::kFalse));
1898
57
  FillFromRepeatedTabletLocations(tablets, tablet_uuids, ranges, locations);
1899
1900
57
  RETURN_NOT_OK(data_->meta_cache_->ProcessTabletLocations(
1901
57
      tablets, partition_list_version, /* lookup_rpc = */ nullptr));
1902
1903
57
  return Status::OK();
1904
57
}
1905
1906
7.59M
rpc::Messenger* YBClient::messenger() const {
1907
7.59M
  return data_->messenger_;
1908
7.59M
}
1909
1910
132k
const scoped_refptr<MetricEntity>& YBClient::metric_entity() const {
1911
132k
  return data_->metric_entity_;
1912
132k
}
1913
1914
7.24M
rpc::ProxyCache& YBClient::proxy_cache() const {
1915
7.24M
  return *data_->proxy_cache_;
1916
7.24M
}
1917
1918
5.76M
ThreadPool *YBClient::callback_threadpool() {
1919
5.76M
  return data_->use_threadpool_for_callbacks_ ? data_->threadpool_.get() : nullptr;
1920
5.76M
}
1921
1922
4.68M
const std::string& YBClient::proxy_uuid() const {
1923
4.68M
  return data_->uuid_;
1924
4.68M
}
1925
1926
1.32M
const ClientId& YBClient::id() const {
1927
1.32M
  return data_->id_;
1928
1.32M
}
1929
1930
159
const CloudInfoPB& YBClient::cloud_info() const {
1931
159
  return data_->cloud_info_pb_;
1932
159
}
1933
1934
std::pair<RetryableRequestId, RetryableRequestId> YBClient::NextRequestIdAndMinRunningRequestId(
1935
1.32M
    const TabletId& tablet_id) {
1936
1.32M
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1937
1.32M
  auto& tablet = data_->tablet_requests_[tablet_id];
1938
1.32M
  if (tablet.request_id_seq == kInitializeFromMinRunning) {
1939
0
    return std::make_pair(kInitializeFromMinRunning, kInitializeFromMinRunning);
1940
0
  }
1941
1.32M
  auto id = tablet.request_id_seq++;
1942
1.32M
  tablet.running_requests.insert(id);
1943
1.32M
  return std::make_pair(id, *tablet.running_requests.begin());
1944
1.32M
}
1945
1946
1.32M
void YBClient::RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id) {
1947
1.32M
  if (request_id == kInitializeFromMinRunning) {
1948
0
    return;
1949
0
  }
1950
1.32M
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1951
1.32M
  auto& tablet = data_->tablet_requests_[tablet_id];
1952
1.32M
  auto it = tablet.running_requests.find(request_id);
1953
1.32M
  if (it != tablet.running_requests.end()) {
1954
1.32M
    tablet.running_requests.erase(it);
1955
18.4E
  } else {
1956
18.4E
    LOG(DFATAL) << "RequestFinished called for an unknown request: "
1957
18.4E
                << tablet_id << ", " << request_id;
1958
18.4E
  }
1959
1.32M
}
1960
1961
void YBClient::MaybeUpdateMinRunningRequestId(
1962
0
    const TabletId& tablet_id, RetryableRequestId min_running_request_id) {
1963
0
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1964
0
  auto& tablet = data_->tablet_requests_[tablet_id];
1965
0
  if (tablet.request_id_seq == kInitializeFromMinRunning) {
1966
0
    tablet.request_id_seq = min_running_request_id + (1 << 24);
1967
0
    VLOG(1) << "Set request_id_seq for tablet " << tablet_id << " to " << tablet.request_id_seq;
1968
0
  }
1969
0
}
1970
1971
void YBClient::LookupTabletByKey(const std::shared_ptr<YBTable>& table,
1972
                                 const std::string& partition_key,
1973
                                 CoarseTimePoint deadline,
1974
104k
                                 LookupTabletCallback callback) {
1975
104k
  data_->meta_cache_->LookupTabletByKey(table, partition_key, deadline, std::move(callback));
1976
104k
}
1977
1978
void YBClient::LookupTabletById(const std::string& tablet_id,
1979
                                const std::shared_ptr<const YBTable>& table,
1980
                                master::IncludeInactive include_inactive,
1981
                                CoarseTimePoint deadline,
1982
                                LookupTabletCallback callback,
1983
1.17M
                                UseCache use_cache) {
1984
1.17M
  data_->meta_cache_->LookupTabletById(
1985
1.17M
      tablet_id, table, include_inactive, deadline, std::move(callback), use_cache);
1986
1.17M
}
1987
1988
void YBClient::LookupAllTablets(const std::shared_ptr<const YBTable>& table,
1989
                                CoarseTimePoint deadline,
1990
0
                                LookupTabletRangeCallback callback) {
1991
0
  data_->meta_cache_->LookupAllTablets(table, deadline, std::move(callback));
1992
0
}
1993
1994
std::future<Result<internal::RemoteTabletPtr>> YBClient::LookupTabletByKeyFuture(
1995
    const std::shared_ptr<YBTable>& table,
1996
    const std::string& partition_key,
1997
0
    CoarseTimePoint deadline) {
1998
0
  return data_->meta_cache_->LookupTabletByKeyFuture(table, partition_key, deadline);
1999
0
}
2000
2001
std::future<Result<std::vector<internal::RemoteTabletPtr>>> YBClient::LookupAllTabletsFuture(
2002
    const std::shared_ptr<const YBTable>& table,
2003
0
    CoarseTimePoint deadline) {
2004
0
  return MakeFuture<Result<std::vector<internal::RemoteTabletPtr>>>([&](auto callback) {
2005
0
    this->LookupAllTablets(table, deadline, std::move(callback));
2006
0
  });
2007
0
}
2008
2009
27
HostPort YBClient::GetMasterLeaderAddress() {
2010
27
  return data_->leader_master_hostport();
2011
27
}
2012
2013
0
Status YBClient::ListMasters(CoarseTimePoint deadline, std::vector<std::string>* master_uuids) {
2014
0
  ListMastersRequestPB req;
2015
0
  ListMastersResponsePB resp;
2016
0
  CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(Cluster, req, resp, deadline, ListMasters);
2017
2018
0
  master_uuids->clear();
2019
0
  for (const ServerEntryPB& master : resp.masters()) {
2020
0
    if (master.has_error()) {
2021
0
      LOG(ERROR) << "Master " << master.ShortDebugString() << " hit error "
2022
0
        << master.error().ShortDebugString();
2023
0
      return StatusFromPB(master.error());
2024
0
    }
2025
0
    master_uuids->push_back(master.instance_id().permanent_uuid());
2026
0
  }
2027
0
  return Status::OK();
2028
0
}
2029
2030
0
Result<HostPort> YBClient::RefreshMasterLeaderAddress() {
2031
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2032
0
  RETURN_NOT_OK(data_->SetMasterServerProxy(deadline));
2033
2034
0
  return GetMasterLeaderAddress();
2035
0
}
2036
2037
488
void YBClient::RefreshMasterLeaderAddressAsync() {
2038
488
  data_->SetMasterServerProxyAsync(
2039
488
      CoarseMonoClock::Now() + default_admin_operation_timeout(),
2040
484
      false /* skip_resolution */, true /* wait_for_leader_election */, /* callback */ [](auto){});
2041
488
}
2042
2043
0
Status YBClient::RemoveMasterFromClient(const HostPort& remove) {
2044
0
  return data_->RemoveMasterAddress(remove);
2045
0
}
2046
2047
0
Status YBClient::AddMasterToClient(const HostPort& add) {
2048
0
  return data_->AddMasterAddress(add);
2049
0
}
2050
2051
0
Status YBClient::SetMasterAddresses(const std::string& addrs) {
2052
0
  return data_->SetMasterAddresses(addrs);
2053
0
}
2054
2055
0
Status YBClient::GetMasterUUID(const string& host, uint16_t port, string* uuid) {
2056
0
  HostPort hp(host, port);
2057
0
  ServerEntryPB server;
2058
0
  RETURN_NOT_OK(master::GetMasterEntryForHosts(
2059
0
      data_->proxy_cache_.get(), {hp}, default_rpc_timeout(), &server));
2060
2061
0
  if (server.has_error()) {
2062
0
    return STATUS_FORMAT(
2063
0
      RuntimeError,
2064
0
      "Error while getting uuid of $0.",
2065
0
      HostPortToString(host, port));
2066
0
  }
2067
2068
0
  *uuid = server.instance_id().permanent_uuid();
2069
2070
0
  return Status::OK();
2071
0
}
2072
2073
4
Status YBClient::SetReplicationInfo(const ReplicationInfoPB& replication_info) {
2074
4
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2075
4
  return data_->SetReplicationInfo(this, replication_info, deadline);
2076
4
}
2077
2078
0
Status YBClient::ValidateReplicationInfo(const ReplicationInfoPB& replication_info) {
2079
0
  auto deadline = CoarseMonoClock::Now() + default_rpc_timeout();
2080
0
  return data_->ValidateReplicationInfo(replication_info, deadline);
2081
0
}
2082
2083
Result<std::vector<YBTableName>> YBClient::ListTables(const std::string& filter,
2084
17
                                                      bool exclude_ysql) {
2085
17
  ListTablesRequestPB req;
2086
17
  ListTablesResponsePB resp;
2087
2088
17
  if (!filter.empty()) {
2089
17
    req.set_name_filter(filter);
2090
17
  }
2091
2092
17
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTables);
2093
17
  std::vector<YBTableName> result;
2094
17
  result.reserve(resp.tables_size());
2095
2096
18
  for (int i = 0; i < resp.tables_size(); i++) {
2097
1
    const ListTablesResponsePB_TableInfo& table_info = resp.tables(i);
2098
1
    DCHECK(table_info.has_namespace_());
2099
1
    DCHECK(table_info.namespace_().has_name());
2100
1
    DCHECK(table_info.namespace_().has_id());
2101
1
    if (exclude_ysql && table_info.table_type() == TableType::PGSQL_TABLE_TYPE) {
2102
0
      continue;
2103
0
    }
2104
1
    result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()),
2105
1
                        table_info.namespace_().id(),
2106
1
                        table_info.namespace_().name(),
2107
1
                        table_info.id(),
2108
1
                        table_info.name(),
2109
1
                        table_info.pgschema_name(),
2110
1
                        table_info.relation_type());
2111
1
  }
2112
17
  return result;
2113
17
}
2114
2115
153
Result<std::vector<YBTableName>> YBClient::ListUserTables(const NamespaceId& ns_id) {
2116
153
  ListTablesRequestPB req;
2117
153
  ListTablesResponsePB resp;
2118
153
  bool exclude_ysql = false;
2119
2120
153
  if (!ns_id.empty()) {
2121
153
    req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL);
2122
153
    req.mutable_namespace_()->set_id(ns_id);
2123
153
  }
2124
2125
153
  req.add_relation_type_filter(master::USER_TABLE_RELATION);
2126
153
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTables);
2127
153
  std::vector<YBTableName> result;
2128
153
  result.reserve(resp.tables_size());
2129
2130
2.69k
  for (int i = 0; i < resp.tables_size(); i++) {
2131
2.53k
    const ListTablesResponsePB_TableInfo& table_info = resp.tables(i);
2132
2.53k
    DCHECK(table_info.has_namespace_());
2133
2.53k
    DCHECK(table_info.namespace_().has_name());
2134
2.53k
    DCHECK(table_info.namespace_().has_id());
2135
2.53k
    if (exclude_ysql && table_info.table_type() == TableType::PGSQL_TABLE_TYPE) {
2136
0
      continue;
2137
0
    }
2138
2.53k
    result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()),
2139
2.53k
                        table_info.namespace_().id(),
2140
2.53k
                        table_info.namespace_().name(),
2141
2.53k
                        table_info.id(),
2142
2.53k
                        table_info.name(),
2143
2.53k
                        table_info.pgschema_name(),
2144
2.53k
                        table_info.relation_type());
2145
2.53k
  }
2146
153
  return result;
2147
153
}
2148
2149
17
Result<bool> YBClient::TableExists(const YBTableName& table_name) {
2150
17
  for (const YBTableName& table : VERIFY_RESULT(ListTables(table_name.table_name()))) {
2151
1
    if (table == table_name) {
2152
1
      return true;
2153
1
    }
2154
1
  }
2155
16
  return false;
2156
17
}
2157
2158
47.2k
Status YBClient::OpenTable(const YBTableName& table_name, YBTablePtr* table) {
2159
47.2k
  YBTableInfo info;
2160
47.2k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2161
47.2k
  RETURN_NOT_OK(data_->GetTableSchema(this, table_name, deadline, &info));
2162
44.9k
  auto future = FetchPartitionsFuture(this, info.table_id);
2163
  // In the future, probably will look up the table in some map to reuse YBTable instances.
2164
44.9k
  *table = std::make_shared<YBTable>(info, VERIFY_RESULT(future.get()));
2165
44.9k
  return Status::OK();
2166
44.9k
}
2167
2168
Status YBClient::OpenTable(
2169
40.5k
    const TableId& table_id, YBTablePtr* table, master::GetTableSchemaResponsePB* resp) {
2170
  // Fetch partitions first to run GetTableSchema and GetTableLocations RPCs in parallel.
2171
40.5k
  auto future = FetchPartitionsFuture(this, table_id);
2172
2173
40.5k
  YBTableInfo info;
2174
40.5k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2175
40.5k
  RETURN_NOT_OK(data_->GetTableSchema(this, table_id, deadline, &info, resp));
2176
  // In the future, probably will look up the table in some map to reuse YBTable instances.
2177
40.4k
  *table = std::make_shared<YBTable>(info, VERIFY_RESULT(future.get()));
2178
40.4k
  return Status::OK();
2179
40.4k
}
2180
2181
2.08k
shared_ptr<YBSession> YBClient::NewSession() {
2182
2.08k
  return std::make_shared<YBSession>(this);
2183
2.08k
}
2184
2185
0
bool YBClient::IsMultiMaster() const {
2186
0
  return data_->IsMultiMaster();
2187
0
}
2188
2189
2.89k
Result<int> YBClient::NumTabletsForUserTable(TableType table_type) {
2190
2.89k
  if (table_type == TableType::PGSQL_TABLE_TYPE &&
2191
1.27k
        FLAGS_ysql_num_tablets > 0) {
2192
0
    VLOG(1) << "num_tablets = " << FLAGS_ysql_num_tablets
2193
0
              << ": --ysql_num_tablets is specified.";
2194
0
    return FLAGS_ysql_num_tablets;
2195
2.89k
  } else if (FLAGS_ycql_num_tablets > 0) {
2196
0
    VLOG(1) << "num_tablets = " << FLAGS_ycql_num_tablets
2197
0
              << ": --ycql_num_tablets is specified.";
2198
33
    return FLAGS_ycql_num_tablets;
2199
2.85k
  } else {
2200
2.85k
    int tserver_count = 0;
2201
2.85k
    RETURN_NOT_OK(TabletServerCount(&tserver_count, true /* primary_only */));
2202
2.85k
    int num_tablets = 0;
2203
2.85k
    if (table_type == TableType::PGSQL_TABLE_TYPE) {
2204
1.27k
      num_tablets = tserver_count * FLAGS_ysql_num_shards_per_tserver;
2205
0
      VLOG(1) << "num_tablets = " << num_tablets << ": "
2206
0
              << "calculated as tserver_count * FLAGS_ysql_num_shards_per_tserver ("
2207
0
              << tserver_count << " * " << FLAGS_ysql_num_shards_per_tserver << ")";
2208
1.58k
    } else {
2209
1.58k
      num_tablets = tserver_count * FLAGS_yb_num_shards_per_tserver;
2210
0
      VLOG(1) << "num_tablets = " << num_tablets << ": "
2211
0
              << "calculated as tserver_count * FLAGS_yb_num_shards_per_tserver ("
2212
0
              << tserver_count << " * " << FLAGS_yb_num_shards_per_tserver << ")";
2213
1.58k
    }
2214
2.85k
    return num_tablets;
2215
2.85k
  }
2216
2.89k
}
2217
2218
0
void YBClient::TEST_set_admin_operation_timeout(const MonoDelta& timeout) {
2219
0
  data_->default_admin_operation_timeout_ = timeout;
2220
0
}
2221
2222
356k
const MonoDelta& YBClient::default_admin_operation_timeout() const {
2223
356k
  return data_->default_admin_operation_timeout_;
2224
356k
}
2225
2226
0
const MonoDelta& YBClient::default_rpc_timeout() const {
2227
0
  return data_->default_rpc_timeout_;
2228
0
}
2229
2230
const uint64_t YBClient::kNoHybridTime = 0;
2231
2232
0
uint64_t YBClient::GetLatestObservedHybridTime() const {
2233
0
  return data_->GetLatestObservedHybridTime();
2234
0
}
2235
2236
0
void YBClient::SetLatestObservedHybridTime(uint64_t ht_hybrid_time) {
2237
0
  data_->UpdateLatestObservedHybridTime(ht_hybrid_time);
2238
0
}
2239
2240
4.49k
CoarseTimePoint YBClient::PatchAdminDeadline(CoarseTimePoint deadline) const {
2241
4.49k
  if (deadline != CoarseTimePoint()) {
2242
1.21k
    return deadline;
2243
1.21k
  }
2244
3.28k
  return CoarseMonoClock::Now() + default_admin_operation_timeout();
2245
3.28k
}
2246
2247
0
Result<vector<master::NamespaceIdentifierPB>> YBClient::ListNamespaces() {
2248
0
  return ListNamespaces(boost::none);
2249
0
}
2250
2251
0
Result<YBTablePtr> YBClient::OpenTable(const TableId& table_id) {
2252
0
  YBTablePtr result;
2253
0
  RETURN_NOT_OK(OpenTable(table_id, &result));
2254
0
  return result;
2255
0
}
2256
2257
0
Result<YBTablePtr> YBClient::OpenTable(const YBTableName& name) {
2258
0
  YBTablePtr result;
2259
0
  RETURN_NOT_OK(OpenTable(name, &result));
2260
0
  return result;
2261
0
}
2262
2263
0
Result<TableId> GetTableId(YBClient* client, const YBTableName& table_name) {
2264
0
  return VERIFY_RESULT(client->GetYBTableInfo(table_name)).table_id;
2265
0
}
2266
2267
}  // namespace client
2268
}  // namespace yb