YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/client.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/client.h"
34
35
#include <algorithm>
36
#include <limits>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <set>
41
#include <string>
42
#include <unordered_map>
43
#include <unordered_set>
44
#include <vector>
45
46
#include <boost/container/small_vector.hpp>
47
#include <boost/preprocessor/cat.hpp>
48
#include <boost/preprocessor/stringize.hpp>
49
50
#include "yb/cdc/cdc_service.h"
51
#include "yb/client/client_fwd.h"
52
#include "yb/client/callbacks.h"
53
#include "yb/client/client-internal.h"
54
#include "yb/client/client_builder-internal.h"
55
#include "yb/client/client_utils.h"
56
#include "yb/client/meta_cache.h"
57
#include "yb/client/namespace_alterer.h"
58
#include "yb/client/permissions.h"
59
#include "yb/client/session.h"
60
#include "yb/client/table.h"
61
#include "yb/client/table_alterer.h"
62
#include "yb/client/table_creator.h"
63
#include "yb/client/table_info.h"
64
#include "yb/client/tablet_server.h"
65
#include "yb/client/yb_table_name.h"
66
67
#include "yb/common/common.pb.h"
68
#include "yb/common/common_flags.h"
69
#include "yb/common/entity_ids.h"
70
#include "yb/common/partition.h"
71
#include "yb/common/ql_type.h"
72
#include "yb/common/roles_permissions.h"
73
#include "yb/common/schema.h"
74
#include "yb/common/transaction.h"
75
#include "yb/common/wire_protocol.h"
76
77
#include "yb/gutil/bind.h"
78
#include "yb/gutil/map-util.h"
79
#include "yb/gutil/strings/substitute.h"
80
81
#include "yb/master/master_admin.proxy.h"
82
#include "yb/master/master_client.proxy.h"
83
#include "yb/master/master_cluster.proxy.h"
84
#include "yb/master/master_dcl.proxy.h"
85
#include "yb/master/master_ddl.proxy.h"
86
#include "yb/master/master_replication.proxy.h"
87
#include "yb/master/master_error.h"
88
#include "yb/master/master_util.h"
89
90
#include "yb/rpc/messenger.h"
91
#include "yb/rpc/proxy.h"
92
#include "yb/rpc/rpc.h"
93
94
#include "yb/util/atomic.h"
95
#include "yb/util/flag_tags.h"
96
#include "yb/util/format.h"
97
#include "yb/util/init.h"
98
#include "yb/util/logging.h"
99
#include "yb/util/logging_callback.h"
100
#include "yb/util/mem_tracker.h"
101
#include "yb/util/metric_entity.h"
102
#include "yb/util/monotime.h"
103
#include "yb/util/net/net_util.h"
104
#include "yb/util/result.h"
105
#include "yb/util/scope_exit.h"
106
#include "yb/util/size_literals.h"
107
#include "yb/util/slice.h"
108
#include "yb/util/status.h"
109
#include "yb/util/status_format.h"
110
#include "yb/util/status_log.h"
111
#include "yb/util/strongly_typed_bool.h"
112
113
#include "yb/yql/cql/ql/ptree/pt_option.h"
114
115
using namespace std::literals;
116
117
using yb::master::AlterTableRequestPB;
118
using yb::master::CreateTablegroupRequestPB;
119
using yb::master::CreateTablegroupResponsePB;
120
using yb::master::DeleteTablegroupRequestPB;
121
using yb::master::DeleteTablegroupResponsePB;
122
using yb::master::ListTablegroupsRequestPB;
123
using yb::master::ListTablegroupsResponsePB;
124
using yb::master::GetNamespaceInfoRequestPB;
125
using yb::master::GetNamespaceInfoResponsePB;
126
using yb::master::GetTableLocationsRequestPB;
127
using yb::master::GetTableLocationsResponsePB;
128
using yb::master::GetTabletLocationsRequestPB;
129
using yb::master::GetTabletLocationsResponsePB;
130
using yb::master::GetTransactionStatusTabletsRequestPB;
131
using yb::master::GetTransactionStatusTabletsResponsePB;
132
using yb::master::IsLoadBalancedRequestPB;
133
using yb::master::IsLoadBalancedResponsePB;
134
using yb::master::IsLoadBalancerIdleRequestPB;
135
using yb::master::IsLoadBalancerIdleResponsePB;
136
using yb::master::ListMastersRequestPB;
137
using yb::master::ListMastersResponsePB;
138
using yb::master::ListTablesRequestPB;
139
using yb::master::ListTablesResponsePB;
140
using yb::master::ListTablesResponsePB_TableInfo;
141
using yb::master::ListTabletServersRequestPB;
142
using yb::master::ListTabletServersResponsePB;
143
using yb::master::ListTabletServersResponsePB_Entry;
144
using yb::master::ListLiveTabletServersRequestPB;
145
using yb::master::ListLiveTabletServersResponsePB;
146
using yb::master::ListLiveTabletServersResponsePB_Entry;
147
using yb::master::CreateNamespaceRequestPB;
148
using yb::master::CreateNamespaceResponsePB;
149
using yb::master::DeleteNamespaceRequestPB;
150
using yb::master::DeleteNamespaceResponsePB;
151
using yb::master::ListNamespacesRequestPB;
152
using yb::master::ListNamespacesResponsePB;
153
using yb::master::ReservePgsqlOidsRequestPB;
154
using yb::master::ReservePgsqlOidsResponsePB;
155
using yb::master::GetYsqlCatalogConfigRequestPB;
156
using yb::master::GetYsqlCatalogConfigResponsePB;
157
using yb::master::CreateUDTypeRequestPB;
158
using yb::master::CreateUDTypeResponsePB;
159
using yb::master::AlterRoleRequestPB;
160
using yb::master::AlterRoleResponsePB;
161
using yb::master::CreateRoleRequestPB;
162
using yb::master::CreateRoleResponsePB;
163
using yb::master::DeleteUDTypeRequestPB;
164
using yb::master::DeleteUDTypeResponsePB;
165
using yb::master::DeleteRoleRequestPB;
166
using yb::master::DeleteRoleResponsePB;
167
using yb::master::GetPermissionsRequestPB;
168
using yb::master::GetPermissionsResponsePB;
169
using yb::master::GrantRevokeRoleRequestPB;
170
using yb::master::GrantRevokeRoleResponsePB;
171
using yb::master::GetUDTypeInfoRequestPB;
172
using yb::master::GetUDTypeInfoResponsePB;
173
using yb::master::GrantRevokePermissionResponsePB;
174
using yb::master::GrantRevokePermissionRequestPB;
175
using yb::master::MasterDdlProxy;
176
using yb::master::ReplicationInfoPB;
177
using yb::master::TabletLocationsPB;
178
using yb::master::RedisConfigSetRequestPB;
179
using yb::master::RedisConfigSetResponsePB;
180
using yb::master::RedisConfigGetRequestPB;
181
using yb::master::RedisConfigGetResponsePB;
182
using yb::master::CreateCDCStreamRequestPB;
183
using yb::master::CreateCDCStreamResponsePB;
184
using yb::master::DeleteCDCStreamRequestPB;
185
using yb::master::DeleteCDCStreamResponsePB;
186
using yb::master::GetCDCDBStreamInfoRequestPB;
187
using yb::master::GetCDCDBStreamInfoResponsePB;
188
using yb::master::GetCDCStreamRequestPB;
189
using yb::master::GetCDCStreamResponsePB;
190
using yb::master::UpdateCDCStreamRequestPB;
191
using yb::master::UpdateCDCStreamResponsePB;
192
using yb::master::GetMasterClusterConfigRequestPB;
193
using yb::master::GetMasterClusterConfigResponsePB;
194
using yb::master::CreateTransactionStatusTableRequestPB;
195
using yb::master::CreateTransactionStatusTableResponsePB;
196
using yb::master::UpdateConsumerOnProducerSplitRequestPB;
197
using yb::master::UpdateConsumerOnProducerSplitResponsePB;
198
using yb::master::PlacementInfoPB;
199
using yb::rpc::Messenger;
200
using std::string;
201
using std::vector;
202
using google::protobuf::RepeatedPtrField;
203
204
using namespace yb::size_literals;  // NOLINT.
205
206
DEFINE_bool(client_suppress_created_logs, false,
207
            "Suppress 'Created table ...' messages");
208
TAG_FLAG(client_suppress_created_logs, advanced);
209
TAG_FLAG(client_suppress_created_logs, hidden);
210
211
DEFINE_int32(backfill_index_client_rpc_timeout_ms, 60 * 60 * 1000, // 60 min.
212
             "Timeout for BackfillIndex RPCs from client to master.");
213
TAG_FLAG(backfill_index_client_rpc_timeout_ms, advanced);
214
215
DEFINE_int32(ycql_num_tablets, -1,
216
             "The number of tablets per YCQL table. Default value is -1. "
217
             "Colocated tables are not affected. "
218
             "If it's value is not set then the value of yb_num_shards_per_tserver is used "
219
             "in conjunction with the number of tservers to determine the tablet count. "
220
             "If the user explicitly specifies a value of the tablet count in the Create Table "
221
             "DDL statement (with tablets = x syntax) then it takes precedence over the value "
222
             "of this flag. Needs to be set at tserver.");
223
TAG_FLAG(ycql_num_tablets, runtime);
224
225
DEFINE_int32(ysql_num_tablets, -1,
226
             "The number of tablets per YSQL table. Default value is -1. "
227
             "If it's value is not set then the value of ysql_num_shards_per_tserver is used "
228
             "in conjunction with the number of tservers to determine the tablet count. "
229
             "If the user explicitly specifies a value of the tablet count in the Create Table "
230
             "DDL statement (split into x tablets syntax) then it takes precedence over the "
231
             "value of this flag. Needs to be set at tserver.");
