YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tools/ysck_remote.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tools/ysck_remote.h"
34
35
#include "yb/common/schema.h"
36
#include "yb/common/wire_protocol.h"
37
38
#include "yb/gutil/callback.h"
39
#include "yb/gutil/map-util.h"
40
#include "yb/gutil/strings/substitute.h"
41
42
#include "yb/master/master_client.proxy.h"
43
#include "yb/master/master_cluster.proxy.h"
44
#include "yb/master/master_ddl.proxy.h"
45
#include "yb/master/master_util.h"
46
47
#include "yb/rpc/messenger.h"
48
#include "yb/rpc/proxy.h"
49
#include "yb/rpc/rpc_controller.h"
50
51
#include "yb/tserver/tserver_service.proxy.h"
52
53
#include "yb/util/net/net_util.h"
54
#include "yb/util/net/sockaddr.h"
55
#include "yb/util/result.h"
56
#include "yb/util/status_format.h"
57
58
DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
59
DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
60
DEFINE_int32(tablets_batch_size_max, 100, "How many tablets to get from the Master per RPC");
61
DECLARE_int64(outbound_rpc_block_size);
62
DECLARE_int64(outbound_rpc_memory_limit);
63
64
using namespace std::literals;
65
66
namespace yb {
67
namespace tools {
68
69
static const char kMessengerName[] = "ysck";
70
71
using rpc::Messenger;
72
using rpc::MessengerBuilder;
73
using rpc::RpcController;
74
using std::shared_ptr;
75
using std::string;
76
using std::vector;
77
using strings::Substitute;
78
using client::YBTableName;
79
80
48.1k
// Timeout applied to the RPCs issued by this file; driven by --timeout_ms.
MonoDelta GetDefaultTimeout() {
  const auto timeout_ms = FLAGS_timeout_ms;
  return MonoDelta::FromMilliseconds(timeout_ms);
}
83
84
// Remote view of a single tablet server identified by 'id'.
// Two proxies are created against 'address' through 'proxy_cache':
//  - generic_proxy_ serves Ping / ServerClock,
//  - ts_proxy_ serves the checksum scans.
RemoteYsckTabletServer::RemoteYsckTabletServer(const std::string& id,
                                               const HostPort& address,
                                               rpc::ProxyCache* proxy_cache)
    : YsckTabletServer(id),
      address_(address.ToString()),
      generic_proxy_(new server::GenericServiceProxy(proxy_cache, address)),
      ts_proxy_(new tserver::TabletServerServiceProxy(proxy_cache, address)) {}
92
93
2.93k
// Probes the tablet server with a single Ping RPC bounded by the default
// timeout and returns the RPC status.
Status RemoteYsckTabletServer::Connect() const {
  RpcController controller;
  controller.set_timeout(GetDefaultTimeout());
  const server::PingRequestPB ping_req;
  server::PingResponsePB ping_resp;
  return generic_proxy_->Ping(ping_req, &ping_resp, &controller);
}
100
101
0
// Reads the server's current hybrid time with a ServerClock RPC and stores it
// in '*hybrid_time'.
//
// Returns the RPC error if the call fails. If the reply carries no hybrid time,
// returns IllegalState instead of aborting the process. '*hybrid_time' is
// written only on success.
Status RemoteYsckTabletServer::CurrentHybridTime(uint64_t* hybrid_time) const {
  server::ServerClockRequestPB req;
  server::ServerClockResponsePB resp;
  RpcController rpc;
  rpc.set_timeout(GetDefaultTimeout());
  RETURN_NOT_OK(generic_proxy_->ServerClock(req, &resp, &rpc));
  if (!resp.has_hybrid_time()) {
    // A malformed reply from a remote server is an error to report, not a
    // reason to crash the checking tool.
    return STATUS_FORMAT(
        IllegalState, "ServerClock response from $0 does not contain a hybrid time", address_);
  }
  *hybrid_time = resp.hybrid_time();
  return Status::OK();
}
111
112
class ChecksumStepper;
113
114
// Simple class to act as a callback in order to collate results from parallel
115
// checksum scans.
116
class ChecksumCallbackHandler {
117
 public:
118
  explicit ChecksumCallbackHandler(ChecksumStepper* const stepper)
119
4.37k
      : stepper(DCHECK_NOTNULL(stepper)) {
120
4.37k
  }
121
122
  // Invoked by an RPC completion callback. Simply calls back into the stepper.
123
  // Then the call to the stepper returns, deletes 'this'.
124
  void Run();
125
126
 private:
127
  ChecksumStepper* const stepper;
128
};
129
130
// Simple class to have a "conversation" over multiple requests to a server
131
// to carry out a multi-part checksum scan.
132
// If any errors or timeouts are encountered, the checksum operation fails.
133
// After the ChecksumStepper reports its results to the reporter, it deletes itself.
134
class ChecksumStepper {
135
 public:
136
  ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
137
                  ChecksumOptions options, ReportResultCallback callback,
138
                  shared_ptr<tserver::TabletServerServiceProxy> proxy)
139
      : schema_(schema),
140
        tablet_id_(std::move(tablet_id)),
141
        server_uuid_(std::move(server_uuid)),
142
        options_(std::move(options)),
143
        reporter_callback_(std::move(callback)),
144
4.37k
        proxy_(std::move(proxy)) {
145
4.37k
    DCHECK(proxy_);
146
4.37k
  }
147
148
4.37k
  void Start() {
149
4.37k
    SchemaToColumnPBs(schema_, &cols_, SCHEMA_PB_WITHOUT_IDS);
150
4.37k
    SendRequest();
151
4.37k
  }
152
153
4.37k
  void HandleResponse() {
154
4.37k
    std::unique_ptr<ChecksumStepper> deleter(this);
155
4.37k
    Status s = rpc_.status();
156
4.37k
    if (s.ok() && 
resp_.has_error()3.30k
) {
157
1
      s = StatusFromPB(resp_.error().status());
158
1
    }
159
4.37k
    if (!s.ok()) {
160
1.07k
      reporter_callback_.Run(s, 0);
161
1.07k
      return; // Deletes 'this'.
162
1.07k
    }
163
164
3.30k
    DCHECK(resp_.has_checksum());
165
166
3.30k
    reporter_callback_.Run(s, resp_.checksum());
167
3.30k
  }
168
169
 private:
170
4.37k
  void SendRequest() {
171
4.37k
    req_.set_tablet_id(tablet_id_);
172
4.37k
    req_.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
173
4.37k
    rpc_.set_timeout(GetDefaultTimeout());
174
4.37k
    auto handler = std::make_unique<ChecksumCallbackHandler>(this);
175
4.37k
    rpc::ResponseCallback cb = std::bind(&ChecksumCallbackHandler::Run, handler.get());
176
4.37k
    proxy_->ChecksumAsync(req_, &resp_, &rpc_, cb);
177
4.37k
    handler.release();
178
4.37k
  }
179
180
  const Schema schema_;
181
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> cols_;
182
183
  const string tablet_id_;
184
  const string server_uuid_;
185
  const ChecksumOptions options_;
186
  const ReportResultCallback reporter_callback_;
187
  const shared_ptr<tserver::TabletServerServiceProxy> proxy_;
188
189
  tserver::ChecksumRequestPB req_;
190
  tserver::ChecksumResponsePB resp_;
191
  RpcController rpc_;
192
};
193
194
4.37k
void ChecksumCallbackHandler::Run() {
195
4.37k
  stepper->HandleResponse();
196
4.37k
  delete this;
197
4.37k
}
198
199
void RemoteYsckTabletServer::RunTabletChecksumScanAsync(
200
        const string& tablet_id,
201
        const Schema& schema,
202
        const ChecksumOptions& options,
203
4.37k
        const ReportResultCallback& callback) {
204
4.37k
  std::unique_ptr<ChecksumStepper> stepper(
205
4.37k
      new ChecksumStepper(tablet_id, schema, uuid(), options, callback, ts_proxy_));
206
4.37k
  stepper->Start();
207
4.37k
  stepper.release(); // Deletes self on callback.
208
4.37k
}
209
210
2.36k
// Checks that the master answers a Ping within the default RPC timeout and
// returns the RPC status.
Status RemoteYsckMaster::Connect() const {
  server::PingResponsePB ping_response;
  RpcController controller;
  controller.set_timeout(GetDefaultTimeout());
  return generic_proxy_->Ping(server::PingRequestPB(), &ping_response, &controller);
}
217
218
1.18k
// Factory: builds a dedicated messenger and returns, in '*master', a
// RemoteYsckMaster that talks to the master at 'address'.
//
// Returns any error from building the messenger or resolving the loopback
// address. '*master' is reset only on success.
Status RemoteYsckMaster::Build(const HostPort& address, shared_ptr<YsckMaster>* master) {
  MessengerBuilder messenger_builder(kMessengerName);
  auto new_messenger = VERIFY_RESULT(messenger_builder.Build());
  const auto loopback_base = VERIFY_RESULT(HostToAddress("127.0.0.1"));
  // NOTE(review): a TEST_-prefixed messenger API is used from tool code here;
  // confirm this is intended outside of test clusters.
  new_messenger->TEST_SetOutboundIpBase(loopback_base);
  master->reset(new RemoteYsckMaster(address, std::move(new_messenger)));
  return Status::OK();
}
225
226
991
// Replaces the contents of '*tablet_servers' with one RemoteYsckTabletServer
// per server reported by the master's ListTabletServers RPC. Entries are keyed
// by permanent UUID.
//
// Returns the RPC error, or the master-side error carried in the response.
// '*tablet_servers' is left untouched on error. InsertOrDie aborts if the
// master reports the same UUID twice.
Status RemoteYsckMaster::RetrieveTabletServers(TSMap* tablet_servers) {
  master::ListTabletServersRequestPB req;
  master::ListTabletServersResponsePB resp;
  RpcController rpc;

  rpc.set_timeout(GetDefaultTimeout());
  RETURN_NOT_OK(cluster_proxy_->ListTabletServers(req, &resp, &rpc));
  // Surface application-level errors (as the ListTables / GetTableSchema calls
  // in this file do) instead of treating an error reply as a server list.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }

  tablet_servers->clear();
  for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
    const HostPortPB& addr = DesiredHostPort(e.registration().common(), CloudInfoPB());
    shared_ptr<YsckTabletServer> ts(new RemoteYsckTabletServer(
        e.instance_id().permanent_uuid(), HostPortFromPB(addr), proxy_cache_.get()));
    InsertOrDie(tablet_servers, ts->uuid(), ts);
  }
  return Status::OK();
}
242
243
1.15k
// Lists every table known to the master and, for each non-PostgreSQL table,
// fetches its schema and replica count via GetTableInfo().
//
// On success '*tables' is replaced with the collected tables. On any error,
// '*tables' is left unchanged. A table entry without a namespace name
// aborts via CHECK.
Status RemoteYsckMaster::RetrieveTablesList(vector<shared_ptr<YsckTable> >* tables) {
  master::ListTablesRequestPB list_req;
  master::ListTablesResponsePB list_resp;
  RpcController controller;
  controller.set_timeout(GetDefaultTimeout());
  RETURN_NOT_OK(ddl_proxy_->ListTables(list_req, &list_resp, &controller));
  if (list_resp.has_error()) {
    return StatusFromPB(list_resp.error().status());
  }

  vector<shared_ptr<YsckTable> > collected;
  for (const auto& info : list_resp.tables()) {
    CHECK(info.has_namespace_());
    CHECK(info.namespace_().has_name());
    YBTableName table_name(
        master::GetDatabaseTypeForTable(info.table_type()), info.namespace_().name(), info.name());

    Schema table_schema;
    int replica_count = 0;
    bool pg_table = false;
    RETURN_NOT_OK(GetTableInfo(info.id(), &table_schema, &replica_count, &pg_table));

    // PostgreSQL tables are skipped for now. Including them would change the
    // number of system tables that many unit tests expect.
    if (pg_table) {
      continue;
    }
    collected.push_back(std::make_shared<YsckTable>(
        info.id(), table_name, table_schema, replica_count, info.table_type()));
  }

  *tables = std::move(collected);
  return Status::OK();
}
275
276
17.9k
// Loads every tablet of 'table' by paging through GetTabletsBatch(), then
// installs the result with table->set_tablets().
//
// A TryAgain status (which GetTabletsBatch returns while the table is being
// created) restarts the listing from the first partition after 100ms. Once 60s
// have passed since the call began, it is returned as TimedOut instead. Any
// other error is returned immediately and the table is not modified.
Status RemoteYsckMaster::RetrieveTabletsList(const shared_ptr<YsckTable>& table) {
  const auto give_up_at = CoarseMonoClock::now() + 60s;
  vector<shared_ptr<YsckTablet> > collected;
  string next_key;
  bool has_more = true;

  while (has_more) {
    auto status = GetTabletsBatch(table->id(), table->name(), &next_key, &collected, &has_more);
    if (!status.IsTryAgain()) {
      RETURN_NOT_OK(status);
      continue;
    }
    if (CoarseMonoClock::now() >= give_up_at) {
      return status.CloneAndReplaceCode(Status::kTimedOut);
    }
    // Throw away the partial listing and start over from the beginning.
    collected.clear();
    next_key.clear();
    has_more = true;
    std::this_thread::sleep_for(100ms);
  }

  table->set_tablets(collected);
  return Status::OK();
}
299
300
// Fetches one page (at most --tablets_batch_size_max) of tablet locations for
// 'table_id', starting at '*last_partition_key'. For each returned location,
// appends a YsckTablet with its replicas to '*tablets'.
//
// On success '*last_partition_key' is advanced to the end key of the last
// returned tablet. When that key is empty, '*more_tablets' is set to false.
//
// Returns:
//  - any RPC error from GetTableLocations;
//  - TryAgain if the master reports the table is still being created;
//  - the master-side error if the response carries one;
//  - NotFound if the master returned no tablets for the requested start key.
Status RemoteYsckMaster::GetTabletsBatch(
    const TableId& table_id,
    const YBTableName& table_name,
    string* last_partition_key,
    vector<shared_ptr<YsckTablet> >* tablets,
    bool* more_tablets) {
  master::GetTableLocationsRequestPB req;
  master::GetTableLocationsResponsePB resp;
  RpcController rpc;

  req.mutable_table()->set_table_id(table_id);
  req.set_max_returned_locations(FLAGS_tablets_batch_size_max);
  req.set_partition_key_start(*last_partition_key);

  rpc.set_timeout(GetDefaultTimeout());
  RETURN_NOT_OK(client_proxy_->GetTableLocations(req, &resp, &rpc));
  if (resp.creating()) {
    return STATUS_FORMAT(TryAgain, "Table $0 is being created", table_name);
  }
  // Check for a master-side error before interpreting the locations. Otherwise
  // the real error would be masked by the generic NotFound below.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (resp.tablet_locations_size() == 0) {
    return STATUS_FORMAT(
        NotFound,
        "The Master returned 0 tablets for GetTableLocations of table $0 at start key $1",
        table_name.ToString(), *(last_partition_key));
  }

  for (const master::TabletLocationsPB& locations : resp.tablet_locations()) {
    auto tablet = std::make_shared<YsckTablet>(locations.tablet_id());
    vector<shared_ptr<YsckTabletReplica> > replicas;
    replicas.reserve(locations.replicas_size());
    for (const master::TabletLocationsPB_ReplicaPB& replica : locations.replicas()) {
      const bool is_leader = replica.role() == PeerRole::LEADER;
      const bool is_follower = replica.role() == PeerRole::FOLLOWER;
      replicas.push_back(std::make_shared<YsckTabletReplica>(
          replica.ts_info().permanent_uuid(), is_leader, is_follower));
    }
    tablet->set_replicas(replicas);
    tablets->push_back(std::move(tablet));
  }

  // The next page starts where the last returned tablet ends.
  *last_partition_key =
      resp.tablet_locations(resp.tablet_locations_size() - 1).partition().partition_key_end();
  if (last_partition_key->empty()) {
    *more_tablets = false;
  }
  return Status::OK();
}
344
345
// Looks up 'table_id' with GetTableSchema and fills in three outputs:
//  - '*schema': the decoded table schema;
//  - '*num_replicas': the live replica count from the replication info;
//  - '*is_pg_table': whether the table is a PostgreSQL table.
//
// Returns the RPC error, the master-side error, or the schema decoding error.
// '*num_replicas' and '*is_pg_table' are written only after the schema was
// decoded successfully.
Status RemoteYsckMaster::GetTableInfo(const TableId& table_id,
                                      Schema* schema,
                                      int* num_replicas,
                                      bool* is_pg_table) {
  master::GetTableSchemaRequestPB schema_req;
  schema_req.mutable_table()->set_table_id(table_id);

  RpcController controller;
  controller.set_timeout(GetDefaultTimeout());
  master::GetTableSchemaResponsePB schema_resp;
  RETURN_NOT_OK(ddl_proxy_->GetTableSchema(schema_req, &schema_resp, &controller));
  if (schema_resp.has_error()) {
    return StatusFromPB(schema_resp.error().status());
  }

  RETURN_NOT_OK(SchemaFromPB(schema_resp.schema(), schema));
  *num_replicas = schema_resp.replication_info().live_replicas().num_replicas();
  *is_pg_table = (schema_resp.table_type() == yb::TableType::PGSQL_TABLE_TYPE);
  return Status::OK();
}
367
368
// Takes ownership of 'messenger' and builds a proxy cache on top of it. It
// then creates one proxy per master service (generic, client, cluster, DDL),
// all aimed at 'address'.
//
// NOTE(review): proxy_cache_ is built from messenger_.get(). This relies on
// messenger_ being declared before proxy_cache_ in the header, since members
// are initialized in declaration order. Presumably that is the case; confirm.
RemoteYsckMaster::RemoteYsckMaster(
    const HostPort& address, std::unique_ptr<rpc::Messenger>&& messenger)
    : messenger_(std::move(messenger)),
      proxy_cache_(new rpc::ProxyCache(messenger_.get())),
      generic_proxy_(new server::GenericServiceProxy(proxy_cache_.get(), address)),
      client_proxy_(new master::MasterClientProxy(proxy_cache_.get(), address)),
      cluster_proxy_(new master::MasterClusterProxy(proxy_cache_.get(), address)),
      ddl_proxy_(new master::MasterDdlProxy(proxy_cache_.get(), address)) {
}
376
377
1.18k
// Shuts down the owned messenger explicitly. This runs in the destructor
// body, i.e. before any member (proxies, proxy cache, messenger) is destroyed.
RemoteYsckMaster::~RemoteYsckMaster() {
  auto* const owned_messenger = messenger_.get();
  owned_messenger->Shutdown();
}
380
381
} // namespace tools
382
} // namespace yb