232
TAG_FLAG(ysql_num_tablets, runtime);
233
234
namespace yb {
235
namespace client {
236
237
using internal::MetaCache;
238
using ql::ObjectType;
239
using std::shared_ptr;
240
241
namespace {
242
243
void FillFromRepeatedTabletLocations(
244
    const RepeatedPtrField<TabletLocationsPB>& tablets,
245
    vector<TabletId>* tablet_uuids,
246
    vector<string>* ranges,
247
247
    vector<TabletLocationsPB>* locations) {
248
247
  tablet_uuids->reserve(tablets.size());
249
247
  if (ranges) {
250
247
    ranges->reserve(tablets.size());
251
247
  }
252
247
  if (locations) {
253
117
    locations->reserve(tablets.size());
254
117
  }
255
1.67k
  for (const auto& tablet : tablets) {
256
1.67k
    if (locations) {
257
1.02k
      locations->push_back(tablet);
258
1.02k
    }
259
1.67k
    tablet_uuids->push_back(tablet.tablet_id());
260
1.67k
    if (ranges) {
261
1.67k
      const auto& partition = tablet.partition();
262
1.67k
      ranges->push_back(partition.ShortDebugString());
263
1.67k
    }
264
1.67k
  }
265
247
}
266
267
std::future<FetchPartitionsResult> FetchPartitionsFuture(
268
134k
    YBClient* client, const TableId& table_id) {
269
134k
  return MakeFuture<FetchPartitionsResult>(
270
134k
      [&](const auto& callback) 
{ YBTable::FetchPartitions(client, table_id, callback); }134k
);
271
134k
}
272
273
} // namespace
274
275
#define CALL_SYNC_LEADER_MASTER_RPC_EX(service, req, resp, method) \
276
157k
  do { \
277
157k
    auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); \
278
157k
    CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(service, req, resp, deadline, method); \
279
154k
  } while(0);
280
281
#define CALL_SYNC_LEADER_MASTER_RPC(req, resp, method) \
282
12.1k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Ddl, req, resp, method)
283
284
#define CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(service, req, resp, deadline, method) \
285
157k
  do { \
286
157k
    RETURN_NOT_OK(data_->SyncLeaderMasterRpc( \
287
157k
        deadline, req, &resp, BOOST_PP_STRINGIZE(method), \
288
157k
        &master::BOOST_PP_CAT(BOOST_PP_CAT(Master, service), Proxy)::            \
289
157k
            BOOST_PP_CAT(method, Async))); \
290
157k
  } while(
0154k
);
291
292
// Adapts between the internal LogSeverity and the client's YBLogSeverity.
293
static void LoggingAdapterCB(YBLoggingCallback* user_cb,
294
                             LogSeverity severity,
295
                             const char* filename,
296
                             int line_number,
297
                             const struct ::tm* time,
298
                             const char* message,
299
0
                             size_t message_len) {
300
0
  YBLogSeverity client_severity;
301
0
  switch (severity) {
302
0
    case yb::SEVERITY_INFO:
303
0
      client_severity = SEVERITY_INFO;
304
0
      break;
305
0
    case yb::SEVERITY_WARNING:
306
0
      client_severity = SEVERITY_WARNING;
307
0
      break;
308
0
    case yb::SEVERITY_ERROR:
309
0
      client_severity = SEVERITY_ERROR;
310
0
      break;
311
0
    case yb::SEVERITY_FATAL:
312
0
      client_severity = SEVERITY_FATAL;
313
0
      break;
314
0
    default:
315
0
      LOG(FATAL) << "Unknown YB log severity: " << severity;
316
0
  }
317
0
  user_cb->Run(client_severity, filename, line_number, time,
318
0
               message, message_len);
319
0
}
320
321
0
void InitLogging() {
322
0
  InitGoogleLoggingSafeBasic("yb_client");
323
0
}
324
325
0
void InstallLoggingCallback(YBLoggingCallback* cb) {
326
0
  RegisterLoggingCallback(Bind(&LoggingAdapterCB, Unretained(cb)));
327
0
}
328
329
0
void UninstallLoggingCallback() {
330
0
  UnregisterLoggingCallback();
331
0
}
332
333
0
void SetVerboseLogLevel(int level) {
334
0
  FLAGS_v = level;
335
0
}
336
337
0
Status SetInternalSignalNumber(int signum) {
338
0
  return SetStackTraceSignal(signum);
339
0
}
340
341
YBClientBuilder::YBClientBuilder()
342
34.3k
  : data_(new YBClientBuilder::Data()) {
343
34.3k
}
344
345
898
YBClientBuilder::~YBClientBuilder() {
346
898
}
347
348
366
YBClientBuilder& YBClientBuilder::clear_master_server_addrs() {
349
366
  data_->master_server_addrs_.clear();
350
366
  return *this;
351
366
}
352
353
0
YBClientBuilder& YBClientBuilder::master_server_addrs(const vector<string>& addrs) {
354
0
  for (const string& addr : addrs) {
355
0
    data_->master_server_addrs_.push_back(addr);
356
0
  }
357
0
  return *this;
358
0
}
359
360
34.5k
YBClientBuilder& YBClientBuilder::add_master_server_addr(const string& addr) {
361
34.5k
  data_->master_server_addrs_.push_back(addr);
362
34.5k
  return *this;
363
34.5k
}
364
365
0
YBClientBuilder& YBClientBuilder::skip_master_flagfile(bool should_skip) {
366
0
  data_->skip_master_flagfile_ = should_skip;
367
0
  return *this;
368
0
}
369
370
114
YBClientBuilder& YBClientBuilder::wait_for_leader_election_on_init(bool should_wait) {
371
114
  data_->wait_for_leader_election_on_init_ = should_wait;
372
114
  return *this;
373
114
}
374
375
16.3k
YBClientBuilder& YBClientBuilder::default_admin_operation_timeout(const MonoDelta& timeout) {
376
16.3k
  data_->default_admin_operation_timeout_ = timeout;
377
16.3k
  return *this;
378
16.3k
}
379
380
33.8k
YBClientBuilder& YBClientBuilder::default_rpc_timeout(const MonoDelta& timeout) {
381
33.8k
  data_->default_rpc_timeout_ = timeout;
382
33.8k
  return *this;
383
33.8k
}
384
385
8.74k
YBClientBuilder& YBClientBuilder::set_num_reactors(int32_t num_reactors) {
386
8.74k
  CHECK_GT(num_reactors, 0);
387
8.74k
  data_->num_reactors_ = num_reactors;
388
8.74k
  return *this;
389
8.74k
}
390
391
33.5k
YBClientBuilder& YBClientBuilder::set_cloud_info_pb(const CloudInfoPB& cloud_info_pb) {
392
33.5k
  data_->cloud_info_pb_ = cloud_info_pb;
393
33.5k
  return *this;
394
33.5k
}
395
396
YBClientBuilder& YBClientBuilder::set_metric_entity(
397
33.5k
    const scoped_refptr<MetricEntity>& metric_entity) {
398
33.5k
  data_->metric_entity_ = metric_entity;
399
33.5k
  return *this;
400
33.5k
}
401
402
33.5k
YBClientBuilder& YBClientBuilder::set_client_name(const std::string& name) {
403
33.5k
  data_->client_name_ = name;
404
33.5k
  return *this;
405
33.5k
}
406
407
0
YBClientBuilder& YBClientBuilder::set_callback_threadpool_size(size_t size) {
408
0
  data_->threadpool_size_ = size;
409
0
  return *this;
410
0
}
411
412
17.6k
YBClientBuilder& YBClientBuilder::set_tserver_uuid(const TabletServerId& uuid) {
413
17.6k
  data_->uuid_ = uuid;
414
17.6k
  return *this;
415
17.6k
}
416
417
33.5k
YBClientBuilder& YBClientBuilder::set_parent_mem_tracker(const MemTrackerPtr& mem_tracker) {
418
33.5k
  data_->parent_mem_tracker_ = mem_tracker;
419
33.5k
  return *this;
420
33.5k
}
421
422
16.0k
YBClientBuilder& YBClientBuilder::set_master_address_flag_name(const std::string& value) {
423
16.0k
  data_->master_address_flag_name_ = value;
424
16.0k
  return *this;
425
16.0k
}
426
427
33.5k
YBClientBuilder& YBClientBuilder::set_skip_master_leader_resolution(bool value) {
428
33.5k
  data_->skip_master_leader_resolution_ = value;
429
33.5k
  return *this;
430
33.5k
}
431
432
16.0k
YBClientBuilder& YBClientBuilder::AddMasterAddressSource(const MasterAddressSource& source) {
433
16.0k
  data_->master_address_sources_.push_back(source);
434
16.0k
  return *this;
435
16.0k
}
436
437
46.8k
Status YBClientBuilder::DoBuild(rpc::Messenger* messenger, std::unique_ptr<YBClient>* client) {
438
46.8k
  RETURN_NOT_OK(CheckCPUFlags());
439
440
46.8k
  std::unique_ptr<YBClient> c(new YBClient());
441
442
  // Init messenger.
443
46.8k
  if (messenger) {
444
45.5k
    c->data_->messenger_holder_ = nullptr;
445
45.5k
    c->data_->messenger_ = messenger;
446
45.5k
  } else {
447
1.28k
    c->data_->messenger_holder_ = VERIFY_RESULT(client::CreateClientMessenger(
448
0
        data_->client_name_, data_->num_reactors_,
449
0
        data_->metric_entity_, data_->parent_mem_tracker_));
450
0
    c->data_->messenger_ = c->data_->messenger_holder_.get();
451
1.28k
  }
452
46.8k
  c->data_->proxy_cache_ = std::make_unique<rpc::ProxyCache>(c->data_->messenger_);
453
46.8k
  c->data_->metric_entity_ = data_->metric_entity_;
454
455
46.8k
  c->data_->master_address_flag_name_ = data_->master_address_flag_name_;
456
46.8k
  c->data_->master_address_sources_ = data_->master_address_sources_;
457
46.8k
  c->data_->master_server_addrs_ = data_->master_server_addrs_;
458
46.8k
  c->data_->skip_master_flagfile_ = data_->skip_master_flagfile_;
459
46.8k
  c->data_->default_admin_operation_timeout_ = data_->default_admin_operation_timeout_;
460
46.8k
  c->data_->default_rpc_timeout_ = data_->default_rpc_timeout_;
461
46.8k
  c->data_->wait_for_leader_election_on_init_ = data_->wait_for_leader_election_on_init_;
462
463
46.8k
  auto callback_threadpool_size = data_->threadpool_size_;
464
46.8k
  if (callback_threadpool_size == YBClientBuilder::Data::kUseNumReactorsAsNumThreads) {
465
0
    callback_threadpool_size = c->data_->messenger_->num_reactors();
466
0
  }
467
46.8k
  c->data_->use_threadpool_for_callbacks_ = callback_threadpool_size != 0;
468
46.8k
  if (callback_threadpool_size == 0) {
469
45.7k
    callback_threadpool_size = 1;
470
45.7k
  }
471
472
  // Not using an underscore because we sometimes get shortened thread names like "master_c" and it
473
  // is clearer to see "mastercb" instead.
474
46.8k
  ThreadPoolBuilder tpb(data_->client_name_ + "cb");
475
46.8k
  tpb.set_max_threads(narrow_cast<int>(callback_threadpool_size));
476
46.8k
  std::unique_ptr<ThreadPool> tp;
477
46.8k
  RETURN_NOT_OK_PREPEND(
478
46.8k
      tpb.Build(&tp),
479
46.8k
      Format("Could not create callback threadpool with $0 max threads",
480
46.8k
             callback_threadpool_size));
481
46.8k
  c->data_->threadpool_ = std::move(tp);
482
483
  // Let's allow for plenty of time for discovering the master the first
484
  // time around.
485
46.8k
  auto deadline = CoarseMonoClock::Now() + c->default_admin_operation_timeout();
486
46.8k
  for (;;) {
487
44.7k
    auto status = c->data_->SetMasterServerProxy(deadline,
488
44.7k
            data_->skip_master_leader_resolution_,
489
44.7k
            data_->wait_for_leader_election_on_init_);
490
44.7k
    if (status.ok()) {
491
30.7k
      break;
492
30.7k
    }
493
14.0k
    if (!status.IsNotFound() || 
CoarseMonoClock::Now() >= deadline2
) {
494
12.7k
      RETURN_NOT_OK_PREPEND(status, "Could not locate the leader master")
495
12.7k
    }
496
14.0k
  }
497
498
34.1k
  c->data_->meta_cache_.reset(new MetaCache(c.get()));
499
500
  // Init local host names used for locality decisions.
501
34.1k
  RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(),
502
34.1k
                        "Could not determine local host names");
503
34.1k
  c->data_->cloud_info_pb_ = data_->cloud_info_pb_;
504
34.1k
  c->data_->uuid_ = data_->uuid_;
505
506
34.1k
  client->swap(c);
507
34.1k
  return Status::OK();
508
34.1k
}
509
510
46.8k
Result<std::unique_ptr<YBClient>> YBClientBuilder::Build(rpc::Messenger* messenger) {
511
46.8k
  std::unique_ptr<YBClient> client;
512
46.8k
  RETURN_NOT_OK(DoBuild(messenger, &client));
513
34.1k
  return client;
514
46.8k
}
515
516
Result<std::unique_ptr<YBClient>> YBClientBuilder::Build(
517
0
    std::unique_ptr<rpc::Messenger>&& messenger) {
518
0
  std::unique_ptr<YBClient> client;
519
0
  auto ok = false;
520
0
  auto scope_exit = ScopeExit([&ok, &messenger] {
521
0
    if (!ok) {
522
0
      messenger->Shutdown();
523
0
    }
524
0
  });
525
0
  RETURN_NOT_OK(DoBuild(messenger.get(), &client));
526
0
  ok = true;
527
0
  client->data_->messenger_holder_ = std::move(messenger);
528
0
  return client;
529
0
}
530
531
46.8k
YBClient::YBClient() : data_(new YBClient::Data()) {
532
46.8k
  yb::InitCommonFlags();
533
46.8k
}
534
535
13.5k
YBClient::~YBClient() {
536
13.5k
  Shutdown();
537
13.5k
}
538
539
13.5k
void YBClient::Shutdown() {
540
13.5k
  data_->StartShutdown();
541
13.5k
  if (data_->messenger_holder_) {
542
250
    data_->messenger_holder_->Shutdown();
543
250
  }
544
13.5k
  if (data_->threadpool_) {
545
13.4k
    data_->threadpool_->Shutdown();
546
13.4k
  }
547
13.5k
  data_->CompleteShutdown();
548
13.5k
}
549
550
7.01k
std::unique_ptr<YBTableCreator> YBClient::NewTableCreator() {
551
7.01k
  return std::unique_ptr<YBTableCreator>(new YBTableCreator(this));
552
7.01k
}
553
554
Status YBClient::IsCreateTableInProgress(const YBTableName& table_name,
555
37
                                         bool *create_in_progress) {
556
37
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
557
37
  return data_->IsCreateTableInProgress(this, table_name, "" /* table_id */, deadline,
558
37
                                        create_in_progress);
559
37
}
560
561
0
Status YBClient::WaitForCreateTableToFinish(const YBTableName& table_name) {
562
0
  const auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
563
0
  return WaitForCreateTableToFinish(table_name, deadline);
564
0
}
565
566
Status YBClient::WaitForCreateTableToFinish(
567
0
    const YBTableName& table_name, const CoarseTimePoint& deadline) {
568
0
  return data_->WaitForCreateTableToFinish(this, table_name, "" /* table_id */, deadline);
569
0
}
570
571
0
Status YBClient::WaitForCreateTableToFinish(const string& table_id) {
572
0
  const auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
573
0
  return WaitForCreateTableToFinish(table_id, deadline);
574
0
}
575
576
Status YBClient::WaitForCreateTableToFinish(
577
0
    const string& table_id, const CoarseTimePoint& deadline) {
578
0
  const YBTableName empty_table_name;
579
0
  return data_->WaitForCreateTableToFinish(this, empty_table_name, table_id, deadline);
580
0
}
581
582
3.64k
Status YBClient::TruncateTable(const string& table_id, bool wait) {
583
3.64k
  return TruncateTables({table_id}, wait);
584
3.64k
}
585
586
3.79k
Status YBClient::TruncateTables(const vector<string>& table_ids, bool wait) {
587
3.79k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
588
3.79k
  return data_->TruncateTables(this, table_ids, deadline, wait);
589
3.79k
}
590
591
540
Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline) {
592
540
  if (deadline == CoarseTimePoint()) {
593
0
    deadline = CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms;
594
0
  }
595
540
  return data_->BackfillIndex(this, YBTableName(), table_id, deadline, wait);
596
540
}
597
598
1.20k
Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) {
599
1.20k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
600
1.20k
  return data_->DeleteTable(this,
601
1.20k
                            table_name,
602
1.20k
                            "" /* table_id */,
603
1.20k
                            false /* is_index_table */,
604
1.20k
                            deadline,
605
1.20k
                            nullptr /* indexed_table_name */,
606
1.20k
                            wait);
607
1.20k
}
608
609
3.48k
Status YBClient::DeleteTable(const string& table_id, bool wait, CoarseTimePoint deadline) {
610
3.48k
  return data_->DeleteTable(this,
611
3.48k
                            YBTableName(),
612
3.48k
                            table_id,
613
3.48k
                            false /* is_index_table */,
614
3.48k
                            PatchAdminDeadline(deadline),
615
3.48k
                            nullptr /* indexed_table_name */,
616
3.48k
                            wait);
617
3.48k
}
618
619
Status YBClient::DeleteIndexTable(const YBTableName& table_name,
620
                                  YBTableName* indexed_table_name,
621
124
                                  bool wait) {
622
124
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
623
124
  return data_->DeleteTable(this,
624
124
                            table_name,
625
124
                            "" /* table_id */,
626
124
                            true /* is_index_table */,
627
124
                            deadline,
628
124
                            indexed_table_name,
629
124
                            wait);
630
124
}
631
632
Status YBClient::DeleteIndexTable(const string& table_id,
633
                                  YBTableName* indexed_table_name,
634
                                  bool wait,
635
669
                                  CoarseTimePoint deadline) {
636
669
  return data_->DeleteTable(this,
637
669
                            YBTableName(),
638
669
                            table_id,
639
669
                            true /* is_index_table */,
640
669
                            PatchAdminDeadline(deadline),
641
669
                            indexed_table_name,
642
669
                            wait);
643
669
}
644
645
Status YBClient::FlushTables(const std::vector<TableId>& table_ids,
646
                             bool add_indexes,
647
                             int timeout_secs,
648
8
                             bool is_compaction) {
649
8
  auto deadline = CoarseMonoClock::Now() + MonoDelta::FromSeconds(timeout_secs);
650
8
  return data_->FlushTables(this,
651
8
                            table_ids,
652
8
                            add_indexes,
653
8
                            deadline,
654
8
                            is_compaction);
655
8
}
656
657
Status YBClient::FlushTables(const std::vector<YBTableName>& table_names,
658
                             bool add_indexes,
659
                             int timeout_secs,
660
2
                             bool is_compaction) {
661
2
  auto deadline = CoarseMonoClock::Now() + MonoDelta::FromSeconds(timeout_secs);
662
2
  return data_->FlushTables(this,
663
2
                            table_names,
664
2
                            add_indexes,
665
2
                            deadline,
666
2
                            is_compaction);
667
2
}
668
669
178
std::unique_ptr<YBTableAlterer> YBClient::NewTableAlterer(const YBTableName& name) {
670
178
  return std::unique_ptr<YBTableAlterer>(new YBTableAlterer(this, name));
671
178
}
672
673
522
std::unique_ptr<YBTableAlterer> YBClient::NewTableAlterer(const string id) {
674
522
  return std::unique_ptr<YBTableAlterer>(new YBTableAlterer(this, id));
675
522
}
676
677
Status YBClient::IsAlterTableInProgress(const YBTableName& table_name,
678
                                        const string& table_id,
679
0
                                        bool *alter_in_progress) {
680
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
681
0
  return data_->IsAlterTableInProgress(this, table_name, table_id, deadline, alter_in_progress);
682
0
}
683
684
61
Result<YBTableInfo> YBClient::GetYBTableInfo(const YBTableName& table_name) {
685
61
  YBTableInfo info;
686
61
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
687
61
  RETURN_NOT_OK(data_->GetTableSchema(this, table_name, deadline, &info));
688
58
  return info;
689
61
}
690
691
Status YBClient::GetTableSchema(const YBTableName& table_name,
692
                                YBSchema* schema,
693
12
                                PartitionSchema* partition_schema) {
694
12
  Result<YBTableInfo> info = GetYBTableInfo(table_name);
695
12
  if (!info.ok()) {
696
0
    return info.status();
697
0
  }
698
  // Verify it is not an index table.
699
12
  if (info->index_info) {
700
0
    return STATUS(NotFound, "The table does not exist");
701
0
  }
702
703
12
  *schema = std::move(info->schema);
704
12
  *partition_schema = std::move(info->partition_schema);
705
12
  return Status::OK();
706
12
}
707
708
Status YBClient::GetTableSchemaById(const TableId& table_id, std::shared_ptr<YBTableInfo> info,
709
7
                                    StatusCallback callback) {
710
7
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
711
7
  return data_->GetTableSchemaById(this, table_id, deadline, info, callback);
712
7
}
713
714
Status YBClient::GetTablegroupSchemaById(const TablegroupId& parent_tablegroup_table_id,
715
                                         std::shared_ptr<std::vector<YBTableInfo>> info,
716
0
                                         StatusCallback callback) {
717
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
718
0
  return data_->GetTablegroupSchemaById(this,
719
0
                                        parent_tablegroup_table_id,
720
0
                                        deadline,
721
0
                                        info,
722
0
                                        callback);
723
0
}
724
725
Status YBClient::GetColocatedTabletSchemaById(const TableId& parent_colocated_table_id,
726
                                              std::shared_ptr<std::vector<YBTableInfo>> info,
727
0
                                              StatusCallback callback) {
728
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
729
0
  return data_->GetColocatedTabletSchemaById(this,
730
0
                                             parent_colocated_table_id,
731
0
                                             deadline,
732
0
                                             info,
733
0
                                             callback);
734
0
}
735
736
Result<IndexPermissions> YBClient::GetIndexPermissions(
737
    const TableId& table_id,
738
4
    const TableId& index_id) {
739
4
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
740
4
  return data_->GetIndexPermissions(
741
4
      this,
742
4
      table_id,
743
4
      index_id,
744
4
      deadline);
745
4
}
746
747
Result<IndexPermissions> YBClient::GetIndexPermissions(
748
    const YBTableName& table_name,
749
4
    const YBTableName& index_name) {
750
4
  YBTableInfo table_info = VERIFY_RESULT(GetYBTableInfo(table_name));
751
4
  YBTableInfo index_info = VERIFY_RESULT(GetYBTableInfo(index_name));
752
0
  return GetIndexPermissions(table_info.table_id, index_info.table_id);
753
4
}
754
755
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
756
    const TableId& table_id,
757
    const TableId& index_id,
758
    const IndexPermissions& target_index_permissions,
759
    const CoarseTimePoint deadline,
760
0
    const CoarseDuration max_wait) {
761
0
  return data_->WaitUntilIndexPermissionsAtLeast(
762
0
      this,
763
0
      table_id,
764
0
      index_id,
765
0
      target_index_permissions,
766
0
      deadline,
767
0
      max_wait);
768
0
}
769
770
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
771
    const TableId& table_id,
772
    const TableId& index_id,
773
    const IndexPermissions& target_index_permissions,
774
0
    const CoarseDuration max_wait) {
775
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
776
0
  return WaitUntilIndexPermissionsAtLeast(
777
0
      table_id,
778
0
      index_id,
779
0
      target_index_permissions,
780
0
      deadline,
781
0
      max_wait);
782
0
}
783
784
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
785
    const YBTableName& table_name,
786
    const YBTableName& index_name,
787
    const IndexPermissions& target_index_permissions,
788
    const CoarseTimePoint deadline,
789
65
    const CoarseDuration max_wait) {
790
65
  return data_->WaitUntilIndexPermissionsAtLeast(
791
65
      this,
792
65
      table_name,
793
65
      index_name,
794
65
      target_index_permissions,
795
65
      deadline,
796
65
      max_wait);
797
65
}
798
799
Result<IndexPermissions> YBClient::WaitUntilIndexPermissionsAtLeast(
800
    const YBTableName& table_name,
801
    const YBTableName& index_name,
802
    const IndexPermissions& target_index_permissions,
803
41
    const CoarseDuration max_wait) {
804
41
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
805
41
  return WaitUntilIndexPermissionsAtLeast(
806
41
      table_name,
807
41
      index_name,
808
41
      target_index_permissions,
809
41
      deadline,
810
41
      max_wait);
811
41
}
812
813
Status YBClient::CreateNamespace(const std::string& namespace_name,
814
                                 const boost::optional<YQLDatabase>& database_type,
815
                                 const std::string& creator_role_name,
816
                                 const std::string& namespace_id,
817
                                 const std::string& source_namespace_id,
818
                                 const boost::optional<uint32_t>& next_pg_oid,
819
                                 const TransactionMetadata* txn,
820
                                 const bool colocated,
821
2.10k
                                 CoarseTimePoint deadline) {
822
2.10k
  CreateNamespaceRequestPB req;
823
2.10k
  CreateNamespaceResponsePB resp;
824
2.10k
  req.set_name(namespace_name);
825
2.10k
  if (!creator_role_name.empty()) {
826
904
    req.set_creator_role_name(creator_role_name);
827
904
  }
828
2.10k
  if (database_type) {
829
2.08k
    req.set_database_type(*database_type);
830
2.08k
  }
831
2.10k
  if (!namespace_id.empty()) {
832
229
    req.set_namespace_id(namespace_id);
833
229
  }
834
2.10k
  if (!source_namespace_id.empty()) {
835
123
    req.set_source_namespace_id(source_namespace_id);
836
123
  }
837
2.10k
  if (next_pg_oid) {
838
134
    req.set_next_pg_oid(*next_pg_oid);
839
134
  }
840
2.10k
  if (txn) {
841
114
    txn->ToPB(req.mutable_transaction());
842
114
  }
843
2.10k
  req.set_colocated(colocated);
844
2.10k
  deadline = PatchAdminDeadline(deadline);
845
2.10k
  RETURN_NOT_OK(data_->SyncLeaderMasterRpc(
846
2.10k
      deadline, req, &resp, "CreateNamespace", &MasterDdlProxy::CreateNamespaceAsync));
847
2.09k
  std::string cur_id = resp.has_id() ? 
resp.id()2.08k
:
namespace_id13
;
848
849
  // Verify that the namespace we found is running so that, once this request returns,
850
  // the client can send operations without receiving a "namespace not found" error.
851
2.09k
  RETURN_NOT_OK(data_->WaitForCreateNamespaceToFinish(
852
2.09k
      this, namespace_name, database_type, cur_id, deadline));
853
854
2.09k
  return Status::OK();
855
2.09k
}
856
857
Status YBClient::CreateNamespaceIfNotExists(const std::string& namespace_name,
858
                                            const boost::optional<YQLDatabase>& database_type,
859
                                            const std::string& creator_role_name,
860
                                            const std::string& namespace_id,
861
                                            const std::string& source_namespace_id,
862
                                            const boost::optional<uint32_t>& next_pg_oid,
863
332
                                            const bool colocated) {
864
332
  const auto namespace_exists = VERIFY_RESULT(
865
332
      !namespace_id.empty() ? NamespaceIdExists(namespace_id)
866
332
                            : NamespaceExists(namespace_name));
867
332
  if (namespace_exists) {
868
    // Verify that the namespace we found is running so that, once this request returns,
869
    // the client can send operations without receiving a "namespace not found" error.
870
32
    return data_->WaitForCreateNamespaceToFinish(this, namespace_name, database_type, namespace_id,
871
32
        CoarseMonoClock::Now() + default_admin_operation_timeout());
872
32
  }
873
874
300
  Status s = CreateNamespace(namespace_name, database_type, creator_role_name, namespace_id,
875
300
                             source_namespace_id, next_pg_oid, nullptr /* txn */, colocated);
876
300
  if (s.IsAlreadyPresent() && 
database_type0
&&
*database_type == YQLDatabase::YQL_DATABASE_CQL0
) {
877
0
    return Status::OK();
878
0
  }
879
300
  return s;
880
300
}
881
882
Status YBClient::IsCreateNamespaceInProgress(const std::string& namespace_name,
883
                                             const boost::optional<YQLDatabase>& database_type,
884
                                             const std::string& namespace_id,
885
0
                                             bool *create_in_progress) {
886
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
887
0
  return data_->IsCreateNamespaceInProgress(this, namespace_name, database_type, namespace_id,
888
0
                                            deadline, create_in_progress);
889
0
}
890
891
Status YBClient::DeleteNamespace(const std::string& namespace_name,
892
                                 const boost::optional<YQLDatabase>& database_type,
893
                                 const std::string& namespace_id,
894
1.60k
                                 CoarseTimePoint deadline) {
895
1.60k
  DeleteNamespaceRequestPB req;
896
1.60k
  DeleteNamespaceResponsePB resp;
897
1.60k
  req.mutable_namespace_()->set_name(namespace_name);
898
1.60k
  if (!namespace_id.empty()) {
899
72
    req.mutable_namespace_()->set_id(namespace_id);
900
72
  }
901
1.60k
  if (database_type) {
902
72
    req.set_database_type(*database_type);
903
72
    req.mutable_namespace_()->set_database_type(*database_type);
904
72
  }
905
1.60k
  deadline = PatchAdminDeadline(deadline);
906
1.60k
  RETURN_NOT_OK(data_->SyncLeaderMasterRpc(
907
1.60k
      deadline, req, &resp, "DeleteNamespace", &MasterDdlProxy::DeleteNamespaceAsync));
908
909
  // Verify that, once this request returns, the namespace has been successfully marked as deleted.
910
1.60k
  RETURN_NOT_OK(data_->WaitForDeleteNamespaceToFinish(this, namespace_name, database_type,
911
1.60k
      namespace_id, CoarseMonoClock::Now() + default_admin_operation_timeout()));
912
913
1.60k
  return Status::OK();
914
1.60k
}
915
916
Status YBClient::IsDeleteNamespaceInProgress(const std::string& namespace_name,
917
                                             const boost::optional<YQLDatabase>& database_type,
918
                                             const std::string& namespace_id,
919
0
                                             bool *delete_in_progress) {
920
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
921
0
  return data_->IsDeleteNamespaceInProgress(this, namespace_name, database_type, namespace_id,
922
0
                                            deadline, delete_in_progress);
923
0
}
924
925
YBNamespaceAlterer* YBClient::NewNamespaceAlterer(
926
3
    const string& namespace_name, const std::string& namespace_id) {
927
3
  return new YBNamespaceAlterer(this, namespace_name, namespace_id);
928
3
}
929
930
Result<vector<master::NamespaceIdentifierPB>> YBClient::ListNamespaces(
931
5.08k
    const boost::optional<YQLDatabase>& database_type) {
932
5.08k
  ListNamespacesRequestPB req;
933
5.08k
  ListNamespacesResponsePB resp;
934
5.08k
  if (database_type) {
935
275
    req.set_database_type(*database_type);
936
275
  }
937
5.08k
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListNamespaces);
938
5.08k
  auto* namespaces = resp.mutable_namespaces();
939
5.08k
  vector<master::NamespaceIdentifierPB> result;
940
5.08k
  result.reserve(namespaces->size());
941
22.9k
  for (auto& ns : *namespaces) {
942
22.9k
    result.push_back(std::move(ns));
943
22.9k
  }
944
5.08k
  return result;
945
5.08k
}
946
947
Status YBClient::GetNamespaceInfo(const std::string& namespace_id,
948
                                  const std::string& namespace_name,
949
                                  const boost::optional<YQLDatabase>& database_type,
950
6.03k
                                  master::GetNamespaceInfoResponsePB* ret) {
951
6.03k
  GetNamespaceInfoRequestPB req;
952
6.03k
  GetNamespaceInfoResponsePB resp;
953
954
6.03k
  if (!namespace_id.empty()) {
955
5.71k
    req.mutable_namespace_()->set_id(namespace_id);
956
5.71k
  }
957
6.03k
  if (!namespace_name.empty()) {
958
312
    req.mutable_namespace_()->set_name(namespace_name);
959
312
  }
960
6.03k
  if (database_type) {
961
6.03k
    req.mutable_namespace_()->set_database_type(*database_type);
962
6.03k
  }
963
964
6.03k
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, GetNamespaceInfo);
965
6.03k
  ret->Swap(&resp);
966
6.03k
  return Status::OK();
967
6.03k
}
968
969
Status YBClient::ReservePgsqlOids(const std::string& namespace_id,
970
                                  const uint32_t next_oid, const uint32_t count,
971
805
                                  uint32_t* begin_oid, uint32_t* end_oid) {
972
805
  ReservePgsqlOidsRequestPB req;
973
805
  ReservePgsqlOidsResponsePB resp;
974
805
  req.set_namespace_id(namespace_id);
975
805
  req.set_next_oid(next_oid);
976
805
  req.set_count(count);
977
805
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, ReservePgsqlOids);
978
805
  *begin_oid = resp.begin_oid();
979
805
  *end_oid = resp.end_oid();
980
805
  return Status::OK();
981
805
}
982
983
22
Status YBClient::GetYsqlCatalogMasterVersion(uint64_t *ysql_catalog_version) {
984
22
  GetYsqlCatalogConfigRequestPB req;
985
22
  GetYsqlCatalogConfigResponsePB resp;
986
22
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetYsqlCatalogConfig);
987
22
  *ysql_catalog_version = resp.version();
988
22
  return Status::OK();
989
22
}
990
991
Status YBClient::GrantRevokePermission(GrantRevokeStatementType statement_type,
992
                                       const PermissionType& permission,
993
                                       const ResourceType& resource_type,
994
                                       const std::string& canonical_resource,
995
                                       const char* resource_name,
996
                                       const char* namespace_name,
997
721
                                       const std::string& role_name) {
998
  // Setting up request.
999
721
  GrantRevokePermissionRequestPB req;
1000
721
  req.set_role_name(role_name);
1001
721
  req.set_canonical_resource(canonical_resource);
1002
721
  if (resource_name != nullptr) {
1003
520
    req.set_resource_name(resource_name);
1004
520
  }
1005
721
  if (namespace_name != nullptr) {
1006
435
    req.mutable_namespace_()->set_name(namespace_name);
1007
435
  }
1008
721
  req.set_resource_type(resource_type);
1009
721
  req.set_permission(permission);
1010
1011
721
  req.set_revoke(statement_type == GrantRevokeStatementType::REVOKE);
1012
1013
721
  GrantRevokePermissionResponsePB resp;
1014
721
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GrantRevokePermission);
1015
721
  return Status::OK();
1016
721
}
1017
1018
Result<bool> YBClient::NamespaceExists(const std::string& namespace_name,
1019
4.96k
                                       const boost::optional<YQLDatabase>& database_type) {
1020
14.7k
  for (const auto& ns : 
VERIFY_RESULT4.96k
(ListNamespaces(database_type)))4.96k
{
1021
14.7k
    if (ns.name() == namespace_name) {
1022
4.60k
      return true;
1023
4.60k
    }
1024
14.7k
  }
1025
355
  return false;
1026
4.96k
}
1027
1028
Result<bool> YBClient::NamespaceIdExists(const std::string& namespace_id,
1029
95
                                         const boost::optional<YQLDatabase>& database_type) {
1030
768
  for (const auto& ns : 
VERIFY_RESULT95
(ListNamespaces(database_type)))95
{
1031
768
    if (ns.id() == namespace_id) {
1032
0
      return true;
1033
0
    }
1034
768
  }
1035
95
  return false;
1036
95
}
1037
1038
Status YBClient::CreateTablegroup(const std::string& namespace_name,
1039
                                  const std::string& namespace_id,
1040
                                  const std::string& tablegroup_id,
1041
55
                                  const std::string& tablespace_id) {
1042
55
  CreateTablegroupRequestPB req;
1043
55
  CreateTablegroupResponsePB resp;
1044
55
  req.set_id(tablegroup_id);
1045
55
  req.set_namespace_id(namespace_id);
1046
55
  req.set_namespace_name(namespace_name);
1047
1048
55
  if (!tablespace_id.empty()) {
1049
10
    req.set_tablespace_id(tablespace_id);
1050
10
  }
1051
1052
55
  int attempts = 0;
1053
55
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1054
1055
55
  Status s = data_->SyncLeaderMasterRpc(
1056
55
      deadline, req, &resp, "CreateTablegroup", &MasterDdlProxy::CreateTablegroupAsync,
1057
55
      &attempts);
1058
1059
  // This case should not happen but need to validate contents since fields are optional in PB.
1060
55
  if (!resp.has_parent_table_id() || !resp.has_parent_table_name()) {
1061
0
    return STATUS(NotFound, "Parent table information not found in CREATE TABLEGROUP response.");
1062
0
  }
1063
1064
55
  const YBTableName table_name(YQL_DATABASE_PGSQL, namespace_name, resp.parent_table_name());
1065
1066
  // Handle special cases based on resp.error().
1067
55
  if (resp.has_error()) {
1068
2
    LOG_IF(DFATAL, s.ok()) << "Expecting error status if response has error: " <<
1069
0
        resp.error().code() << " Status: " << resp.error().status().ShortDebugString();
1070
1071
2
    if (resp.error().code() == master::MasterErrorPB::OBJECT_ALREADY_PRESENT && 
attempts > 10
) {
1072
      // If the table already exists and the number of attempts is >
1073
      // 1, then it means we may have succeeded in creating the
1074
      // table, but client didn't receive the successful
1075
      // response (e.g., due to failure before the successful
1076
      // response could be sent back, or due to a I/O pause or a
1077
      // network blip leading to a timeout, etc...)
1078
0
      YBTableInfo info;
1079
1080
      // A fix for https://yugabyte.atlassian.net/browse/ENG-529:
1081
      // If we've been retrying table creation, and the table is now in the process is being
1082
      // created, we can sometimes see an empty schema. Wait until the table is fully created
1083
      // before we compare the schema.
1084
0
      RETURN_NOT_OK_PREPEND(
1085
0
          data_->WaitForCreateTableToFinish(this, table_name, resp.parent_table_id(), deadline),
1086
0
          strings::Substitute("Failed waiting for table $0 to finish being created",
1087
0
                              table_name.ToString()));
1088
1089
0
      RETURN_NOT_OK_PREPEND(
1090
0
          data_->GetTableSchema(this, table_name, deadline, &info),
1091
0
          strings::Substitute("Unable to check the schema of table $0", table_name.ToString()));
1092
1093
0
      YBSchemaBuilder schemaBuilder;
1094
0
      schemaBuilder.AddColumn("parent_column")->Type(BINARY)->PrimaryKey()->NotNull();
1095
0
      YBSchema ybschema;
1096
0
      CHECK_OK(schemaBuilder.Build(&ybschema));
1097
1098
0
      if (!ybschema.Equals(info.schema)) {
1099
0
         string msg = Format("Table $0 already exists with a different "
1100
0
                             "schema. Requested schema was: $1, actual schema is: $2",
1101
0
                             table_name,
1102
0
                             internal::GetSchema(ybschema),
1103
0
                             internal::GetSchema(info.schema));
1104
0
        LOG(ERROR) << msg;
1105
0
        return STATUS(AlreadyPresent, msg);
1106
0
      }
1107
1108
0
      return Status::OK();
1109
0
    }
1110
1111
2
    return StatusFromPB(resp.error().status());
1112
2
  }
1113
1114
  // Wait for create table to finish.
1115
53
  RETURN_NOT_OK_PREPEND(
1116
53
      data_->WaitForCreateTableToFinish(this, table_name, resp.parent_table_id(), deadline),
1117
53
      strings::Substitute("Failed waiting for parent table $0 to finish being created",
1118
53
                          table_name.ToString()));
1119
1120
53
  return Status::OK();
1121
53
}
1122
1123
Status YBClient::DeleteTablegroup(const std::string& namespace_id,
1124
39
                                  const std::string& tablegroup_id) {
1125
39
  DeleteTablegroupRequestPB req;
1126
39
  DeleteTablegroupResponsePB resp;
1127
39
  req.set_id(tablegroup_id);
1128
39
  req.set_namespace_id(namespace_id);
1129
1130
39
  int attempts = 0;
1131
39
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1132
1133
39
  Status s = data_->SyncLeaderMasterRpc(
1134
39
      deadline, req, &resp, "DeleteTablegroup", &MasterDdlProxy::DeleteTablegroupAsync,
1135
39
      &attempts);
1136
1137
  // This case should not happen but need to validate contents since fields are optional in PB.
1138
39
  if (!resp.has_parent_table_id()) {
1139
0
    return STATUS(NotFound, "Parent table information not found in DELETE TABLEGROUP response.");
1140
0
  }
1141
1142
  // Handle special cases based on resp.error().
1143
39
  if (resp.has_error()) {
1144
0
    LOG_IF(DFATAL, s.ok()) << "Expecting error status if response has error: " <<
1145
0
        resp.error().code() << " Status: " << resp.error().status().ShortDebugString();
1146
1147
0
    if (resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND && attempts > 1) {
1148
      // A prior attempt to delete the table has succeeded, but
1149
      // appeared as a failure to the client due to, e.g., an I/O or
1150
      // network issue.
1151
0
      LOG(INFO) << "Parent table for tablegroup with ID " << tablegroup_id << " already deleted.";
1152
0
      return Status::OK();
1153
0
    } else {
1154
0
      return StatusFromPB(resp.error().status());
1155
0
    }
1156
39
  } else {
1157
    // Check the status only if the response has no error.
1158
39
    RETURN_NOT_OK(s);
1159
39
  }
1160
1161
  // Spin until the table is deleted. Currently only waits till the table reaches DELETING state
1162
  // See github issue #5290
1163
39
  RETURN_NOT_OK_PREPEND(data_->WaitForDeleteTableToFinish(this,
1164
39
                                                          resp.parent_table_id(),
1165
39
                                                          deadline),
1166
39
      strings::Substitute("Failed waiting for parent table with id $0 to finish being deleted",
1167
39
                          resp.parent_table_id()));
1168
1169
39
  LOG(INFO) << "Deleted parent table for tablegroup with ID " << tablegroup_id;
1170
39
  return Status::OK();
1171
39
}
1172
1173
Result<vector<master::TablegroupIdentifierPB>>
1174
5
YBClient::ListTablegroups(const std::string& namespace_name) {
1175
5
  GetNamespaceInfoResponsePB ret;
1176
5
  Status s = GetNamespaceInfo("", namespace_name, YQL_DATABASE_PGSQL, &ret);
1177
5
  if (!s.ok()) {
1178
0
    return s;
1179
0
  }
1180
1181
5
  ListTablegroupsRequestPB req;
1182
5
  ListTablegroupsResponsePB resp;
1183
1184
5
  req.set_namespace_id(ret.namespace_().id());
1185
5
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTablegroups);
1186
5
  auto* tablegroups = resp.mutable_tablegroups();
1187
5
  vector<master::TablegroupIdentifierPB> result;
1188
5
  result.reserve(tablegroups->size());
1189
7
  for (auto& tg : *tablegroups) {
1190
7
    result.push_back(std::move(tg));
1191
7
  }
1192
5
  return result;
1193
5
}
1194
1195
Result<bool> YBClient::TablegroupExists(const std::string& namespace_name,
1196
5
                                        const std::string& tablegroup_id) {
1197
1198
5
  for (const auto& tg : VERIFY_RESULT(ListTablegroups(namespace_name))) {
1199
5
    if (tg.id().compare(tablegroup_id) == 0) {
1200
4
      return true;
1201
4
    }
1202
5
  }
1203
1
  return false;
1204
5
}
1205
1206
Status YBClient::GetUDType(const std::string& namespace_name,
1207
                           const std::string& type_name,
1208
56
                           std::shared_ptr<QLType>* ql_type) {
1209
  // Setting up request.
1210
56
  GetUDTypeInfoRequestPB req;
1211
56
  req.mutable_type()->mutable_namespace_()->set_name(namespace_name);
1212
56
  req.mutable_type()->set_type_name(type_name);
1213
1214
  // Sending request.
1215
56
  GetUDTypeInfoResponsePB resp;
1216
56
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, GetUDTypeInfo);
1217
1218
  // Filling in return values.
1219
49
  std::vector<string> field_names;
1220
95
  for (const auto& field_name : resp.udtype().field_names()) {
1221
95
    field_names.push_back(field_name);
1222
95
  }
1223
1224
49
  std::vector<shared_ptr<QLType>> field_types;
1225
95
  for (const auto& field_type : resp.udtype().field_types()) {
1226
95
    field_types.push_back(QLType::FromQLTypePB(field_type));
1227
95
  }
1228
1229
49
  (*ql_type)->SetUDTypeFields(resp.udtype().id(), field_names, field_types);
1230
1231
49
  return Status::OK();
1232
56
}
1233
1234
Status YBClient::CreateRole(const RoleName& role_name,
1235
                            const std::string& salted_hash,
1236
                            const bool login, const bool superuser,
1237
757
                            const RoleName& creator_role_name) {
1238
1239
  // Setting up request.
1240
757
  CreateRoleRequestPB req;
1241
757
  req.set_salted_hash(salted_hash);
1242
757
  req.set_name(role_name);
1243
757
  req.set_login(login);
1244
757
  req.set_superuser(superuser);
1245
1246
757
  if (!creator_role_name.empty()) {
1247
757
    req.set_creator_role_name(creator_role_name);
1248
757
  }
1249
1250
757
  CreateRoleResponsePB resp;
1251
757
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, CreateRole);
1252
751
  return Status::OK();
1253
757
}
1254
1255
Status YBClient::AlterRole(const RoleName& role_name,
1256
                           const boost::optional<std::string>& salted_hash,
1257
                           const boost::optional<bool> login,
1258
                           const boost::optional<bool> superuser,
1259
58
                           const RoleName& current_role_name) {
1260
  // Setting up request.
1261
58
  AlterRoleRequestPB req;
1262
58
  req.set_name(role_name);
1263
58
  if (salted_hash) {
1264
9
    req.set_salted_hash(*salted_hash);
1265
9
  }
1266
58
  if (login) {
1267
13
    req.set_login(*login);
1268
13
  }
1269
58
  if (superuser) {
1270
46
    req.set_superuser(*superuser);
1271
46
  }
1272
58
  req.set_current_role(current_role_name);
1273
1274
58
  AlterRoleResponsePB resp;
1275
58
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, AlterRole);
1276
45
  return Status::OK();
1277
58
}
1278
1279
Status YBClient::DeleteRole(const std::string& role_name,
1280
730
                            const std::string& current_role_name) {
1281
  // Setting up request.
1282
730
  DeleteRoleRequestPB req;
1283
730
  req.set_name(role_name);
1284
730
  req.set_current_role(current_role_name);
1285
1286
730
  DeleteRoleResponsePB resp;
1287
730
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, DeleteRole);
1288
724
  return Status::OK();
1289
730
}
1290
1291
static const string kRequirePass = "requirepass";
1292
182
Status YBClient::SetRedisPasswords(const std::vector<string>& passwords) {
1293
  // TODO: Store hash instead of the password?
1294
182
  return SetRedisConfig(kRequirePass, passwords);
1295
182
}
1296
1297
1.17k
Status YBClient::GetRedisPasswords(vector<string>* passwords) {
1298
1.17k
  Status s = GetRedisConfig(kRequirePass, passwords);
1299
1.17k
  if (s.IsNotFound()) {
1300
    // If the redis config has no kRequirePass key.
1301
821
    passwords->clear();
1302
821
    s = Status::OK();
1303
821
  }
1304
1.17k
  return s;
1305
1.17k
}
1306
1307
182
Status YBClient::SetRedisConfig(const string& key, const vector<string>& values) {
1308
  // Setting up request.
1309
182
  RedisConfigSetRequestPB req;
1310
182
  req.set_keyword(key);
1311
182
  for (const auto& value : values) {
1312
182
    req.add_args(value);
1313
182
  }
1314
182
  RedisConfigSetResponsePB resp;
1315
182
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, RedisConfigSet);
1316
182
  return Status::OK();
1317
182
}
1318
1319
1.17k
Status YBClient::GetRedisConfig(const string& key, vector<string>* values) {
1320
  // Setting up request.
1321
1.17k
  RedisConfigGetRequestPB req;
1322
1.17k
  RedisConfigGetResponsePB resp;
1323
1.17k
  req.set_keyword(key);
1324
1.17k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, RedisConfigGet);
1325
355
  values->clear();
1326
355
  for (const auto& arg : resp.args())
1327
355
    values->push_back(arg);
1328
355
  return Status::OK();
1329
1.17k
}
1330
1331
Status YBClient::GrantRevokeRole(GrantRevokeStatementType statement_type,
1332
                                 const std::string& granted_role_name,
1333
52
                                 const std::string& recipient_role_name) {
1334
  // Setting up request.
1335
52
  GrantRevokeRoleRequestPB req;
1336
52
  req.set_revoke(statement_type == GrantRevokeStatementType::REVOKE);
1337
52
  req.set_granted_role(granted_role_name);
1338
52
  req.set_recipient_role(recipient_role_name);
1339
1340
52
  GrantRevokeRoleResponsePB resp;
1341
52
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GrantRevokeRole);
1342
49
  return Status::OK();
1343
52
}
1344
1345
117k
Status YBClient::GetPermissions(client::internal::PermissionsCache* permissions_cache) {
1346
117k
  if (!permissions_cache) {
1347
0
    DFATAL_OR_RETURN_NOT_OK(STATUS(InvalidArgument, "Invalid null permissions_cache"));
1348
0
  }
1349
1350
117k
  boost::optional<uint64_t> version = permissions_cache->version();
1351
1352
  // Setting up request.
1353
117k
  GetPermissionsRequestPB req;
1354
117k
  if (version) {
1355
115k
    req.set_if_version_greater_than(*version);
1356
115k
  }
1357
1358
117k
  GetPermissionsResponsePB resp;
1359
117k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Dcl, req, resp, GetPermissions);
1360
1361
117k
  VLOG
(1) << "Got permissions cache: " << resp.ShortDebugString()533
;
1362
1363
  // The first request is a special case. We always replace the cache since we don't have anything.
1364
117k
  if (!version) {
1365
    // We should at least receive cassandra's permissions.
1366
1.45k
    if (resp.role_permissions_size() == 0) {
1367
0
      DFATAL_OR_RETURN_NOT_OK(
1368
0
          STATUS(IllegalState, "Received invalid empty permissions cache from master"));
1369
1370
0
    }
1371
115k
  } else if (resp.version() == *version) {
1372
      // No roles should have been received if both versions match.
1373
107k
      if (resp.role_permissions_size() != 0) {
1374
0
        DFATAL_OR_RETURN_NOT_OK(STATUS(IllegalState,
1375
0
            "Received permissions cache when none was expected because the master's "
1376
0
            "permissions versions is equal to the client's version"));
1377
0
      }
1378
      // Nothing to update.
1379
107k
      return Status::OK();
1380
107k
  } else 
if (8.59k
resp.version() < *version8.59k
) {
1381
    // If the versions don't match, then the master's version has to be greater than ours.
1382
0
    DFATAL_OR_RETURN_NOT_OK(STATUS_SUBSTITUTE(IllegalState,
1383
0
        "Client's permissions version $0 can't be greater than the master's permissions version $1",
1384
0
        *version, resp.version()));
1385
0
  }
1386
1387
10.0k
  permissions_cache->UpdateRolesPermissions(resp);
1388
10.0k
  return Status::OK();
1389
117k
}
1390
1391
Status YBClient::CreateUDType(const std::string& namespace_name,
1392
                              const std::string& type_name,
1393
                              const std::vector<std::string>& field_names,
1394
47
                              const std::vector<std::shared_ptr<QLType>>& field_types) {
1395
  // Setting up request.
1396
47
  CreateUDTypeRequestPB req;
1397
47
  req.mutable_namespace_()->set_name(namespace_name);
1398
47
  req.set_name(type_name);
1399
89
  for (const string& field_name : field_names) {
1400
89
    req.add_field_names(field_name);
1401
89
  }
1402
89
  for (const std::shared_ptr<QLType>& field_type : field_types) {
1403
89
    field_type->ToQLTypePB(req.add_field_types());
1404
89
  }
1405
1406
47
  CreateUDTypeResponsePB resp;
1407
47
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, CreateUDType);
1408
46
  return Status::OK();
1409
47
}
1410
1411
Status YBClient::DeleteUDType(const std::string& namespace_name,
1412
54
                              const std::string& type_name) {
1413
  // Setting up request.
1414
54
  DeleteUDTypeRequestPB req;
1415
54
  req.mutable_type()->mutable_namespace_()->set_name(namespace_name);
1416
54
  req.mutable_type()->set_type_name(type_name);
1417
1418
54
  DeleteUDTypeResponsePB resp;
1419
54
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, DeleteUDType);
1420
46
  return Status::OK();
1421
54
}
1422
1423
Result<CDCStreamId> YBClient::CreateCDCStream(
1424
    const TableId& table_id,
1425
    const std::unordered_map<std::string, std::string>& options,
1426
    bool active,
1427
5.49k
    const CDCStreamId& db_stream_id) {
1428
  // Setting up request.
1429
5.49k
  CreateCDCStreamRequestPB req;
1430
5.49k
  req.set_table_id(table_id);
1431
5.49k
  if (!db_stream_id.empty()) {
1432
5.18k
    req.set_db_stream_id(db_stream_id);
1433
5.18k
  }
1434
5.49k
  req.mutable_options()->Reserve(narrow_cast<int>(options.size()));
1435
22.2k
  for (const auto& option : options) {
1436
22.2k
    auto new_option = req.add_options();
1437
22.2k
    new_option->set_key(option.first);
1438
22.2k
    new_option->set_value(option.second);
1439
22.2k
  }
1440
5.49k
  req.set_initial_state(active ? master::SysCDCStreamEntryPB::ACTIVE
1441
5.49k
                               : 
master::SysCDCStreamEntryPB::INITIATED0
);
1442
1443
5.49k
  CreateCDCStreamResponsePB resp;
1444
5.49k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, CreateCDCStream);
1445
5.49k
  return resp.stream_id();
1446
5.49k
}
1447
1448
void YBClient::CreateCDCStream(const TableId& table_id,
1449
                               const std::unordered_map<std::string, std::string>& options,
1450
0
                               CreateCDCStreamCallback callback) {
1451
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1452
0
  data_->CreateCDCStream(this, table_id, options, deadline, callback);
1453
0
}
1454
1455
Status YBClient::GetCDCStream(const CDCStreamId& stream_id,
1456
                              NamespaceId* ns_id,
1457
                              std::vector<ObjectId>* object_ids,
1458
1
                              std::unordered_map<std::string, std::string>* options) {
1459
  // Setting up request.
1460
1
  GetCDCStreamRequestPB req;
1461
1
  req.set_stream_id(stream_id);
1462
1463
  // Sending request.
1464
1
  GetCDCStreamResponsePB resp;
1465
1
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, GetCDCStream);
1466
1467
  // Filling in return values.
1468
0
  if (resp.stream().has_namespace_id()) {
1469
0
    *ns_id = resp.stream().namespace_id();
1470
0
  }
1471
1472
0
  for (auto id : resp.stream().table_id()) {
1473
0
    object_ids->push_back(id);
1474
0
  }
1475
1476
0
  options->clear();
1477
0
  options->reserve(resp.stream().options_size());
1478
0
  for (const auto& option : resp.stream().options()) {
1479
0
    options->emplace(option.key(), option.value());
1480
0
  }
1481
1482
0
  if (!resp.stream().has_namespace_id()) {
1483
0
    options->emplace(cdc::kIdType, cdc::kTableId);
1484
0
  }
1485
1486
0
  return Status::OK();
1487
1
}
1488
1489
void YBClient::GetCDCStream(const CDCStreamId& stream_id,
1490
                            std::shared_ptr<TableId> table_id,
1491
                            std::shared_ptr<std::unordered_map<std::string, std::string>> options,
1492
0
                            StdStatusCallback callback) {
1493
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1494
0
  data_->GetCDCStream(this, stream_id, table_id, options, deadline, callback);
1495
0
}
1496
1497
Status YBClient::DeleteCDCStream(const vector<CDCStreamId>& streams,
1498
                                 bool force_delete,
1499
                                 bool ignore_errors,
1500
0
                                 master::DeleteCDCStreamResponsePB* ret) {
1501
0
  if (streams.empty()) {
1502
0
    return STATUS(InvalidArgument, "At least one stream id should be provided");
1503
0
  }
1504
1505
  // Setting up request.
1506
0
  DeleteCDCStreamRequestPB req;
1507
0
  req.mutable_stream_id()->Reserve(narrow_cast<int>(streams.size()));
1508
0
  for (const auto& stream : streams) {
1509
0
    req.add_stream_id(stream);
1510
0
  }
1511
0
  req.set_force_delete(force_delete);
1512
0
  req.set_ignore_errors(ignore_errors);
1513
1514
0
  if (ret) {
1515
0
    CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, (*ret), DeleteCDCStream);
1516
0
  } else {
1517
0
    DeleteCDCStreamResponsePB resp;
1518
0
    CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, DeleteCDCStream);
1519
0
  }
1520
1521
0
  return Status::OK();
1522
0
}
1523
1524
Status YBClient::DeleteCDCStream(const CDCStreamId& stream_id, bool force_delete,
1525
0
                                 bool ignore_errors) {
1526
  // Setting up request.
1527
0
  DeleteCDCStreamRequestPB req;
1528
0
  req.add_stream_id(stream_id);
1529
0
  req.set_force_delete(force_delete);
1530
0
  req.set_ignore_errors(ignore_errors);
1531
1532
0
  DeleteCDCStreamResponsePB resp;
1533
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, DeleteCDCStream);
1534
0
  return Status::OK();
1535
0
}
1536
1537
0
void YBClient::DeleteCDCStream(const CDCStreamId& stream_id, StatusCallback callback) {
1538
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1539
0
  data_->DeleteCDCStream(this, stream_id, deadline, callback);
1540
0
}
1541
1542
CHECKED_STATUS YBClient::GetCDCDBStreamInfo(
1543
  const std::string &db_stream_id,
1544
0
  std::vector<pair<std::string, std::string>>* db_stream_info) {
1545
  // Setting up request.
1546
0
  GetCDCDBStreamInfoRequestPB req;
1547
0
  req.set_db_stream_id(db_stream_id);
1548
1549
0
  GetCDCDBStreamInfoResponsePB resp;
1550
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, GetCDCDBStreamInfo);
1551
0
  db_stream_info->clear();
1552
0
  db_stream_info->reserve(resp.table_info_size());
1553
0
  for (const auto& tabinfo : resp.table_info()) {
1554
0
    std::string stream_id = tabinfo.stream_id();
1555
0
    std::string table_id = tabinfo.table_id();
1556
1557
0
    db_stream_info->push_back(std::make_pair(stream_id, table_id));
1558
0
  }
1559
1560
0
  return Status::OK();
1561
0
}
1562
1563
void YBClient::GetCDCDBStreamInfo(
1564
    const std::string& db_stream_id,
1565
    const std::shared_ptr<std::vector<pair<std::string, std::string>>>& db_stream_info,
1566
0
    const StdStatusCallback& callback) {
1567
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1568
0
  data_->GetCDCDBStreamInfo(this, db_stream_id, db_stream_info, deadline, callback);
1569
0
}
1570
1571
Status YBClient::UpdateCDCStream(const CDCStreamId& stream_id,
1572
0
                                 const master::SysCDCStreamEntryPB& new_entry) {
1573
0
  if (stream_id.empty()) {
1574
0
    return STATUS(InvalidArgument, "Stream id is required.");
1575
0
  }
1576
1577
  // Setting up request.
1578
0
  UpdateCDCStreamRequestPB req;
1579
0
  req.set_stream_id(stream_id);
1580
0
  req.mutable_entry()->CopyFrom(new_entry);
1581
1582
0
  UpdateCDCStreamResponsePB resp;
1583
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, UpdateCDCStream);
1584
0
  return Status::OK();
1585
0
}
1586
1587
Status YBClient::UpdateConsumerOnProducerSplit(
1588
    const string& producer_id,
1589
    const CDCStreamId& stream_id,
1590
0
    const master::ProducerSplitTabletInfoPB& split_info) {
1591
0
  if (producer_id.empty()) {
1592
0
    return STATUS(InvalidArgument, "Producer id is required.");
1593
0
  }
1594
0
  if (stream_id.empty()) {
1595
0
    return STATUS(InvalidArgument, "Stream id is required.");
1596
0
  }
1597
1598
0
  UpdateConsumerOnProducerSplitRequestPB req;
1599
0
  req.set_producer_id(producer_id);
1600
0
  req.set_stream_id(stream_id);
1601
0
  req.mutable_producer_split_tablet_info()->CopyFrom(split_info);
1602
1603
0
  UpdateConsumerOnProducerSplitResponsePB resp;
1604
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, UpdateConsumerOnProducerSplit);
1605
0
  return Status::OK();
1606
0
}
1607
1608
6
void YBClient::DeleteNotServingTablet(const TabletId& tablet_id, StdStatusCallback callback) {
1609
6
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1610
6
  data_->DeleteNotServingTablet(this, tablet_id, deadline, callback);
1611
6
}
1612
1613
void YBClient::GetTableLocations(
1614
    const TableId& table_id, int32_t max_tablets, RequireTabletsRunning require_tablets_running,
1615
134k
    GetTableLocationsCallback callback) {
1616
134k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
1617
134k
  data_->GetTableLocations(
1618
134k
      this, table_id, max_tablets, require_tablets_running, deadline, std::move(callback));
1619
134k
}
1620
1621
6.58k
Status YBClient::TabletServerCount(int *tserver_count, bool primary_only, bool use_cache) {
1622
6.58k
  int tserver_count_cached = data_->tserver_count_cached_[primary_only].load(
1623
6.58k
      std::memory_order_acquire);
1624
6.58k
  if (use_cache && 
tserver_count_cached > 0148
) {
1625
137
    *tserver_count = tserver_count_cached;
1626
137
    return Status::OK();
1627
137
  }
1628
1629
6.45k
  ListTabletServersRequestPB req;
1630
6.45k
  ListTabletServersResponsePB resp;
1631
6.45k
  req.set_primary_only(primary_only);
1632
6.45k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListTabletServers);
1633
6.45k
  data_->tserver_count_cached_[primary_only].store(resp.servers_size(), std::memory_order_release);
1634
6.45k
  *tserver_count = resp.servers_size();
1635
6.45k
  return Status::OK();
1636
6.45k
}
1637
1638
2
Result<std::vector<YBTabletServer>> YBClient::ListTabletServers() {
1639
2
  ListTabletServersRequestPB req;
1640
2
  ListTabletServersResponsePB resp;
1641
2
  std::vector<YBTabletServer> result;
1642
2
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListTabletServers);
1643
2
  result.reserve(resp.servers_size());
1644
8
  for (int i = 0; i < resp.servers_size(); 
i++6
) {
1645
6
    const ListTabletServersResponsePB_Entry& e = resp.servers(i);
1646
6
    result.push_back(YBTabletServer::FromPB(e, data_->cloud_info_pb_));
1647
6
  }
1648
2
  return result;
1649
2
}
1650
1651
4
Result<TabletServersInfo> YBClient::ListLiveTabletServers(bool primary_only) {
1652
4
  ListLiveTabletServersRequestPB req;
1653
4
  if (primary_only) {
1654
0
    req.set_primary_only(true);
1655
0
  }
1656
4
  ListLiveTabletServersResponsePB resp;
1657
4
  CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, ListLiveTabletServers);
1658
1659
4
  TabletServersInfo result;
1660
4
  result.resize(resp.servers_size());
1661
16
  for (int i = 0; i < resp.servers_size(); 
i++12
) {
1662
12
    const ListLiveTabletServersResponsePB_Entry& entry = resp.servers(i);
1663
12
    auto& out = result[i];
1664
12
    out.server = YBTabletServer::FromPB(entry, data_->cloud_info_pb_);
1665
12
    const CloudInfoPB& cloud_info = entry.registration().common().cloud_info();
1666
1667
12
    const auto& private_addresses = entry.registration().common().private_rpc_addresses();
1668
12
    if (!private_addresses.empty()) {
1669
12
      out.server.hostname = private_addresses.Get(0).host();
1670
12
    }
1671
1672
12
    const auto& broadcast_addresses = entry.registration().common().broadcast_addresses();
1673
12
    if (!broadcast_addresses.empty()) {
1674
0
      out.public_ip = broadcast_addresses.Get(0).host();
1675
0
    }
1676
1677
12
    out.is_primary = !entry.isfromreadreplica();
1678
12
    if (cloud_info.has_placement_cloud()) {
1679
12
      out.cloud = cloud_info.placement_cloud();
1680
12
      if (cloud_info.has_placement_region()) {
1681
12
        out.region = cloud_info.placement_region();
1682
12
      }
1683
12
      if (cloud_info.has_placement_zone()) {
1684
12
        out.zone = cloud_info.placement_zone();
1685
12
      }
1686
12
    }
1687
12
    out.pg_port = entry.registration().common().pg_port();
1688
12
  }
1689
1690
4
  return result;
1691
4
}
1692
1693
void YBClient::SetLocalTabletServer(const string& ts_uuid,
1694
                                    const shared_ptr<tserver::TabletServerServiceProxy>& proxy,
1695
7.97k
                                    const tserver::LocalTabletServer* local_tserver) {
1696
7.97k
  data_->meta_cache_->SetLocalTabletServer(ts_uuid, proxy, local_tserver);
1697
7.97k
}
1698
1699
0
internal::RemoteTabletServer* YBClient::GetLocalTabletServer() {
1700
0
  return data_->meta_cache_->local_tserver();
1701
0
}
1702
1703
void YBClient::SetNodeLocalForwardProxy(
1704
0
  const shared_ptr<tserver::TabletServerForwardServiceProxy>& proxy) {
1705
0
  data_->node_local_forward_proxy_ = proxy;
1706
0
}
1707
1708
0
std::shared_ptr<tserver::TabletServerForwardServiceProxy>& YBClient::GetNodeLocalForwardProxy() {
1709
0
  return data_->node_local_forward_proxy_;
1710
0
}
1711
1712
0
void YBClient::SetNodeLocalTServerHostPort(const ::yb::HostPort& hostport) {
1713
0
  data_->node_local_tserver_host_port_ = hostport;
1714
0
}
1715
1716
0
const ::yb::HostPort& YBClient::GetNodeLocalTServerHostPort() {
1717
0
  return data_->node_local_tserver_host_port_;
1718
0
}
1719
1720
154
Result<bool> YBClient::IsLoadBalanced(uint32_t num_servers) {
1721
154
  IsLoadBalancedRequestPB req;
1722
154
  IsLoadBalancedResponsePB resp;
1723
1724
154
  req.set_expected_num_servers(num_servers);
1725
  // Cannot use CALL_SYNC_LEADER_MASTER_RPC directly since this is substituted with RETURN_NOT_OK
1726
  // and we want to capture the status to check if load is balanced.
1727
154
  Status s = [&, this]() -> Status {
1728
154
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, IsLoadBalanced);
1729
2
    return Status::OK();
1730
154
  }();
1731
154
  return s.ok();
1732
154
}
1733
1734
2.23k
Result<bool> YBClient::IsLoadBalancerIdle() {
1735
2.23k
  IsLoadBalancerIdleRequestPB req;
1736
2.23k
  IsLoadBalancerIdleResponsePB resp;
1737
1738
2.23k
  Status s = [&]() -> Status {
1739
2.23k
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, IsLoadBalancerIdle);
1740
450
    return Status::OK();
1741
2.23k
  }();
1742
1743
2.23k
  if (s.ok()) {
1744
450
    return true;
1745
1.78k
  } else if (master::MasterError(s) == master::MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE) {
1746
1.78k
    return false;
1747
1.78k
  } else {
1748
0
    return s;
1749
0
  }
1750
2.23k
}
1751
1752
Status YBClient::ModifyTablePlacementInfo(const YBTableName& table_name,
1753
4
                                          master::PlacementInfoPB* replicas) {
1754
4
  master::ReplicationInfoPB replication_info;
1755
  // Merge the obtained info with the existing table replication info.
1756
4
  std::shared_ptr<client::YBTable> table;
1757
4
  RETURN_NOT_OK_PREPEND(OpenTable(table_name, &table), "Fetching table schema failed!");
1758
1759
  // If it does not exist, fetch the cluster replication info.
1760
4
  if (!table->replication_info()) {
1761
2
    GetMasterClusterConfigRequestPB req;
1762
2
    GetMasterClusterConfigResponsePB resp;
1763
2
    CALL_SYNC_LEADER_MASTER_RPC_EX(Cluster, req, resp, GetMasterClusterConfig);
1764
2
    master::SysClusterConfigEntryPB* sys_cluster_config_entry = resp.mutable_cluster_config();
1765
2
    replication_info.CopyFrom(sys_cluster_config_entry->replication_info());
1766
    // TODO(bogdan): Figure out how to handle read replias and leader affinity.
1767
2
    replication_info.clear_read_replicas();
1768
2
    replication_info.clear_affinitized_leaders();
1769
2
  } else {
1770
    // Table replication info exists, copy it over.
1771
2
    replication_info.CopyFrom(table->replication_info().get());
1772
2
  }
1773
1774
  // Put in the new live placement info.
1775
4
  replication_info.set_allocated_live_replicas(replicas);
1776
1777
4
  std::unique_ptr<yb::client::YBTableAlterer> table_alterer(NewTableAlterer(table_name));
1778
4
  return table_alterer->replication_info(replication_info)->Alter();
1779
4
}
1780
1781
0
Status YBClient::CreateTransactionsStatusTable(const string& table_name) {
1782
0
  if (table_name.rfind(kTransactionTablePrefix, 0) != 0) {
1783
0
    return STATUS_FORMAT(
1784
0
        InvalidArgument, "Name '$0' for transaction table does not start with '$1'", table_name,
1785
0
        kTransactionTablePrefix);
1786
0
  }
1787
0
  master::CreateTransactionStatusTableRequestPB req;
1788
0
  master::CreateTransactionStatusTableResponsePB resp;
1789
0
  req.set_table_name(table_name);
1790
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Admin, req, resp, CreateTransactionStatusTable);
1791
0
  return Status::OK();
1792
0
}
1793
1794
Status YBClient::GetTabletsFromTableId(const string& table_id,
1795
                                       const int32_t max_tablets,
1796
5.21k
                                       RepeatedPtrField<TabletLocationsPB>* tablets) {
1797
5.21k
  GetTableLocationsRequestPB req;
1798
5.21k
  GetTableLocationsResponsePB resp;
1799
5.21k
  req.mutable_table()->set_table_id(table_id);
1800
1801
5.21k
  if (max_tablets == 0) {
1802
5.21k
    req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
1803
5.21k
  } else 
if (0
max_tablets > 00
) {
1804
0
    req.set_max_returned_locations(max_tablets);
1805
0
  }
1806
5.21k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTableLocations);
1807
5.21k
  *tablets = resp.tablet_locations();
1808
5.21k
  return Status::OK();
1809
5.21k
}
1810
1811
Status YBClient::GetTablets(const YBTableName& table_name,
1812
                            const int32_t max_tablets,
1813
                            RepeatedPtrField<TabletLocationsPB>* tablets,
1814
                            PartitionListVersion* partition_list_version,
1815
                            const RequireTabletsRunning require_tablets_running,
1816
470
                            const master::IncludeInactive include_inactive) {
1817
470
  GetTableLocationsRequestPB req;
1818
470
  GetTableLocationsResponsePB resp;
1819
470
  if (table_name.has_table()) {
1820
470
    table_name.SetIntoTableIdentifierPB(req.mutable_table());
1821
470
  } else 
if (0
table_name.has_table_id()0
) {
1822
0
    req.mutable_table()->set_table_id(table_name.table_id());
1823
0
  }
1824
1825
470
  if (max_tablets == 0) {
1826
465
    req.set_max_returned_locations(std::numeric_limits<int32_t>::max());
1827
465
  } else 
if (5
max_tablets > 05
) {
1828
1
    req.set_max_returned_locations(max_tablets);
1829
1
  }
1830
470
  req.set_require_tablets_running(require_tablets_running);
1831
470
  req.set_include_inactive(include_inactive);
1832
470
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTableLocations);
1833
470
  *tablets = resp.tablet_locations();
1834
470
  if (partition_list_version) {
1835
114
    *partition_list_version = resp.partition_list_version();
1836
114
  }
1837
470
  return Status::OK();
1838
470
}
1839
1840
Status YBClient::GetTabletLocation(const TabletId& tablet_id,
1841
0
                                   master::TabletLocationsPB* tablet_location) {
1842
0
  GetTabletLocationsRequestPB req;
1843
0
  GetTabletLocationsResponsePB resp;
1844
0
  req.add_tablet_ids(tablet_id);
1845
0
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTabletLocations);
1846
1847
0
  if (resp.tablet_locations_size() != 1) {
1848
0
    return STATUS_SUBSTITUTE(IllegalState, "Expected single tablet for $0, received $1",
1849
0
                             tablet_id, resp.tablet_locations_size());
1850
0
  }
1851
1852
0
  *tablet_location = resp.tablet_locations(0);
1853
0
  return Status::OK();
1854
0
}
1855
1856
Result<TransactionStatusTablets> YBClient::GetTransactionStatusTablets(
1857
3.36k
    const CloudInfoPB& placement) {
1858
3.36k
  GetTransactionStatusTabletsRequestPB req;
1859
3.36k
  GetTransactionStatusTabletsResponsePB resp;
1860
1861
3.36k
  req.mutable_placement()->CopyFrom(placement);
1862
1863
3.36k
  CALL_SYNC_LEADER_MASTER_RPC_EX(Client, req, resp, GetTransactionStatusTablets);
1864
1865
3.36k
  TransactionStatusTablets tablets;
1866
1867
3.36k
  MoveCollection(&resp.global_tablet_id(), &tablets.global_tablets);
1868
3.36k
  MoveCollection(&resp.placement_local_tablet_id(), &tablets.placement_local_tablets);
1869
1870
3.36k
  return tablets;
1871
3.36k
}
1872
1873
Status YBClient::GetTablets(const YBTableName& table_name,
1874
                            const int32_t max_tablets,
1875
                            vector<TabletId>* tablet_uuids,
1876
                            vector<string>* ranges,
1877
                            std::vector<master::TabletLocationsPB>* locations,
1878
                            const RequireTabletsRunning require_tablets_running,
1879
133
                            master::IncludeInactive include_inactive) {
1880
133
  RepeatedPtrField<TabletLocationsPB> tablets;
1881
133
  RETURN_NOT_OK(GetTablets(
1882
133
      table_name, max_tablets, &tablets, /* partition_list_version =*/ nullptr,
1883
133
      require_tablets_running, include_inactive));
1884
133
  FillFromRepeatedTabletLocations(tablets, tablet_uuids, ranges, locations);
1885
133
  return Status::OK();
1886
133
}
1887
1888
Status YBClient::GetTabletsAndUpdateCache(
1889
    const YBTableName& table_name,
1890
    const int32_t max_tablets,
1891
    vector<TabletId>* tablet_uuids,
1892
    vector<string>* ranges,
1893
114
    std::vector<master::TabletLocationsPB>* locations) {
1894
114
  RepeatedPtrField<TabletLocationsPB> tablets;
1895
114
  PartitionListVersion partition_list_version;
1896
114
  RETURN_NOT_OK(GetTablets(
1897
114
      table_name, max_tablets, &tablets, &partition_list_version, RequireTabletsRunning::kFalse));
1898
114
  FillFromRepeatedTabletLocations(tablets, tablet_uuids, ranges, locations);
1899
1900
114
  RETURN_NOT_OK(data_->meta_cache_->ProcessTabletLocations(
1901
114
      tablets, partition_list_version, /* lookup_rpc = */ nullptr));
1902
1903
114
  return Status::OK();
1904
114
}
1905
1906
14.7M
rpc::Messenger* YBClient::messenger() const {
1907
14.7M
  return data_->messenger_;
1908
14.7M
}
1909
1910
141k
const scoped_refptr<MetricEntity>& YBClient::metric_entity() const {
1911
141k
  return data_->metric_entity_;
1912
141k
}
1913
1914
14.1M
rpc::ProxyCache& YBClient::proxy_cache() const {
1915
14.1M
  return *data_->proxy_cache_;
1916
14.1M
}
1917
1918
11.5M
ThreadPool *YBClient::callback_threadpool() {
1919
11.5M
  return data_->use_threadpool_for_callbacks_ ? 
data_->threadpool_.get()0
: nullptr;
1920
11.5M
}
1921
1922
9.53M
const std::string& YBClient::proxy_uuid() const {
1923
9.53M
  return data_->uuid_;
1924
9.53M
}
1925
1926
2.54M
const ClientId& YBClient::id() const {
1927
2.54M
  return data_->id_;
1928
2.54M
}
1929
1930
219
const CloudInfoPB& YBClient::cloud_info() const {
1931
219
  return data_->cloud_info_pb_;
1932
219
}
1933
1934
std::pair<RetryableRequestId, RetryableRequestId> YBClient::NextRequestIdAndMinRunningRequestId(
1935
2.54M
    const TabletId& tablet_id) {
1936
2.54M
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1937
2.54M
  auto& tablet = data_->tablet_requests_[tablet_id];
1938
2.54M
  if (tablet.request_id_seq == kInitializeFromMinRunning) {
1939
0
    return std::make_pair(kInitializeFromMinRunning, kInitializeFromMinRunning);
1940
0
  }
1941
2.54M
  auto id = tablet.request_id_seq++;
1942
2.54M
  tablet.running_requests.insert(id);
1943
2.54M
  return std::make_pair(id, *tablet.running_requests.begin());
1944
2.54M
}
1945
1946
2.54M
void YBClient::RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id) {
1947
2.54M
  if (request_id == kInitializeFromMinRunning) {
1948
0
    return;
1949
0
  }
1950
2.54M
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1951
2.54M
  auto& tablet = data_->tablet_requests_[tablet_id];
1952
2.54M
  auto it = tablet.running_requests.find(request_id);
1953
2.54M
  if (
it != tablet.running_requests.end()2.54M
) {
1954
2.54M
    tablet.running_requests.erase(it);
1955
18.4E
  } else {
1956
18.4E
    LOG(DFATAL) << "RequestFinished called for an unknown request: "
1957
18.4E
                << tablet_id << ", " << request_id;
1958
18.4E
  }
1959
2.54M
}
1960
1961
void YBClient::MaybeUpdateMinRunningRequestId(
1962
0
    const TabletId& tablet_id, RetryableRequestId min_running_request_id) {
1963
0
  std::lock_guard<simple_spinlock> lock(data_->tablet_requests_mutex_);
1964
0
  auto& tablet = data_->tablet_requests_[tablet_id];
1965
0
  if (tablet.request_id_seq == kInitializeFromMinRunning) {
1966
0
    tablet.request_id_seq = min_running_request_id + (1 << 24);
1967
0
    VLOG(1) << "Set request_id_seq for tablet " << tablet_id << " to " << tablet.request_id_seq;
1968
0
  }
1969
0
}
1970
1971
void YBClient::LookupTabletByKey(const std::shared_ptr<YBTable>& table,
1972
                                 const std::string& partition_key,
1973
                                 CoarseTimePoint deadline,
1974
208k
                                 LookupTabletCallback callback) {
1975
208k
  data_->meta_cache_->LookupTabletByKey(table, partition_key, deadline, std::move(callback));
1976
208k
}
1977
1978
void YBClient::LookupTabletById(const std::string& tablet_id,
1979
                                const std::shared_ptr<const YBTable>& table,
1980
                                master::IncludeInactive include_inactive,
1981
                                CoarseTimePoint deadline,
1982
                                LookupTabletCallback callback,
1983
1.94M
                                UseCache use_cache) {
1984
1.94M
  data_->meta_cache_->LookupTabletById(
1985
1.94M
      tablet_id, table, include_inactive, deadline, std::move(callback), use_cache);
1986
1.94M
}
1987
1988
void YBClient::LookupAllTablets(const std::shared_ptr<const YBTable>& table,
1989
                                CoarseTimePoint deadline,
1990
0
                                LookupTabletRangeCallback callback) {
1991
0
  data_->meta_cache_->LookupAllTablets(table, deadline, std::move(callback));
1992
0
}
1993
1994
std::future<Result<internal::RemoteTabletPtr>> YBClient::LookupTabletByKeyFuture(
1995
    const std::shared_ptr<YBTable>& table,
1996
    const std::string& partition_key,
1997
0
    CoarseTimePoint deadline) {
1998
0
  return data_->meta_cache_->LookupTabletByKeyFuture(table, partition_key, deadline);
1999
0
}
2000
2001
std::future<Result<std::vector<internal::RemoteTabletPtr>>> YBClient::LookupAllTabletsFuture(
2002
    const std::shared_ptr<const YBTable>& table,
2003
0
    CoarseTimePoint deadline) {
2004
0
  return MakeFuture<Result<std::vector<internal::RemoteTabletPtr>>>([&](auto callback) {
2005
0
    this->LookupAllTablets(table, deadline, std::move(callback));
2006
0
  });
2007
0
}
2008
2009
114
HostPort YBClient::GetMasterLeaderAddress() {
2010
114
  return data_->leader_master_hostport();
2011
114
}
2012
2013
0
Status YBClient::ListMasters(CoarseTimePoint deadline, std::vector<std::string>* master_uuids) {
2014
0
  ListMastersRequestPB req;
2015
0
  ListMastersResponsePB resp;
2016
0
  CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(Cluster, req, resp, deadline, ListMasters);
2017
2018
0
  master_uuids->clear();
2019
0
  for (const ServerEntryPB& master : resp.masters()) {
2020
0
    if (master.has_error()) {
2021
0
      LOG(ERROR) << "Master " << master.ShortDebugString() << " hit error "
2022
0
        << master.error().ShortDebugString();
2023
0
      return StatusFromPB(master.error());
2024
0
    }
2025
0
    master_uuids->push_back(master.instance_id().permanent_uuid());
2026
0
  }
2027
0
  return Status::OK();
2028
0
}
2029
2030
0
Result<HostPort> YBClient::RefreshMasterLeaderAddress() {
2031
0
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2032
0
  RETURN_NOT_OK(data_->SetMasterServerProxy(deadline));
2033
2034
0
  return GetMasterLeaderAddress();
2035
0
}
2036
2037
775
void YBClient::RefreshMasterLeaderAddressAsync() {
2038
775
  data_->SetMasterServerProxyAsync(
2039
775
      CoarseMonoClock::Now() + default_admin_operation_timeout(),
2040
775
      false /* skip_resolution */, true /* wait_for_leader_election */, /* callback */ [](auto)
{}773
);
2041
775
}
2042
2043
2
Status YBClient::RemoveMasterFromClient(const HostPort& remove) {
2044
2
  return data_->RemoveMasterAddress(remove);
2045
2
}
2046
2047
1
Status YBClient::AddMasterToClient(const HostPort& add) {
2048
1
  return data_->AddMasterAddress(add);
2049
1
}
2050
2051
0
Status YBClient::SetMasterAddresses(const std::string& addrs) {
2052
0
  return data_->SetMasterAddresses(addrs);
2053
0
}
2054
2055
1
Status YBClient::GetMasterUUID(const string& host, uint16_t port, string* uuid) {
2056
1
  HostPort hp(host, port);
2057
1
  ServerEntryPB server;
2058
1
  RETURN_NOT_OK(master::GetMasterEntryForHosts(
2059
1
      data_->proxy_cache_.get(), {hp}, default_rpc_timeout(), &server));
2060
2061
1
  if (server.has_error()) {
2062
0
    return STATUS_FORMAT(
2063
0
      RuntimeError,
2064
0
      "Error while getting uuid of $0.",
2065
0
      HostPortToString(host, port));
2066
0
  }
2067
2068
1
  *uuid = server.instance_id().permanent_uuid();
2069
2070
1
  return Status::OK();
2071
1
}
2072
2073
4
Status YBClient::SetReplicationInfo(const ReplicationInfoPB& replication_info) {
2074
4
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2075
4
  return data_->SetReplicationInfo(this, replication_info, deadline);
2076
4
}
2077
2078
1
Status YBClient::ValidateReplicationInfo(const ReplicationInfoPB& replication_info) {
2079
1
  auto deadline = CoarseMonoClock::Now() + default_rpc_timeout();
2080
1
  return data_->ValidateReplicationInfo(replication_info, deadline);
2081
1
}
2082
2083
Result<std::vector<YBTableName>> YBClient::ListTables(const std::string& filter,
2084
582
                                                      bool exclude_ysql) {
2085
582
  ListTablesRequestPB req;
2086
582
  ListTablesResponsePB resp;
2087
2088
582
  if (!filter.empty()) {
2089
529
    req.set_name_filter(filter);
2090
529
  }
2091
2092
582
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTables);
2093
582
  std::vector<YBTableName> result;
2094
582
  result.reserve(resp.tables_size());
2095
2096
23.1k
  for (int i = 0; i < resp.tables_size(); 
i++22.5k
) {
2097
22.5k
    const ListTablesResponsePB_TableInfo& table_info = resp.tables(i);
2098
22.5k
    DCHECK(table_info.has_namespace_());
2099
22.5k
    DCHECK(table_info.namespace_().has_name());
2100
22.5k
    DCHECK(table_info.namespace_().has_id());
2101
22.5k
    if (exclude_ysql && 
table_info.table_type() == TableType::PGSQL_TABLE_TYPE1
) {
2102
0
      continue;
2103
0
    }
2104
22.5k
    result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()),
2105
22.5k
                        table_info.namespace_().id(),
2106
22.5k
                        table_info.namespace_().name(),
2107
22.5k
                        table_info.id(),
2108
22.5k
                        table_info.name(),
2109
22.5k
                        table_info.pgschema_name(),
2110
22.5k
                        table_info.relation_type());
2111
22.5k
  }
2112
582
  return result;
2113
582
}
2114
2115
306
Result<std::vector<YBTableName>> YBClient::ListUserTables(const NamespaceId& ns_id) {
2116
306
  ListTablesRequestPB req;
2117
306
  ListTablesResponsePB resp;
2118
306
  bool exclude_ysql = false;
2119
2120
306
  if (!ns_id.empty()) {
2121
306
    req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL);
2122
306
    req.mutable_namespace_()->set_id(ns_id);
2123
306
  }
2124
2125
306
  req.add_relation_type_filter(master::USER_TABLE_RELATION);
2126
306
  CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTables);
2127
306
  std::vector<YBTableName> result;
2128
306
  result.reserve(resp.tables_size());
2129
2130
5.49k
  for (int i = 0; i < resp.tables_size(); 
i++5.19k
) {
2131
5.19k
    const ListTablesResponsePB_TableInfo& table_info = resp.tables(i);
2132
5.19k
    DCHECK(table_info.has_namespace_());
2133
5.19k
    DCHECK(table_info.namespace_().has_name());
2134
5.19k
    DCHECK(table_info.namespace_().has_id());
2135
5.19k
    if (exclude_ysql && 
table_info.table_type() == TableType::PGSQL_TABLE_TYPE0
) {
2136
0
      continue;
2137
0
    }
2138
5.19k
    result.emplace_back(master::GetDatabaseTypeForTable(table_info.table_type()),
2139
5.19k
                        table_info.namespace_().id(),
2140
5.19k
                        table_info.namespace_().name(),
2141
5.19k
                        table_info.id(),
2142
5.19k
                        table_info.name(),
2143
5.19k
                        table_info.pgschema_name(),
2144
5.19k
                        table_info.relation_type());
2145
5.19k
  }
2146
306
  return result;
2147
306
}
2148
2149
382
Result<bool> YBClient::TableExists(const YBTableName& table_name) {
2150
382
  for (const YBTableName& table : VERIFY_RESULT(ListTables(table_name.table_name()))) {
2151
193
    if (table == table_name) {
2152
193
      return true;
2153
193
    }
2154
193
  }
2155
189
  return false;
2156
382
}
2157
2158
49.3k
Status YBClient::OpenTable(const YBTableName& table_name, YBTablePtr* table) {
2159
49.3k
  YBTableInfo info;
2160
49.3k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2161
49.3k
  RETURN_NOT_OK(data_->GetTableSchema(this, table_name, deadline, &info));
2162
47.0k
  auto future = FetchPartitionsFuture(this, info.table_id);
2163
  // In the future, probably will look up the table in some map to reuse YBTable instances.
2164
47.0k
  *table = std::make_shared<YBTable>(info, VERIFY_RESULT(future.get()));
2165
0
  return Status::OK();
2166
47.0k
}
2167
2168
Status YBClient::OpenTable(
2169
87.6k
    const TableId& table_id, YBTablePtr* table, master::GetTableSchemaResponsePB* resp) {
2170
  // Fetch partitions first to run GetTableSchema and GetTableLocations RPCs in parallel.
2171
87.6k
  auto future = FetchPartitionsFuture(this, table_id);
2172
2173
87.6k
  YBTableInfo info;
2174
87.6k
  auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
2175
87.6k
  RETURN_NOT_OK(data_->GetTableSchema(this, table_id, deadline, &info, resp));
2176
  // In the future, probably will look up the table in some map to reuse YBTable instances.
2177
87.3k
  *table = std::make_shared<YBTable>(info, VERIFY_RESULT(future.get()));
2178
0
  return Status::OK();
2179
87.3k
}
2180
2181
3.04k
shared_ptr<YBSession> YBClient::NewSession() {
2182
3.04k
  return std::make_shared<YBSession>(this);
2183
3.04k
}
2184
2185
0
bool YBClient::IsMultiMaster() const {
2186
0
  return data_->IsMultiMaster();
2187
0
}
2188
2189
6.47k
Result<int> YBClient::NumTabletsForUserTable(TableType table_type) {
2190
6.47k
  if (table_type == TableType::PGSQL_TABLE_TYPE &&
2191
6.47k
        
FLAGS_ysql_num_tablets > 04.79k
) {
2192
2
    VLOG(1) << "num_tablets = " << FLAGS_ysql_num_tablets
2193
0
              << ": --ysql_num_tablets is specified.";
2194
2
    return FLAGS_ysql_num_tablets;
2195
6.47k
  } else if (FLAGS_ycql_num_tablets > 0) {
2196
33
    VLOG(1) << "num_tablets = " << FLAGS_ycql_num_tablets
2197
0
              << ": --ycql_num_tablets is specified.";
2198
33
    return FLAGS_ycql_num_tablets;
2199
6.43k
  } else {
2200
6.43k
    int tserver_count = 0;
2201
6.43k
    RETURN_NOT_OK(TabletServerCount(&tserver_count, true /* primary_only */));
2202
6.43k
    int num_tablets = 0;
2203
6.43k
    if (table_type == TableType::PGSQL_TABLE_TYPE) {
2204
4.78k
      num_tablets = tserver_count * FLAGS_ysql_num_shards_per_tserver;
2205
4.78k
      VLOG(1) << "num_tablets = " << num_tablets << ": "
2206
0
              << "calculated as tserver_count * FLAGS_ysql_num_shards_per_tserver ("
2207
0
              << tserver_count << " * " << FLAGS_ysql_num_shards_per_tserver << ")";
2208
4.78k
    } else {
2209
1.65k
      num_tablets = tserver_count * FLAGS_yb_num_shards_per_tserver;
2210
1.65k
      VLOG(1) << "num_tablets = " << num_tablets << ": "
2211
0
              << "calculated as tserver_count * FLAGS_yb_num_shards_per_tserver ("
2212
0
              << tserver_count << " * " << FLAGS_yb_num_shards_per_tserver << ")";
2213
1.65k
    }
2214
6.43k
    return num_tablets;
2215
6.43k
  }
2216
6.47k
}
2217
2218
0
void YBClient::TEST_set_admin_operation_timeout(const MonoDelta& timeout) {
2219
0
  data_->default_admin_operation_timeout_ = timeout;
2220
0
}
2221
2222
488k
const MonoDelta& YBClient::default_admin_operation_timeout() const {
2223
488k
  return data_->default_admin_operation_timeout_;
2224
488k
}
2225
2226
2
const MonoDelta& YBClient::default_rpc_timeout() const {
2227
2
  return data_->default_rpc_timeout_;
2228
2
}
2229
2230
const uint64_t YBClient::kNoHybridTime = 0;
2231
2232
0
uint64_t YBClient::GetLatestObservedHybridTime() const {
2233
0
  return data_->GetLatestObservedHybridTime();
2234
0
}
2235
2236
0
void YBClient::SetLatestObservedHybridTime(uint64_t ht_hybrid_time) {
2237
0
  data_->UpdateLatestObservedHybridTime(ht_hybrid_time);
2238
0
}
2239
2240
7.85k
CoarseTimePoint YBClient::PatchAdminDeadline(CoarseTimePoint deadline) const {
2241
7.85k
  if (deadline != CoarseTimePoint()) {
2242
4.35k
    return deadline;
2243
4.35k
  }
2244
3.50k
  return CoarseMonoClock::Now() + default_admin_operation_timeout();
2245
7.85k
}
2246
2247
20
Result<vector<master::NamespaceIdentifierPB>> YBClient::ListNamespaces() {
2248
20
  return ListNamespaces(boost::none);
2249
20
}
2250
2251
6
Result<YBTablePtr> YBClient::OpenTable(const TableId& table_id) {
2252
6
  YBTablePtr result;
2253
6
  RETURN_NOT_OK(OpenTable(table_id, &result));
2254
6
  return result;
2255
6
}
2256
2257
0
Result<YBTablePtr> YBClient::OpenTable(const YBTableName& name) {
2258
0
  YBTablePtr result;
2259
0
  RETURN_NOT_OK(OpenTable(name, &result));
2260
0
  return result;
2261
0
}
2262
2263
0
Result<TableId> GetTableId(YBClient* client, const YBTableName& table_name) {
2264
0
  return VERIFY_RESULT(client->GetYBTableInfo(table_name)).table_id;
2265
0
}
2266
2267
}  // namespace client
2268
}  // namespace yb