YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/meta_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/meta_cache.h"
34
35
#include <stdint.h>
36
37
#include <atomic>
38
#include <list>
39
#include <memory>
40
#include <shared_mutex>
41
#include <string>
42
#include <unordered_map>
43
#include <unordered_set>
44
#include <vector>
45
46
#include <boost/optional/optional_io.hpp>
47
#include <glog/logging.h>
48
49
#include "yb/client/client.h"
50
#include "yb/client/client_error.h"
51
#include "yb/client/client_master_rpc.h"
52
#include "yb/client/client-internal.h"
53
#include "yb/client/schema.h"
54
#include "yb/client/table.h"
55
#include "yb/client/yb_table_name.h"
56
57
#include "yb/common/wire_protocol.h"
58
59
#include "yb/gutil/map-util.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/strings/substitute.h"
62
63
#include "yb/master/master_client.proxy.h"
64
65
#include "yb/rpc/rpc_fwd.h"
66
67
#include "yb/tserver/local_tablet_server.h"
68
#include "yb/tserver/tserver_service.proxy.h"
69
70
#include "yb/util/async_util.h"
71
#include "yb/util/atomic.h"
72
#include "yb/util/flag_tags.h"
73
#include "yb/util/locks.h"
74
#include "yb/util/logging.h"
75
#include "yb/util/metrics.h"
76
#include "yb/util/monotime.h"
77
#include "yb/util/net/dns_resolver.h"
78
#include "yb/util/net/net_util.h"
79
#include "yb/util/net/sockaddr.h"
80
#include "yb/util/random_util.h"
81
#include "yb/util/result.h"
82
#include "yb/util/scope_exit.h"
83
#include "yb/util/shared_lock.h"
84
#include "yb/util/status_format.h"
85
#include "yb/util/unique_lock.h"
86
87
using std::map;
88
using std::shared_ptr;
89
using strings::Substitute;
90
using namespace std::literals;
91
using namespace std::placeholders;
92
93
DEFINE_int32(max_concurrent_master_lookups, 500,
94
             "Maximum number of concurrent tablet location lookups from YB client to master");
95
96
DEFINE_test_flag(bool, verify_all_replicas_alive, false,
97
                 "If set, when a RemoteTablet object is destroyed, we will verify that all its "
98
                 "replicas are not marked as failed");
99
100
DEFINE_int32(retry_failed_replica_ms, 60 * 1000,
101
             "Time in milliseconds to wait for before retrying a failed replica");
102
103
DEFINE_int64(meta_cache_lookup_throttling_step_ms, 5,
104
             "Step to increment delay between calls during lookup throttling.");
105
106
DEFINE_int64(meta_cache_lookup_throttling_max_delay_ms, 1000,
107
             "Max delay between calls during lookup throttling.");
108
109
DEFINE_test_flag(bool, force_master_lookup_all_tablets, false,
110
                 "If set, force the client to go to the master for all tablet lookup "
111
                 "instead of reading from cache.");
112
113
DEFINE_test_flag(double, simulate_lookup_timeout_probability, 0,
114
                 "If set, mark an RPC as failed and force retry on the first attempt.");
115
DEFINE_test_flag(double, simulate_lookup_partition_list_mismatch_probability, 0,
116
                 "Probability for simulating the partition list mismatch error on tablet lookup.");
117
118
METRIC_DEFINE_coarse_histogram(
119
  server, dns_resolve_latency_during_init_proxy,
120
  "yb.client.MetaCache.InitProxy DNS Resolve",
121
  yb::MetricUnit::kMicroseconds,
122
  "Microseconds spent resolving DNS requests during MetaCache::InitProxy");
123
124
DECLARE_string(placement_cloud);
125
DECLARE_string(placement_region);
126
127
namespace yb {
128
129
using consensus::RaftPeerPB;
130
using master::GetTableLocationsRequestPB;
131
using master::GetTableLocationsResponsePB;
132
using master::TabletLocationsPB;
133
using master::TabletLocationsPB_ReplicaPB;
134
using master::TSInfoPB;
135
using rpc::Messenger;
136
using rpc::Rpc;
137
using tablet::RaftGroupStatePB;
138
using tserver::LocalTabletServer;
139
using tserver::TabletServerServiceProxy;
140
using tserver::TabletServerForwardServiceProxy;
141
142
namespace client {
143
144
namespace internal {
145
146
using ProcessedTablesMap =
147
    std::unordered_map<TableId, std::unordered_map<PartitionKey, RemoteTabletPtr>>;
148
149
namespace {
150
151
// We group tablet partitions so that tablet state info for a whole group is requested with a single
152
// RPC call to master.
153
// kPartitionGroupSize defines the size of such a group.
154
#ifdef NDEBUG
155
const size_t kPartitionGroupSize = 64;
156
#else
157
const size_t kPartitionGroupSize = 4;
158
#endif
159
160
std::atomic<int64_t> lookup_serial_{1};
161
162
} // namespace
163
164
0
int64_t TEST_GetLookupSerial() {
165
0
  return lookup_serial_.load(std::memory_order_acquire);
166
0
}
167
168
////////////////////////////////////////////////////////////
169
170
// Creates an entry for the tablet server described by `pb`. The permanent UUID is taken from
// the protobuf at construction; the remaining fields are populated by Update().
RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb)
    : uuid_(pb.permanent_uuid()) {
  Update(pb);
}
174
175
// Creates an entry with a pre-built proxy and an optional local tablet server pointer.
// Supplying a proxy without a local tablet server is reported via LOG(DFATAL).
RemoteTabletServer::RemoteTabletServer(const string& uuid,
                                       const shared_ptr<TabletServerServiceProxy>& proxy,
                                       const LocalTabletServer* local_tserver)
    : uuid_(uuid),
      proxy_(proxy),
      local_tserver_(local_tserver) {
  const bool proxy_without_local_tserver = proxy && !IsLocal();
  LOG_IF(DFATAL, proxy_without_local_tserver) << "Local tserver has non-local proxy";
}
183
184
163
// Out-of-line defaulted destructor.
RemoteTabletServer::~RemoteTabletServer() = default;
185
186
15.1M
// Makes sure proxy_ is set, creating a TabletServerServiceProxy on first use.
//
// The existence check is done first under a shared lock and then repeated under the exclusive
// lock, because another caller may have created the proxy in between.
// Always returns Status::OK(); an empty resolved host triggers a CHECK failure.
Status RemoteTabletServer::InitProxy(YBClient* client) {
  {
    // Common case: the proxy is already there.
    SharedLock<rw_spinlock> read_lock(mutex_);
    if (proxy_) {
      return Status::OK();
    }
  }

  std::lock_guard<rw_spinlock> write_lock(mutex_);
  if (proxy_) {
    // Created by another caller while we were not holding any lock.
    return Status::OK();
  }

  if (!dns_resolve_histogram_) {
    auto entity = client->metric_entity();
    if (entity) {
      dns_resolve_histogram_ =
          METRIC_dns_resolve_latency_during_init_proxy.Instantiate(entity);
    }
  }

  // TODO: if the TS advertises multiple host/ports, pick the right one
  // based on some kind of policy. For now just use the first always.
  const auto& client_cloud_info = client->data_->cloud_info_pb_;
  auto hostport = HostPortFromPB(yb::DesiredHostPort(
      public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_, client_cloud_info));
  CHECK(!hostport.host().empty());

  ScopedDnsTracker dns_tracker(dns_resolve_histogram_.get());
  proxy_.reset(new TabletServerServiceProxy(client->data_->proxy_cache_.get(), hostport));
  proxy_endpoint_ = hostport;

  return Status::OK();
}
223
224
289k
// Replaces the cached addresses, cloud info and capabilities with the values from `pb`.
// `pb` must describe this server (same permanent UUID), otherwise CHECK fails.
// Capabilities are kept sorted because HasCapability() uses binary search on them.
void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
  CHECK_EQ(pb.permanent_uuid(), uuid_);

  const auto& new_capabilities = pb.capabilities();

  std::lock_guard<rw_spinlock> write_lock(mutex_);
  private_rpc_hostports_ = pb.private_rpc_addresses();
  public_rpc_hostports_ = pb.broadcast_addresses();
  cloud_info_pb_ = pb.cloud_info();
  capabilities_.assign(new_capabilities.begin(), new_capabilities.end());
  std::sort(capabilities_.begin(), capabilities_.end());
}
234
235
11.7M
bool RemoteTabletServer::IsLocal() const {
236
11.7M
  return local_tserver_ != nullptr;
237
11.7M
}
238
239
1.82M
const std::string& RemoteTabletServer::permanent_uuid() const {
240
1.82M
  return uuid_;
241
1.82M
}
242
243
15.1M
// Returns a copy of the current proxy pointer (possibly null), read under the shared lock.
shared_ptr<TabletServerServiceProxy> RemoteTabletServer::proxy() const {
  SharedLock<rw_spinlock> read_lock(mutex_);
  shared_ptr<TabletServerServiceProxy> current_proxy = proxy_;
  return current_proxy;
}
247
248
0
// Returns a copy of the endpoint recorded by InitProxy() when the proxy was created.
// Uses SharedLock like every other read accessor of this class.
::yb::HostPort RemoteTabletServer::ProxyEndpoint() const {
  SharedLock<rw_spinlock> lock(mutex_);
  return proxy_endpoint_;
}
252
253
252k
// Human-readable description of this tablet server, in the form
//   { uuid: <uuid> [private: <addrs>] [public: <addrs>] cloud_info: <info> }
// Address lists are included only when non-empty. The closing brace balances the opening one.
string RemoteTabletServer::ToString() const {
  string ret = "{ uuid: " + uuid_;
  SharedLock<rw_spinlock> lock(mutex_);
  if (!private_rpc_hostports_.empty()) {
    ret += Format(" private: $0", private_rpc_hostports_);
  }
  if (!public_rpc_hostports_.empty()) {
    ret += Format(" public: $0", public_rpc_hostports_);
  }
  ret += Format(" cloud_info: $0", cloud_info_pb_);
  ret += " }";
  return ret;
}
265
266
39.4k
bool RemoteTabletServer::HasHostFrom(const std::unordered_set<std::string>& hosts) const {
267
39.4k
  SharedLock<rw_spinlock> lock(mutex_);
268
39.4k
  for (const auto& hp : private_rpc_hostports_) {
269
39.4k
    if (hosts.count(hp.host())) {
270
0
      return true;
271
0
    }
272
39.4k
  }
273
39.4k
  for (const auto& hp : public_rpc_hostports_) {
274
0
    if (hosts.count(hp.host())) {
275
0
      return true;
276
0
    }
277
0
  }
278
39.4k
  return false;
279
39.4k
}
280
281
12.3M
// Checks whether `capability` is among the capabilities stored by Update().
// Relies on capabilities_ being sorted, which Update() guarantees.
bool RemoteTabletServer::HasCapability(CapabilityId capability) const {
  SharedLock<rw_spinlock> read_lock(mutex_);
  const auto first = capabilities_.begin();
  const auto last = capabilities_.end();
  return std::binary_search(first, last, capability);
}
285
286
93
bool RemoteTabletServer::IsLocalRegion() const {
287
93
  SharedLock<rw_spinlock> lock(mutex_);
288
93
  return cloud_info_pb_.placement_cloud() == FLAGS_placement_cloud &&
289
93
         cloud_info_pb_.placement_region() == FLAGS_placement_region;
290
93
}
291
292
39.4k
// Compares this server's placement with `cloud_info`:
//   kNone   - region is missing on either side or the regions differ;
//   kRegion - regions match, but zone is missing on either side or the zones differ;
//   kZone   - both region and zone match.
LocalityLevel RemoteTabletServer::LocalityLevelWith(const CloudInfoPB& cloud_info) const {
  SharedLock<rw_spinlock> read_lock(mutex_);

  const bool same_region =
      cloud_info_pb_.has_placement_region() && cloud_info.has_placement_region() &&
      cloud_info_pb_.placement_region() == cloud_info.placement_region();
  if (!same_region) {
    return LocalityLevel::kNone;
  }

  const bool same_zone =
      cloud_info_pb_.has_placement_zone() && cloud_info.has_placement_zone() &&
      cloud_info_pb_.placement_zone() == cloud_info.placement_zone();
  if (!same_zone) {
    return LocalityLevel::kRegion;
  }

  return LocalityLevel::kZone;
}
304
305
219
// Delegates to yb::DesiredHostPort() with this server's broadcast and private addresses and
// its cloud info, together with the caller-provided `cloud_info`.
HostPortPB RemoteTabletServer::DesiredHostPort(const CloudInfoPB& cloud_info) const {
  SharedLock<rw_spinlock> read_lock(mutex_);
  HostPortPB chosen = yb::DesiredHostPort(
      public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_, cloud_info);
  return chosen;
}
310
311
6.50k
std::string RemoteTabletServer::TEST_PlacementZone() const {
312
6.50k
  SharedLock<rw_spinlock> lock(mutex_);
313
6.50k
  return cloud_info_pb_.placement_zone();
314
6.50k
}
315
316
0
std::string ReplicasCount::ToString() {
317
0
  return Format(
318
0
      " live replicas $0, read replicas $1, expected live replicas $2, expected read replicas $3",
319
0
      num_alive_live_replicas, num_alive_read_replicas,
320
0
      expected_live_replicas, expected_read_replicas);
321
0
}
322
323
////////////////////////////////////////////////////////////
324
325
// Stores the identifying data of a tablet. The log prefix is derived from the tablet id
// ("T <tablet_id>: ") and the tablet starts out as not stale.
RemoteTablet::RemoteTablet(std::string tablet_id,
                           Partition partition,
                           boost::optional<PartitionListVersion> partition_list_version,
                           uint64 split_depth,
                           const TabletId& split_parent_tablet_id)
    : tablet_id_(std::move(tablet_id)),
      log_prefix_(Format("T $0: ", tablet_id_)),
      partition_(std::move(partition)),
      partition_list_version_(partition_list_version),
      split_depth_(split_depth),
      split_parent_tablet_id_(split_parent_tablet_id),
      stale_(false) {}
338
339
71
// When the TEST_verify_all_replicas_alive flag is set, aborts (LOG FATAL) if any replica is
// still marked as failed at destruction time. Otherwise does nothing.
RemoteTablet::~RemoteTablet() {
  if (!PREDICT_FALSE(FLAGS_TEST_verify_all_replicas_alive)) {
    return;
  }
  // Let's verify that none of the replicas are marked as failed. The test should always wait
  // enough time so that the lookup cache can be refreshed after force_lookup_cache_refresh_secs.
  for (const auto& replica : replicas_) {
    if (!replica.Failed()) {
      continue;
    }
    LOG_WITH_PREFIX(FATAL) << "Remote tablet server " << replica.ts->ToString()
                           << " with role " << PeerRole_Name(replica.role)
                           << " is marked as failed";
  }
}
352
353
void RemoteTablet::Refresh(
354
    const TabletServerMap& tservers,
355
759k
    const google::protobuf::RepeatedPtrField<TabletLocationsPB_ReplicaPB>& replicas) {
356
  // Adopt the data from the successful response.
357
759k
  std::lock_guard<rw_spinlock> lock(mutex_);
358
759k
  std::vector<std::string> old_uuids;
359
759k
  old_uuids.reserve(replicas_.size());
360
1.68M
  for (const auto& replica : replicas_) {
361
1.68M
    old_uuids.push_back(replica.ts->permanent_uuid());
362
1.68M
  }
363
759k
  std::sort(old_uuids.begin(), old_uuids.end());
364
759k
  replicas_.clear();
365
759k
  bool has_new_replica = false;
366
1.95M
  for (const TabletLocationsPB_ReplicaPB& r : replicas) {
367
1.95M
    auto it = tservers.find(r.ts_info().permanent_uuid());
368
1.95M
    CHECK(it != tservers.end());
369
1.95M
    replicas_.emplace_back(it->second.get(), r.role());
370
1.95M
    has_new_replica =
371
1.95M
        has_new_replica ||
372
1.95M
        
!std::binary_search(old_uuids.begin(), old_uuids.end(), r.ts_info().permanent_uuid())1.77M
;
373
1.95M
  }
374
759k
  if (has_new_replica) {
375
92.3k
    lookups_without_new_replicas_ = 0;
376
667k
  } else {
377
667k
    ++lookups_without_new_replicas_;
378
667k
  }
379
759k
  stale_ = false;
380
759k
  refresh_time_.store(MonoTime::Now(), std::memory_order_release);
381
759k
}
382
383
68
void RemoteTablet::MarkStale() {
384
68
  std::lock_guard<rw_spinlock> lock(mutex_);
385
68
  stale_ = true;
386
68
}
387
388
23.2M
bool RemoteTablet::stale() const {
389
23.2M
  SharedLock<rw_spinlock> lock(mutex_);
390
23.2M
  return stale_;
391
23.2M
}
392
393
35
void RemoteTablet::MarkAsSplit() {
394
35
  std::lock_guard<rw_spinlock> lock(mutex_);
395
35
  is_split_ = true;
396
35
}
397
398
12.0M
bool RemoteTablet::is_split() const {
399
12.0M
  SharedLock<rw_spinlock> lock(mutex_);
400
12.0M
  return is_split_;
401
12.0M
}
402
403
4.22k
// Marks the replica hosted on `ts` as failed.
// Returns true if such a replica exists in the cached replica list, false otherwise.
// `status` is only used for verbose logging.
bool RemoteTablet::MarkReplicaFailed(RemoteTabletServer *ts, const Status& status) {
  std::lock_guard<rw_spinlock> write_lock(mutex_);
  VLOG_WITH_PREFIX(2) << "Current remote replicas in meta cache: "
                      << ReplicasAsStringUnlocked() << ". Replica " << ts->ToString()
                      << " has failed: " << status.ToString();

  const auto failed_replica = std::find_if(
      replicas_.begin(), replicas_.end(),
      [ts](const RemoteReplica& candidate) { return candidate.ts == ts; });
  if (failed_replica == replicas_.end()) {
    return false;
  }
  failed_replica->MarkFailed();
  return true;
}
416
417
3.36k
int RemoteTablet::GetNumFailedReplicas() const {
418
3.36k
  int failed = 0;
419
3.36k
  SharedLock<rw_spinlock> lock(mutex_);
420
20.1k
  for (const RemoteReplica& rep : replicas_) {
421
20.1k
    if (rep.Failed()) {
422
29
      failed++;
423
29
    }
424
20.1k
  }
425
3.36k
  return failed;
426
3.36k
}
427
428
3.59k
bool RemoteTablet::IsReplicasCountConsistent() const {
429
3.59k
  return replicas_count_.load(std::memory_order_acquire).IsReplicasCountConsistent();
430
3.59k
}
431
432
0
// Takes an atomic snapshot of the replica counters and returns its textual form.
string RemoteTablet::ReplicasCountToString() const {
  auto counts_snapshot = replicas_count_.load(std::memory_order_acquire);
  return counts_snapshot.ToString();
}
435
436
757k
void RemoteTablet::SetExpectedReplicas(int expected_live_replicas, int expected_read_replicas) {
437
757k
  ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire);
438
757k
  for (;;) {
439
757k
    ReplicasCount new_replicas_count = old_replicas_count;
440
757k
    new_replicas_count.SetExpectedReplicas(expected_live_replicas, expected_read_replicas);
441
757k
    if (replicas_count_.compare_exchange_weak(
442
757k
        old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) {
443
757k
      break;
444
757k
    }
445
757k
  }
446
757k
}
447
448
368k
void RemoteTablet::SetAliveReplicas(int alive_live_replicas, int alive_read_replicas) {
449
368k
  ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire);
450
368k
  for (;;) {
451
368k
    ReplicasCount new_replicas_count = old_replicas_count;
452
368k
    new_replicas_count.SetAliveReplicas(alive_live_replicas, alive_read_replicas);
453
368k
    if (replicas_count_.compare_exchange_weak(
454
368k
        old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) {
455
368k
      break;
456
368k
    }
457
368k
  }
458
368k
}
459
460
39.2M
// Returns the tablet server of the first cached replica that is not failed and has the LEADER
// role, or nullptr when there is no such replica.
RemoteTabletServer* RemoteTablet::LeaderTServer() const {
  SharedLock<rw_spinlock> read_lock(mutex_);
  for (const RemoteReplica& candidate : replicas_) {
    if (candidate.Failed()) {
      continue;
    }
    if (candidate.role == PeerRole::LEADER) {
      return candidate.ts;
    }
  }
  return nullptr;
}
469
470
25.1M
bool RemoteTablet::HasLeader() const {
471
25.1M
  return LeaderTServer() != nullptr;
472
25.1M
}
473
474
void RemoteTablet::GetRemoteTabletServers(
475
368k
    std::vector<RemoteTabletServer*>* servers, IncludeFailedReplicas include_failed_replicas) {
476
368k
  DCHECK(servers->empty());
477
368k
  struct ReplicaUpdate {
478
368k
    RemoteReplica* replica;
479
368k
    tablet::RaftGroupStatePB new_state;
480
368k
    bool clear_failed;
481
368k
  };
482
368k
  std::vector<ReplicaUpdate> replica_updates;
483
368k
  {
484
368k
    SharedLock<rw_spinlock> lock(mutex_);
485
368k
    int num_alive_live_replicas = 0;
486
368k
    int num_alive_read_replicas = 0;
487
1.11M
    for (RemoteReplica& replica : replicas_) {
488
1.11M
      if (replica.Failed()) {
489
22.0k
        if (include_failed_replicas) {
490
22
          servers->push_back(replica.ts);
491
22
          continue;
492
22
        }
493
22.0k
        ReplicaUpdate replica_update = {&replica, RaftGroupStatePB::UNKNOWN, false};
494
22.0k
        
VLOG_WITH_PREFIX13
(4)
495
13
            << "Replica " << replica.ts->ToString()
496
13
            << " failed, state: " << RaftGroupStatePB_Name(replica.state)
497
13
            << ", is local: " << replica.ts->IsLocal()
498
13
            << ", time since failure: " << (MonoTime::Now() - replica.last_failed_time);
499
22.0k
        switch (replica.state) {
500
21.9k
          case RaftGroupStatePB::UNKNOWN: FALLTHROUGH_INTENDED;
501
21.9k
          case RaftGroupStatePB::NOT_STARTED: FALLTHROUGH_INTENDED;
502
21.9k
          case RaftGroupStatePB::BOOTSTRAPPING: FALLTHROUGH_INTENDED;
503
21.9k
          case RaftGroupStatePB::RUNNING:
504
            // These are non-terminal states that may retry. Check and update failed local replica's
505
            // current state. For remote replica, just wait for some time before retrying.
506
21.9k
            if (replica.ts->IsLocal()) {
507
1.08k
              tserver::GetTabletStatusRequestPB req;
508
1.08k
              tserver::GetTabletStatusResponsePB resp;
509
1.08k
              req.set_tablet_id(tablet_id_);
510
1.08k
              const Status status =
511
1.08k
                  CHECK_NOTNULL(replica.ts->local_tserver())->GetTabletStatus(&req, &resp);
512
1.08k
              if (!status.ok() || 
resp.has_error()527
) {
513
557
                LOG_WITH_PREFIX(ERROR)
514
557
                    << "Received error from GetTabletStatus: "
515
557
                    << (!status.ok() ? status : 
StatusFromPB(resp.error().status())0
);
516
557
                continue;
517
557
              }
518
519
527
              DCHECK_EQ(resp.tablet_status().tablet_id(), tablet_id_);
520
527
              
VLOG_WITH_PREFIX0
(3) << "GetTabletStatus returned status: "
521
0
                                  << tablet::RaftGroupStatePB_Name(resp.tablet_status().state())
522
0
                                  << " for replica " << replica.ts->ToString();
523
527
              replica_update.new_state = resp.tablet_status().state();
524
527
              if (replica_update.new_state != tablet::RaftGroupStatePB::RUNNING) {
525
60
                if (replica_update.new_state != replica.state) {
526
                  // Cannot update replica here directly because holding only shared lock on mutex.
527
58
                  replica_updates.push_back(replica_update); // Update only state
528
58
                }
529
60
                continue;
530
60
              }
531
467
              if (!replica.ts->local_tserver()->LeaderAndReady(
532
467
                      tablet_id_, /* allow_stale */ true)) {
533
                // Should continue here because otherwise failed state will be cleared.
534
451
                continue;
535
451
              }
536
20.8k
            } else if ((MonoTime::Now() - replica.last_failed_time) <
537
20.8k
                       FLAGS_retry_failed_replica_ms * 1ms) {
538
20.8k
              continue;
539
20.8k
            }
540
30
            break;
541
30
          
case RaftGroupStatePB::FAILED: 0
FALLTHROUGH_INTENDED0
;
542
16
          case RaftGroupStatePB::QUIESCING: FALLTHROUGH_INTENDED;
543
65
          case RaftGroupStatePB::SHUTDOWN:
544
            // These are terminal states, so we won't retry.
545
65
            continue;
546
22.0k
        }
547
548
20
        
VLOG_WITH_PREFIX0
(3) << "Changing state of replica " << replica.ts->ToString()
549
0
                            << " from failed to not failed";
550
20
        replica_update.clear_failed = true;
551
        // Cannot update replica here directly because holding only shared lock on mutex.
552
20
        replica_updates.push_back(replica_update);
553
1.09M
      } else {
554
1.09M
        if (replica.role == PeerRole::READ_REPLICA) {
555
12.4k
          num_alive_read_replicas++;
556
1.08M
        } else if (replica.role == PeerRole::FOLLOWER || 
replica.role == PeerRole::LEADER338k
) {
557
1.07M
          num_alive_live_replicas++;
558
1.07M
        }
559
1.09M
      }
560
1.09M
      servers->push_back(replica.ts);
561
1.09M
    }
562
368k
    SetAliveReplicas(num_alive_live_replicas, num_alive_read_replicas);
563
368k
  }
564
368k
  if (!replica_updates.empty()) {
565
78
    std::lock_guard<rw_spinlock> lock(mutex_);
566
78
    for (const auto& update : replica_updates) {
567
78
      if (update.new_state != RaftGroupStatePB::UNKNOWN) {
568
74
        update.replica->state = update.new_state;
569
74
      }
570
78
      if (update.clear_failed) {
571
20
        update.replica->ClearFailed();
572
20
      }
573
78
    }
574
78
  }
575
368k
}
576
577
1.64k
bool RemoteTablet::MarkTServerAsLeader(const RemoteTabletServer* server) {
578
1.64k
  bool found = false;
579
1.64k
  std::lock_guard<rw_spinlock> lock(mutex_);
580
5.16k
  for (RemoteReplica& replica : replicas_) {
581
5.16k
    if (replica.ts == server) {
582
1.64k
      replica.role = PeerRole::LEADER;
583
1.64k
      found = true;
584
3.52k
    } else if (replica.role == PeerRole::LEADER) {
585
175
      replica.role = PeerRole::FOLLOWER;
586
175
    }
587
5.16k
  }
588
1.64k
  
VLOG_WITH_PREFIX0
(3) << "Latest replicas: " << ReplicasAsStringUnlocked()0
;
589
1.64k
  
VLOG_IF_WITH_PREFIX0
(3, !found) << "Specified server not found: " << server->ToString()
590
0
                                 << ". Replicas: " << ReplicasAsStringUnlocked();
591
1.64k
  return found;
592
1.64k
}
593
594
3.36k
void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) {
595
3.36k
  bool found = false;
596
3.36k
  std::lock_guard<rw_spinlock> lock(mutex_);
597
10.2k
  for (RemoteReplica& replica : replicas_) {
598
10.2k
    if (replica.ts == server) {
599
3.36k
      replica.role = PeerRole::FOLLOWER;
600
3.36k
      found = true;
601
3.36k
    }
602
10.2k
  }
603
3.36k
  
VLOG_WITH_PREFIX0
(3) << "Latest replicas: " << ReplicasAsStringUnlocked()0
;
604
18.4E
  DCHECK(found) << "Tablet " << tablet_id_ << ": Specified server not found: "
605
18.4E
                << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked();
606
3.36k
}
607
608
3
std::string RemoteTablet::ReplicasAsString() const {
609
3
  SharedLock<rw_spinlock> lock(mutex_);
610
3
  return ReplicasAsStringUnlocked();
611
3
}
612
613
3
std::string RemoteTablet::ReplicasAsStringUnlocked() const {
614
3
  DCHECK(mutex_.is_locked());
615
3
  string replicas_str;
616
9
  for (const RemoteReplica& rep : replicas_) {
617
9
    if (!replicas_str.empty()) 
replicas_str += ", "6
;
618
9
    replicas_str += rep.ToString();
619
9
  }
620
3
  return replicas_str;
621
3
}
622
623
5.08k
std::string RemoteTablet::ToString() const {
624
5.08k
  return YB_CLASS_TO_STRING(tablet_id, partition, partition_list_version, split_depth);
625
5.08k
}
626
627
285
// Reads last_known_partition_list_version_ under the shared lock.
PartitionListVersion RemoteTablet::GetLastKnownPartitionListVersion() const {
  SharedLock<rw_spinlock> read_lock(mutex_);
  const PartitionListVersion version = last_known_partition_list_version_;
  return version;
}
631
632
void RemoteTablet::MakeLastKnownPartitionListVersionAtLeast(
633
737k
    PartitionListVersion partition_list_version) {
634
737k
  std::lock_guard<rw_spinlock> lock(mutex_);
635
737k
  last_known_partition_list_version_ =
636
737k
      std::max(last_known_partition_list_version_, partition_list_version);
637
737k
}
638
639
159k
// Invokes a single-tablet lookup callback:
//   - with the stored error status, if there is one;
//   - with a TryAgain / kTabletNotYetRunning status, if the stored tablet pointer is null;
//   - with the stored tablet otherwise.
void LookupCallbackVisitor::operator()(const LookupTabletCallback& tablet_callback) const {
  if (error_status_) {
    tablet_callback(*error_status_);
    return;
  }

  auto tablet = boost::get<RemoteTabletPtr>(param_);
  if (tablet == nullptr) {
    // Built once and shared by all calls.
    static const Status not_running_status = STATUS(
        TryAgain, "Tablet for requested partition is not yet running",
        ClientError(ClientErrorCode::kTabletNotYetRunning));
    tablet_callback(not_running_status);
    return;
  }

  tablet_callback(tablet);
}
654
655
void LookupCallbackVisitor::operator()(
656
0
    const LookupTabletRangeCallback& tablet_range_callback) const {
657
0
  if (error_status_) {
658
0
    tablet_range_callback(*error_status_);
659
0
    return;
660
0
  }
661
0
  auto result = boost::get<std::vector<RemoteTabletPtr>>(param_);
662
0
  tablet_range_callback(result);
663
0
}
664
665
////////////////////////////////////////////////////////////
666
667
// Binds the cache to `client`, sizes the master lookup semaphore from
// FLAGS_max_concurrent_master_lookups and builds a log prefix containing this object's address.
MetaCache::MetaCache(YBClient* client)
  : client_(client),
    master_lookup_sem_(FLAGS_max_concurrent_master_lookups),
    log_prefix_(Format("MetaCache($0): ", static_cast<void*>(this))) {}
672
673
691
// Nothing to do explicitly; members clean up after themselves.
MetaCache::~MetaCache() {}
675
676
void MetaCache::SetLocalTabletServer(const string& permanent_uuid,
677
                                     const shared_ptr<TabletServerServiceProxy>& proxy,
678
7.97k
                                     const LocalTabletServer* local_tserver) {
679
7.97k
  const auto entry = ts_cache_.emplace(permanent_uuid,
680
7.97k
                                       std::make_unique<RemoteTabletServer>(permanent_uuid,
681
7.97k
                                                                            proxy,
682
7.97k
                                                                            local_tserver));
683
7.97k
  CHECK(entry.second);
684
7.97k
  local_tserver_ = entry.first->second.get();
685
7.97k
}
686
687
289k
// Updates the cached entry for the tablet server described by `pb`, creating the entry when
// its UUID is not in ts_cache_ yet. Does no locking of its own.
void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) {
  const std::string& ts_uuid = pb.permanent_uuid();
  auto existing = ts_cache_.find(ts_uuid);
  if (existing == ts_cache_.end()) {
    VLOG_WITH_PREFIX(1) << "Client caching new TabletServer " << ts_uuid;
    CHECK(ts_cache_.emplace(ts_uuid, std::make_unique<RemoteTabletServer>(pb)).second);
    return;
  }

  existing->second->Update(pb);
}
698
699
// A (table, partition_key) --> tablet lookup. May be in-flight to a master, or
700
// may be handled locally.
701
//
702
// Keeps a reference on the owning metacache while alive.
703
class LookupRpc : public internal::ClientMasterRpcBase, public RequestCleanup {
704
 public:
705
  LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
706
            const std::shared_ptr<const YBTable>& table,
707
            int64_t request_no,
708
            CoarseTimePoint deadline);
709
710
  virtual ~LookupRpc();
711
712
  void SendRpc() override;
713
714
192k
  MetaCache* meta_cache() { return meta_cache_.get(); }
715
0
  YBClient* client() const { return meta_cache_->client_; }
716
717
  virtual void NotifyFailure(const Status& status) = 0;
718
719
  template <class Response>
720
  void DoProcessResponse(const Status& status, const Response& resp);
721
722
  // Subclasses can override VerifyResponse for implementing additional response checks. Called
723
  // from Finished if there are no errors passed in response.
724
15.0k
  virtual CHECKED_STATUS VerifyResponse() { return Status::OK(); }
725
726
69.6k
  int64_t request_no() const {
727
69.6k
    return request_no_;
728
69.6k
  }
729
730
184k
  const std::shared_ptr<const YBTable>& table() const {
731
184k
    return table_;
732
184k
  }
733
734
  // When we receive a response for a key lookup or full table rpc, add callbacks to be fired off
735
  // to the to_notify set corresponding to the rpc type. Modify tables pointer by removing lookups
736
  // as we add to the to_notify set.
737
  virtual void AddCallbacksToBeNotified(
738
      const ProcessedTablesMap& processed_tables,
739
      std::unordered_map<TableId, TableData>* tables,
740
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) = 0;
741
742
  // When looking up by key or full table, update the processe table with the returned location.
743
  virtual void UpdateProcessedTable(const TabletLocationsPB& loc,
744
                                    RemoteTabletPtr remote,
745
                                    ProcessedTablesMap::mapped_type* processed_table) = 0;
746
747
 private:
748
  virtual CHECKED_STATUS ProcessTabletLocations(
749
     const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
750
     boost::optional<PartitionListVersion> table_partition_list_version) = 0;
751
752
  const int64_t request_no_;
753
754
  // Pointer back to the tablet cache. Populated with location information
755
  // if the lookup finishes successfully.
756
  //
757
  // When the RPC is destroyed, a master lookup permit is returned to the
758
  // cache if one was acquired in the first place.
759
  scoped_refptr<MetaCache> meta_cache_;
760
761
  // Table to lookup. Optional in case lookup by ID, but if specified used to check if table
762
  // partitions are stale.
763
  const std::shared_ptr<const YBTable> table_;
764
765
  // Whether this lookup has acquired a master lookup permit.
766
  bool has_permit_ = false;
767
};
768
769
// Initializes the base RPC with the cache's client and `deadline`, and stores the request
// number, the owning cache and the table. `deadline` must be set (DCHECKed against the
// default-constructed time point).
LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
                     const std::shared_ptr<const YBTable>& table,
                     int64_t request_no,
                     CoarseTimePoint deadline)
    : ClientMasterRpcBase(meta_cache->client_, deadline),
      request_no_(request_no),
      meta_cache_(meta_cache),
      table_(table) {
  DCHECK(deadline != CoarseTimePoint());
}
779
780
69.3k
// Returns the master lookup permit to the cache if this RPC acquired one.
LookupRpc::~LookupRpc() {
  if (!has_permit_) {
    return;
  }
  meta_cache_->ReleaseMasterLookupPermit();
}
785
786
69.5k
void LookupRpc::SendRpc() {
787
69.5k
  if (!has_permit_) {
788
69.4k
    has_permit_ = meta_cache_->AcquireMasterLookupPermit();
789
69.4k
  }
790
69.5k
  if (!has_permit_) {
791
    // Couldn't get a permit, try again in a little while.
792
0
    ScheduleRetry(STATUS(TryAgain, "Client has too many outstanding requests to the master"));
793
0
    return;
794
0
  }
795
796
  // See YBClient::Data::SyncLeaderMasterRpc().
797
69.5k
  auto now = CoarseMonoClock::Now();
798
69.5k
  if (retrier().deadline() < now) {
799
30
    Finished(STATUS_FORMAT(TimedOut, "Timed out after deadline expired, passed: $0",
800
30
                           MonoDelta(now - retrier().start())));
801
30
    return;
802
30
  }
803
69.5k
  mutable_retrier()->PrepareController();
804
805
69.5k
  ClientMasterRpcBase::SendRpc();
806
69.5k
}
807
808
namespace {
809
810
15.0k
// Returns the status of the first per-tablet error carried by a GetTabletLocations response,
// or OK when the response lists no errors.
CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTabletLocationsResponsePB& resp) {
  if (resp.errors_size() > 0) {
    return StatusFromPB(resp.errors(0).status());
  }
  return Status::OK();
}
813
814
54.2k
// Overload for GetTableLocations responses. That message has no per-tablet lookup errors, so
// the result is always OK; the overload exists so DoProcessResponse can treat both response
// types uniformly.
CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTableLocationsResponsePB& resp) {
  return Status::OK();
}
818
819
template <class Response>
820
69.3k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
821
69.3k
  return resp.has_partition_list_version()
822
69.3k
             ? 
boost::make_optional<PartitionListVersion>(resp.partition_list_version())56.3k
823
69.3k
             : 
boost::none12.9k
;
824
69.3k
}
meta_cache.cc:boost::optional<unsigned int> yb::client::internal::(anonymous namespace)::GetPartitionListVersion<yb::master::GetTableLocationsResponsePB>(yb::master::GetTableLocationsResponsePB const&)
Line
Count
Source
820
54.2k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
821
54.2k
  return resp.has_partition_list_version()
822
54.2k
             ? 
boost::make_optional<PartitionListVersion>(resp.partition_list_version())54.2k
823
54.2k
             : 
boost::none1
;
824
54.2k
}
meta_cache.cc:boost::optional<unsigned int> yb::client::internal::(anonymous namespace)::GetPartitionListVersion<yb::master::GetTabletLocationsResponsePB>(yb::master::GetTabletLocationsResponsePB const&)
Line
Count
Source
820
15.0k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
821
15.0k
  return resp.has_partition_list_version()
822
15.0k
             ? 
boost::make_optional<PartitionListVersion>(resp.partition_list_version())2.08k
823
15.0k
             : 
boost::none12.9k
;
824
15.0k
}
825
826
} // namespace
827
828
template <class Response>
829
69.3k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
830
69.3k
  auto new_status = status;
831
69.3k
  if (new_status.ok()) {
832
69.3k
    new_status = VerifyResponse();
833
69.3k
  }
834
835
  // Prefer response failures over no tablets found.
836
69.3k
  if (new_status.ok()) {
837
69.3k
    new_status = GetFirstErrorForTabletById(resp);
838
69.3k
  }
839
840
69.3k
  if (new_status.ok() && 
resp.tablet_locations_size() == 068.9k
) {
841
0
    new_status = STATUS(NotFound, "No such tablet found");
842
0
  }
843
844
69.3k
  if (new_status.ok()) {
845
68.9k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
846
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
847
0
      NotifyFailure(s);
848
0
      return;
849
0
    }
850
68.9k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
851
68.9k
  }
852
69.3k
  if (!new_status.ok()) {
853
434
    
YB_LOG_WITH_PREFIX_EVERY_N_SECS287
(WARNING, 1) << new_status287
;
854
434
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
855
434
    NotifyFailure(new_status);
856
434
  }
857
69.3k
}
void yb::client::internal::LookupRpc::DoProcessResponse<yb::master::GetTableLocationsResponsePB>(yb::Status const&, yb::master::GetTableLocationsResponsePB const&)
Line
Count
Source
829
54.3k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
830
54.3k
  auto new_status = status;
831
54.3k
  if (new_status.ok()) {
832
54.2k
    new_status = VerifyResponse();
833
54.2k
  }
834
835
  // Prefer response failures over no tablets found.
836
54.3k
  if (new_status.ok()) {
837
54.2k
    new_status = GetFirstErrorForTabletById(resp);
838
54.2k
  }
839
840
54.3k
  if (new_status.ok() && 
resp.tablet_locations_size() == 054.2k
) {
841
0
    new_status = STATUS(NotFound, "No such tablet found");
842
0
  }
843
844
54.3k
  if (new_status.ok()) {
845
54.2k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
846
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
847
0
      NotifyFailure(s);
848
0
      return;
849
0
    }
850
54.2k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
851
54.2k
  }
852
54.3k
  if (!new_status.ok()) {
853
57
    
YB_LOG_WITH_PREFIX_EVERY_N_SECS22
(WARNING, 1) << new_status22
;
854
57
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
855
57
    NotifyFailure(new_status);
856
57
  }
857
54.3k
}
void yb::client::internal::LookupRpc::DoProcessResponse<yb::master::GetTabletLocationsResponsePB>(yb::Status const&, yb::master::GetTabletLocationsResponsePB const&)
Line
Count
Source
829
15.0k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
830
15.0k
  auto new_status = status;
831
15.0k
  if (new_status.ok()) {
832
15.0k
    new_status = VerifyResponse();
833
15.0k
  }
834
835
  // Prefer response failures over no tablets found.
836
15.0k
  if (new_status.ok()) {
837
15.0k
    new_status = GetFirstErrorForTabletById(resp);
838
15.0k
  }
839
840
15.0k
  if (new_status.ok() && 
resp.tablet_locations_size() == 014.6k
) {
841
0
    new_status = STATUS(NotFound, "No such tablet found");
842
0
  }
843
844
15.0k
  if (new_status.ok()) {
845
14.6k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
846
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
847
0
      NotifyFailure(s);
848
0
      return;
849
0
    }
850
14.6k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
851
14.6k
  }
852
15.0k
  if (!new_status.ok()) {
853
377
    
YB_LOG_WITH_PREFIX_EVERY_N_SECS265
(WARNING, 1) << new_status265
;
854
377
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
855
377
    NotifyFailure(new_status);
856
377
  }
857
15.0k
}
858
859
namespace {
860
861
// Verifies that `locations` are ordered by partition_key_start and that no partition starts
// before the previous one ends.
//
// Returns IllegalState (after a DFATAL log containing all locations) on the first violation,
// OK otherwise. An empty list is OK.
Status CheckTabletLocations(
    const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations) {
  // Renders every location on its own line; only used for the diagnostic below.
  const auto dump_locations = [&locations] {
    std::string result;
    for (auto& loc : locations) {
      result += "\n  " + AsString(loc);
    }
    return result;
  };

  const std::string* prev_partition_end = nullptr;
  for (const TabletLocationsPB& loc : locations) {
    const std::string& current_start = loc.partition().partition_key_start();
    if (prev_partition_end != nullptr && *prev_partition_end > current_start) {
      LOG(DFATAL) << "There should be no overlaps in tablet partitions and they should be sorted "
                  << "by partition_key_start. Prev partition end: "
                  << Slice(*prev_partition_end).ToDebugHexString() << ", current partition start: "
                  << Slice(current_start).ToDebugHexString()
                  << ". Tablet locations: " << dump_locations();
      return STATUS(IllegalState, "Wrong order or overlaps in partitions");
    }
    prev_partition_end = &loc.partition().partition_key_end();
  }
  return Status::OK();
}
883
884
class TabletIdLookup : public ToStringable {
885
 public:
886
15.0k
  explicit TabletIdLookup(const TabletId& tablet_id) : tablet_id_(tablet_id) {}
887
888
0
  std::string ToString() const override {
889
0
    return Format("Tablet: $0", tablet_id_);
890
0
  }
891
892
 private:
893
  const TabletId& tablet_id_;
894
};
895
896
class TablePartitionLookup : public ToStringable {
897
 public:
898
  explicit TablePartitionLookup(
899
      const TableId& table_id, const VersionedPartitionGroupStartKey& partition_group_start)
900
54.3k
      : table_id_(table_id), partition_group_start_(partition_group_start) {}
901
902
0
  std::string ToString() const override {
903
0
    return Format("Table: $0, partition: $1, partition list version: $2",
904
0
                  table_id_, Slice(*partition_group_start_.key).ToDebugHexString(),
905
0
                  partition_group_start_.partition_list_version);
906
0
  }
907
908
 private:
909
  const TableId& table_id_;
910
  const VersionedPartitionGroupStartKey partition_group_start_;
911
};
912
913
class FullTableLookup : public ToStringable {
914
 public:
915
  explicit FullTableLookup(const TableId& table_id)
916
0
      : table_id_(table_id) {}
917
918
0
  std::string ToString() const override {
919
0
    return Format("FullTableLookup: $0", table_id_);
920
0
  }
921
 private:
922
  const TableId& table_id_;
923
};
924
925
} // namespace
926
927
// Merges tablet locations received from the master into the cache.
//
// locations                    - tablet locations from the response; validated with
//                                CheckTabletLocations() before anything is modified.
// table_partition_list_version - partition list version reported by the master, if any. When
//                                present it must match the version cached for every affected
//                                table, otherwise TryAgain(kTablePartitionListIsStale) is
//                                returned.
// lookup_rpc                   - RPC that produced the response; may be null, in which case the
//                                per-RPC hooks are skipped.
//
// All cache mutation happens under mutex_. Callbacks of satisfied lookups are collected while
// the lock is held and invoked after it is released.
Status MetaCache::ProcessTabletLocations(
    const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
    boost::optional<PartitionListVersion> table_partition_list_version, LookupRpc* lookup_rpc) {
  if (VLOG_IS_ON(2)) {
    VLOG_WITH_PREFIX_AND_FUNC(2) << "lookup_rpc: " << AsString(lookup_rpc);
    for (const auto& loc : locations) {
      for (const auto& table_id : loc.table_ids()) {
        VLOG_WITH_PREFIX_AND_FUNC(2) << loc.tablet_id() << ", " << table_id;
      }
    }
    VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(locations);
  }

  RETURN_NOT_OK(CheckTabletLocations(locations));

  std::vector<std::pair<LookupCallback, LookupCallbackVisitor>> to_notify;
  {
    std::lock_guard<decltype(mutex_)> lock(mutex_);
    ProcessedTablesMap processed_tables;

    for (const TabletLocationsPB& loc : locations) {
      const std::string& tablet_id = loc.tablet_id();
      // Tablet already known to the cache, or null if this is the first time we see it.
      RemoteTabletPtr remote = FindPtrOrNull(tablets_by_id_, tablet_id);

      // Bring the tserver cache up to date first; the Refresh call further down relies on it.
      for (const TabletLocationsPB_ReplicaPB& replica : loc.replicas()) {
        UpdateTabletServerUnlocked(replica.ts_info());
      }

      const bool has_multiple_tables = loc.table_ids_size() > 1;
      VersionedTablePartitionListPtr colocated_table_partition_list;
      if (has_multiple_tables && lookup_rpc && lookup_rpc->table()) {
        // With a single table id the master only returned info for the requested table, whose
        // TableData was initialized before the RPC was sent. More than one table id means the
        // response is for a co-located table: all tables sharing these tablets have the same
        // partition list, so the list of the table the caller asked for can be reused for the
        // other co-located tables reported by the master.
        const auto lookup_table_it = tables_.find(lookup_rpc->table()->id());
        if (lookup_table_it == tables_.end()) {
          // Not a correctness problem (only slower first lookups for co-located tables), so
          // release builds carry on; debug builds crash to make the situation visible.
          LOG_WITH_PREFIX(DFATAL) << Format(
              "Internal error: got response for lookup RPC for co-located table, but MetaCache "
              "table data wasn't initialized with partition list for this table. RPC: $0",
              AsString(lookup_rpc));
        } else {
          colocated_table_partition_list = lookup_table_it->second.partition_list;
        }
      }

      for (const std::string& table_id : loc.table_ids()) {
        auto& processed_table = processed_tables[table_id];
        // Stays null when the by-partition index of this table must not be touched.
        std::map<PartitionKey, RemoteTabletPtr>* tablets_by_key = nullptr;

        auto table_it = tables_.find(table_id);
        if (table_it == tables_.end() && has_multiple_tables && colocated_table_partition_list) {
          table_it = InitTableDataUnlocked(table_id, colocated_table_partition_list);
        }
        if (table_it != tables_.end()) {
          auto& table_data = table_it->second;

          const auto msg_formatter = [&] {
            return Format(
                "Received table $0 partitions version: $1, MetaCache's table partitions version: "
                "$2",
                table_id, table_partition_list_version, table_data.partition_list->version);
          };
          VLOG_WITH_PREFIX_AND_FUNC(4) << msg_formatter();
          if (table_partition_list_version.has_value()) {
            if (table_partition_list_version.get() != table_data.partition_list->version) {
              // NOTE(review): this return leaves the function before the callbacks already
              // moved into `to_notify` for earlier tablets are invoked - confirm that this
              // cannot happen in practice or is intended.
              return STATUS(
                  TryAgain, msg_formatter(),
                  ClientError(ClientErrorCode::kTablePartitionListIsStale));
            }
            // table_data.tablets_by_partition has to stay consistent with
            // table_data.partition_list (see comments for TableData::partitions), so it is
            // only updated when the response carries a partition list version. Responses
            // without one come from LookupTabletById requests that don't specify a table and
            // therefore don't care about partitions changing.
            tablets_by_key = &table_data.tablets_by_partition;
          }
        }

        if (remote) {
          // Partition should not have changed.
          DCHECK_EQ(loc.partition().partition_key_start(),
                    remote->partition().partition_key_start());
          DCHECK_EQ(loc.partition().partition_key_end(),
                    remote->partition().partition_key_end());

          // For colocated tables the RemoteTablet already exists because an earlier iteration
          // over loc.table_ids() created it; it still has to be registered in the current
          // table's by-partition index.
          if (tablets_by_key) {
            (*tablets_by_key)[remote->partition().partition_key_start()] = remote;
          }

          VLOG_WITH_PREFIX(5) << "Refreshing tablet " << tablet_id << ": "
                              << loc.ShortDebugString();
        } else {
          VLOG_WITH_PREFIX(5) << "Caching tablet " << tablet_id << ": " << loc.ShortDebugString();

          Partition partition;
          Partition::FromPB(loc.partition(), &partition);
          remote = new RemoteTablet(
              tablet_id, partition, table_partition_list_version, loc.split_depth(),
              loc.split_parent_tablet_id());

          CHECK(tablets_by_id_.emplace(tablet_id, remote).second);
          if (tablets_by_key) {
            auto emplace_result = tablets_by_key->emplace(partition.partition_key_start(), remote);
            if (!emplace_result.second) {
              // Another tablet is already registered at this partition start.
              const auto& old_tablet = emplace_result.first->second;
              if (old_tablet->split_depth() < remote->split_depth()) {
                // Only replace with tablet of higher split_depth.
                emplace_result.first->second = remote;
              } else if (old_tablet->split_depth() == loc.split_depth()
                         && old_tablet->tablet_id() != tablet_id) {
                // Same split_depth must mean the same tablet.
                const auto error_msg = Format(
                    "Can't replace tablet $0 with $1 at partition_key_start $2, split_depth $3",
                    old_tablet->tablet_id(), tablet_id, loc.partition().partition_key_start(),
                    old_tablet->split_depth());
                LOG_WITH_PREFIX(DFATAL) << error_msg;
                // Just skip updating this tablet for release build.
              }
            }
          }
          MaybeUpdateClientRequests(*remote);
        }
        remote->Refresh(ts_cache_, loc.replicas());
        remote->SetExpectedReplicas(loc.expected_live_replicas(), loc.expected_read_replicas());
        if (table_partition_list_version.has_value()) {
          remote->MakeLastKnownPartitionListVersionAtLeast(*table_partition_list_version);
        }
        if (lookup_rpc) {
          lookup_rpc->UpdateProcessedTable(loc, remote, &processed_table);
        }
      }

      // Every by-id lookup waiting for this tablet can now be answered.
      auto lookups_it = tablet_lookups_by_id_.find(tablet_id);
      if (lookups_it != tablet_lookups_by_id_.end()) {
        while (auto* lookup = lookups_it->second.lookups.Pop()) {
          to_notify.emplace_back(std::move(lookup->callback),
                                 LookupCallbackVisitor(remote));
          delete lookup;
        }
      }
    }
    if (lookup_rpc) {
      lookup_rpc->AddCallbacksToBeNotified(processed_tables, &tables_, &to_notify);
      lookup_rpc->CleanupRequest();
    }
  }

  // mutex_ has been released: run the collected callbacks.
  for (const auto& callback_and_param : to_notify) {
    boost::apply_visitor(callback_and_param.second, callback_and_param.first);
  }

  return Status::OK();
}
1096
1097
90.3k
void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) {
1098
90.3k
  
VLOG_WITH_PREFIX_AND_FUNC0
(2) << "Tablet: " << tablet.tablet_id()
1099
0
                    << " split parent: " << tablet.split_parent_tablet_id();
1100
90.3k
  if (tablet.split_parent_tablet_id().empty()) {
1101
90.3k
    
VLOG_WITH_PREFIX0
(2) << "Tablet " << tablet.tablet_id() << " is not a result of split"0
;
1102
90.3k
    return;
1103
90.3k
  }
1104
  // TODO: MetaCache is a friend of Client and tablet_requests_mutex_ with tablet_requests_ are
1105
  // public members of YBClient::Data. Consider refactoring that.
1106
68
  std::lock_guard<simple_spinlock> request_lock(client_->data_->tablet_requests_mutex_);
1107
68
  auto& tablet_requests = client_->data_->tablet_requests_;
1108
68
  const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id());
1109
68
  if (requests_it == tablet_requests.end()) {
1110
6
    
VLOG_WITH_PREFIX0
(2) << "Can't find request_id_seq for tablet "
1111
0
                        << tablet.split_parent_tablet_id();
1112
    // This can happen if client wasn't active (for example node was partitioned away) during
1113
    // sequence of splits that resulted in `tablet` creation, so we don't have info about `tablet`
1114
    // split parent.
1115
    // In this case we set request_id_seq to special value and will reset it on getting
1116
    // "request id is less than min" error. We will use min request ID plus 2^24 (there wouldn't be
1117
    // 2^24 client requests in progress from the same client to the same tablet, so it is safe to do
1118
    // this).
1119
6
    tablet_requests.emplace(
1120
6
        tablet.tablet_id(),
1121
6
        YBClient::Data::TabletRequests {
1122
6
            .request_id_seq = kInitializeFromMinRunning
1123
6
        });
1124
6
    return;
1125
6
  }
1126
62
  
VLOG_WITH_PREFIX0
(2) << "Setting request_id_seq for tablet " << tablet.tablet_id()
1127
0
                      << " from tablet " << tablet.split_parent_tablet_id() << " to "
1128
0
                      << requests_it->second.request_id_seq;
1129
62
  tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq;
1130
62
}
1131
1132
// Creates the TableData entry for `table_id`, constructed from `partitions`, and returns an
// iterator to the entry in tables_. If an entry already exists it is left unchanged and the
// iterator to it is returned. This function takes no lock itself.
std::unordered_map<TableId, TableData>::iterator MetaCache::InitTableDataUnlocked(
    const TableId& table_id, const VersionedTablePartitionListPtr& partitions) {
  VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
      "MetaCache initializing TableData ($0 tables) for table $1: $2, "
      "partition_list_version: $3",
      tables_.size(), table_id, tables_.count(table_id), partitions->version);

  auto emplace_result = tables_.emplace(
      std::piecewise_construct, std::forward_as_tuple(table_id),
      std::forward_as_tuple(partitions));
  return emplace_result.first;
}
1142
1143
19.2k
void MetaCache::InvalidateTableCache(const YBTable& table) {
1144
19.2k
  const auto& table_id = table.id();
1145
19.2k
  const auto table_partition_list = table.GetVersionedPartitions();
1146
19.2k
  
VLOG_WITH_PREFIX_AND_FUNC0
(1) << Format(
1147
0
      "table: $0, table.partition_list.version: $1", table_id, table_partition_list->version);
1148
1149
19.2k
  std::vector<LookupCallback> to_notify;
1150
1151
19.2k
  auto invalidate_needed = [this, &table_id, &table_partition_list](const auto& it) {
1152
19.2k
    const auto table_data_partition_list_version = it->second.partition_list->version;
1153
19.2k
    
VLOG_WITH_PREFIX0
(1) << Format(
1154
0
        "tables_[$0].partition_list.version: $1", table_id, table_data_partition_list_version);
1155
    // Only need to invalidate table cache, if it is has partition list version older than table's
1156
    // one.
1157
19.2k
    return table_data_partition_list_version < table_partition_list->version;
1158
19.2k
  };
1159
1160
19.2k
  {
1161
19.2k
    SharedLock<decltype(mutex_)> lock(mutex_);
1162
19.2k
    auto it = tables_.find(table_id);
1163
19.2k
    if (it != tables_.end()) {
1164
19.2k
      if (!invalidate_needed(it)) {
1165
19.1k
        return;
1166
19.1k
      }
1167
19.2k
    }
1168
19.2k
  }
1169
1170
31
  {
1171
31
    UniqueLock<decltype(mutex_)> lock(mutex_);
1172
31
    auto it = tables_.find(table_id);
1173
31
    if (it != tables_.end()) {
1174
31
      if (!invalidate_needed(it)) {
1175
0
        return;
1176
0
      }
1177
31
    } else {
1178
0
      it = InitTableDataUnlocked(table_id, table_partition_list);
1179
      // Nothing to invalidate, we have just initiliazed it first time.
1180
0
      return;
1181
0
    }
1182
1183
31
    auto& table_data = it->second;
1184
1185
    // Some partitions could be mapped to tablets that have been split and we need to re-fetch
1186
    // info about tablets serving partitions.
1187
68
    for (auto& tablet : table_data.tablets_by_partition) {
1188
68
      tablet.second->MarkStale();
1189
68
    }
1190
1191
31
    for (auto& tablet : table_data.all_tablets) {
1192
0
      tablet->MarkStale();
1193
0
    }
1194
    // TODO(tsplit): Optimize to retry only necessary lookups inside ProcessTabletLocations,
1195
    // detect which need to be retried by GetTableLocationsResponsePB.partition_list_version.
1196
33
    for (auto& group_lookups : table_data.tablet_lookups_by_group) {
1197
33
      while (auto* lookup = group_lookups.second.lookups.Pop()) {
1198
0
        to_notify.push_back(std::move(lookup->callback));
1199
0
        delete lookup;
1200
0
      }
1201
33
    }
1202
31
    table_data.tablet_lookups_by_group.clear();
1203
1204
    // Only update partitions here after invalidating TableData cache to avoid inconsistencies.
1205
    // See https://github.com/yugabyte/yugabyte-db/issues/6890.
1206
31
    table_data.partition_list = table_partition_list;
1207
31
  }
1208
0
  for (const auto& callback : to_notify) {
1209
0
    const auto s = STATUS_EC_FORMAT(
1210
0
        TryAgain, ClientError(ClientErrorCode::kMetaCacheInvalidated),
1211
0
        "MetaCache for table $0 has been invalidated.", table_id);
1212
0
    boost::apply_visitor(LookupCallbackVisitor(s), callback);
1213
0
  }
1214
31
}
1215
1216
// Collects lookup callbacks and invokes all of them from its destructor, passing the status
// held at that moment (the constructor argument unless SetStatus() replaced it).
class MetaCache::CallbackNotifier {
 public:
  explicit CallbackNotifier(const Status& status) : status_(status) {}

  // Takes ownership of `callback`; it is invoked when this object is destroyed.
  void Add(LookupCallback&& callback) {
    callbacks_.emplace_back(std::move(callback));
  }

  // Replaces the status that will be reported to the collected callbacks.
  void SetStatus(const Status& status) {
    status_ = status;
  }

  // Invokes the collected callbacks in the order they were added.
  ~CallbackNotifier() {
    for (const auto& callback : callbacks_) {
      boost::apply_visitor(LookupCallbackVisitor(status_), callback);
    }
  }

 private:
  std::vector<LookupCallback> callbacks_;
  Status status_;
};
1237
1238
// Handles a failed lookup RPC for one group of waiting lookups.
//
// Lookups are kept in the group for a retry only when `status` is TimedOut and their own
// deadline has not passed yet. All other lookups are removed and their callbacks are handed to
// `notifier`. If nothing is left to retry, the group is told that request `request_no` has
// finished.
//
// Returns the latest deadline among the lookups kept for retry, or a default-constructed
// CoarseTimePoint when none were kept.
CoarseTimePoint MetaCache::LookupFailed(
    const Status& status, int64_t request_no, const ToStringable& lookup_id,
    LookupDataGroup* lookup_data_group,
    CallbackNotifier* notifier) {
  const auto now = CoarseMonoClock::Now();
  const bool timed_out = status.IsTimedOut();
  std::vector<LookupData*> pending;
  CoarseTimePoint max_deadline;

  // Drain the queue completely; lookups to be retried are pushed back afterwards.
  while (auto* lookup = lookup_data_group->lookups.Pop()) {
    const bool keep_waiting = timed_out && lookup->deadline > now;
    if (keep_waiting) {
      max_deadline = std::max(max_deadline, lookup->deadline);
      pending.push_back(lookup);
      continue;
    }
    notifier->Add(std::move(lookup->callback));
    delete lookup;
  }

  if (pending.empty()) {
    lookup_data_group->Finished(request_no, lookup_id);
    return max_deadline;
  }

  for (auto* lookup : pending) {
    lookup_data_group->lookups.Push(lookup);
  }
  return max_deadline;
}
1266
1267
class LookupByIdRpc : public LookupRpc {
1268
 public:
1269
  LookupByIdRpc(const scoped_refptr<MetaCache>& meta_cache,
1270
                const TabletId& tablet_id,
1271
                const std::shared_ptr<const YBTable>& table,
1272
                master::IncludeInactive include_inactive,
1273
                int64_t request_no,
1274
                CoarseTimePoint deadline,
1275
                int64_t lookups_without_new_replicas)
1276
      : LookupRpc(meta_cache, table, request_no, deadline),
1277
        tablet_id_(tablet_id),
1278
15.1k
        include_inactive_(include_inactive) {
1279
15.1k
    if (lookups_without_new_replicas != 0) {
1280
1.44k
      send_delay_ = std::min(
1281
1.44k
          lookups_without_new_replicas * FLAGS_meta_cache_lookup_throttling_step_ms,
1282
1.44k
          FLAGS_meta_cache_lookup_throttling_max_delay_ms) * 1ms;
1283
1.44k
    }
1284
15.1k
  }
1285
1286
850
  std::string ToString() const override {
1287
850
    return Format("LookupByIdRpc(tablet: $0, num_attempts: $1)", tablet_id_, num_attempts());
1288
850
  }
1289
1290
16.6k
  void SendRpc() override {
1291
16.6k
    if (send_delay_) {
1292
1.44k
      auto delay = send_delay_;
1293
1.44k
      send_delay_ = MonoDelta();
1294
1.44k
      auto status = mutable_retrier()->DelayedRetry(this, Status::OK(), delay);
1295
1.44k
      if (!status.ok()) {
1296
0
        Finished(status);
1297
0
      }
1298
1.44k
      return;
1299
1.44k
    }
1300
1301
15.1k
    LookupRpc::SendRpc();
1302
15.1k
  }
1303
1304
15.1k
  void CallRemoteMethod() override {
1305
    // Fill out the request.
1306
15.1k
    req_.clear_tablet_ids();
1307
15.1k
    req_.add_tablet_ids(tablet_id_);
1308
15.1k
    if (table()) {
1309
2.18k
      req_.set_table_id(table()->id());
1310
2.18k
    }
1311
15.1k
    req_.set_include_inactive(include_inactive_);
1312
1313
15.1k
    master_client_proxy()->GetTabletLocationsAsync(
1314
15.1k
        req_, &resp_, mutable_retrier()->mutable_controller(),
1315
15.1k
        std::bind(&LookupByIdRpc::Finished, this, Status::OK()));
1316
15.1k
  }
1317
1318
  void AddCallbacksToBeNotified(
1319
    const ProcessedTablesMap& processed_tables,
1320
    std::unordered_map<TableId, TableData>* tables,
1321
14.6k
    std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1322
    // Nothing to do when looking up by id.
1323
14.6k
    return;
1324
14.6k
  }
1325
1326
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1327
                            RemoteTabletPtr remote,
1328
23.8k
                            ProcessedTablesMap::mapped_type* processed_table) override {
1329
23.8k
    return;
1330
23.8k
  }
1331
1332
 private:
1333
15.0k
  void ProcessResponse(const Status& status) override {
1334
15.0k
    DoProcessResponse(status, resp_);
1335
15.0k
  }
1336
1337
15.1k
  Status ResponseStatus() override {
1338
15.1k
    return StatusFromResp(resp_);
1339
15.1k
  }
1340
1341
14.6k
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1342
14.6k
    auto& lookups = meta_cache()->tablet_lookups_by_id_;
1343
14.6k
    auto it = lookups.find(tablet_id_);
1344
14.6k
    TabletIdLookup tablet_id_lookup(tablet_id_);
1345
14.6k
    if (it != lookups.end()) {
1346
14.6k
      it->second.Finished(request_no(), tablet_id_lookup);
1347
14.6k
    } else {
1348
0
      LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown tablet: "
1349
0
                            << tablet_id_lookup.ToString();
1350
0
    }
1351
14.6k
  }
1352
1353
377
  void NotifyFailure(const Status& status) override {
1354
377
    meta_cache()->LookupByIdFailed(
1355
377
        tablet_id_, table(), include_inactive_,
1356
377
        GetPartitionListVersion(resp_), request_no(), status);
1357
377
  }
1358
1359
  Status ProcessTabletLocations(
1360
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1361
14.6k
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1362
14.6k
    return meta_cache()->ProcessTabletLocations(
1363
14.6k
        locations, table_partition_list_version, this);
1364
14.6k
  }
1365
1366
  // Tablet to lookup.
1367
  const TabletId tablet_id_;
1368
1369
  // Whether or not to lookup inactive (hidden) tablets.
1370
  master::IncludeInactive include_inactive_;
1371
1372
  // Request body.
1373
  master::GetTabletLocationsRequestPB req_;
1374
1375
  // Response body.
1376
  master::GetTabletLocationsResponsePB resp_;
1377
1378
  MonoDelta send_delay_;
1379
};
1380
1381
class LookupFullTableRpc : public LookupRpc {
1382
 public:
1383
  LookupFullTableRpc(const scoped_refptr<MetaCache>& meta_cache,
1384
                     const std::shared_ptr<const YBTable>& table,
1385
                     int64_t request_no,
1386
                     CoarseTimePoint deadline)
1387
0
      : LookupRpc(meta_cache, table, request_no, deadline) {
1388
0
  }
1389
1390
0
  std::string ToString() const override {
1391
0
    return Format("LookupFullTableRpc($0, $1)", table()->name(), num_attempts());
1392
0
  }
1393
1394
0
  void CallRemoteMethod() override {
1395
    // Fill out the request.
1396
0
    req_.mutable_table()->set_table_id(table()->id());
1397
    // The end partition key is left unset intentionally so that we'll prefetch
1398
    // some additional tablets.
1399
0
    master_client_proxy()->GetTableLocationsAsync(
1400
0
        req_, &resp_, mutable_retrier()->mutable_controller(),
1401
0
        std::bind(&LookupFullTableRpc::Finished, this, Status::OK()));
1402
0
  }
1403
1404
  void AddCallbacksToBeNotified(
1405
      const ProcessedTablesMap& processed_tables,
1406
      std::unordered_map<TableId, TableData>* tables,
1407
0
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1408
0
    for (const auto& processed_table : processed_tables) {
1409
      // Handle tablet range
1410
0
      const auto it = tables->find(processed_table.first);
1411
0
      if (it == tables->end()) {
1412
0
        continue;
1413
0
      }
1414
0
      auto& table_data = it->second;
1415
0
      auto& full_table_lookups = table_data.full_table_lookups;
1416
0
      while (auto* lookup = full_table_lookups.lookups.Pop()) {
1417
0
        std::vector<internal::RemoteTabletPtr> remote_tablets;
1418
0
        for (const auto& entry : processed_table.second) {
1419
0
          remote_tablets.push_back(entry.second);
1420
0
        }
1421
0
        table_data.all_tablets = remote_tablets;
1422
0
        to_notify->emplace_back(std::move(lookup->callback),
1423
0
                                LookupCallbackVisitor(std::move(remote_tablets)));
1424
0
        delete lookup;
1425
0
      }
1426
0
    }
1427
0
  }
1428
1429
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1430
                            RemoteTabletPtr remote,
1431
0
                            ProcessedTablesMap::mapped_type* processed_table) override {
1432
0
    processed_table->emplace(loc.partition().partition_key_start(), remote);
1433
0
  }
1434
1435
 private:
1436
0
  void ProcessResponse(const Status& status) override {
1437
0
    DoProcessResponse(status, resp_);
1438
0
  }
1439
1440
0
  Status ResponseStatus() override {
1441
0
    return StatusFromResp(resp_);
1442
0
  }
1443
1444
0
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1445
0
    const auto it = meta_cache()->tables_.find(table()->id());
1446
0
    if (it == meta_cache()->tables_.end()) {
1447
0
      return;
1448
0
    }
1449
0
    auto& table_data = it->second;
1450
0
    auto full_table_lookup = FullTableLookup(table()->id());
1451
0
    table_data.full_table_lookups.Finished(request_no(), full_table_lookup, false);
1452
0
  }
1453
1454
0
  void NotifyFailure(const Status& status) override {
1455
0
    meta_cache()->LookupFullTableFailed(table(), request_no(), status);
1456
0
  }
1457
1458
  Status ProcessTabletLocations(
1459
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1460
0
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1461
0
    return meta_cache()->ProcessTabletLocations(
1462
0
        locations, table_partition_list_version, this);
1463
0
  }
1464
1465
  // Request body.
1466
  GetTableLocationsRequestPB req_;
1467
1468
  // Response body.
1469
  GetTableLocationsResponsePB resp_;
1470
};
1471
1472
class LookupByKeyRpc : public LookupRpc {
1473
 public:
1474
  LookupByKeyRpc(const scoped_refptr<MetaCache>& meta_cache,
1475
                 const std::shared_ptr<const YBTable>& table,
1476
                 const VersionedPartitionGroupStartKey& partition_group_start,
1477
                 int64_t request_no,
1478
                 CoarseTimePoint deadline)
1479
      : LookupRpc(meta_cache, table, request_no, deadline),
1480
54.3k
        partition_group_start_(partition_group_start) {
1481
54.3k
  }
1482
1483
223
  std::string ToString() const override {
1484
223
    return Format(
1485
223
        "GetTableLocations { table_name: $0, table_id: $1, partition_start_key: $2, "
1486
223
        "partition_list_version: $3, "
1487
223
        "request_no: $4, num_attempts: $5 }",
1488
223
        table()->name(),
1489
223
        table()->id(),
1490
223
        table()->partition_schema().PartitionKeyDebugString(
1491
223
            *partition_group_start_.key, internal::GetSchema(table()->schema())),
1492
223
        partition_group_start_.partition_list_version,
1493
223
        request_no(),
1494
223
        num_attempts());
1495
223
  }
1496
1497
0
  const string& table_id() const { return table()->id(); }
1498
1499
54.3k
  void CallRemoteMethod() override {
1500
    // Fill out the request.
1501
54.3k
    req_.mutable_table()->set_table_id(table()->id());
1502
54.3k
    req_.set_partition_key_start(*partition_group_start_.key);
1503
54.3k
    req_.set_max_returned_locations(kPartitionGroupSize);
1504
1505
    // The end partition key is left unset intentionally so that we'll prefetch
1506
    // some additional tablets.
1507
54.3k
    master_client_proxy()->GetTableLocationsAsync(
1508
54.3k
        req_, &resp_, mutable_retrier()->mutable_controller(),
1509
54.3k
        std::bind(&LookupByKeyRpc::Finished, this, Status::OK()));
1510
54.3k
  }
1511
1512
  void AddCallbacksToBeNotified(
1513
      const ProcessedTablesMap& processed_tables,
1514
      std::unordered_map<TableId, TableData>* tables,
1515
54.2k
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1516
704k
    for (const auto& processed_table : processed_tables) {
1517
704k
      const auto table_it = tables->find(processed_table.first);
1518
704k
      if (table_it == tables->end()) {
1519
0
        continue;
1520
0
      }
1521
704k
      auto& table_data = table_it->second;
1522
      // This should be guaranteed by ProcessTabletLocations before we get here:
1523
704k
      DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version)
1524
0
          << "table_data.partition_list->version: " << table_data.partition_list->version
1525
0
          << " partition_group_start_.partition_list_version: "
1526
0
          << partition_group_start_.partition_list_version;
1527
704k
      const auto lookup_by_group_iter =
1528
704k
          table_data.tablet_lookups_by_group.find(*partition_group_start_.key);
1529
704k
      if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) {
1530
55.6k
        
VLOG_WITH_PREFIX_AND_FUNC0
(4)
1531
0
            << "Checking tablet_lookups_by_group for partition_group_start: "
1532
0
            << Slice(*partition_group_start_.key).ToDebugHexString();
1533
55.6k
        auto& lookups_group = lookup_by_group_iter->second;
1534
186k
        while (auto* lookup = lookups_group.lookups.Pop()) {
1535
130k
          auto remote_it = processed_table.second.find(*lookup->partition_start);
1536
130k
          auto lookup_visitor = LookupCallbackVisitor(
1537
130k
              remote_it != processed_table.second.end() ? remote_it->second : 
nullptr0
);
1538
130k
          to_notify->emplace_back(std::move(lookup->callback), lookup_visitor);
1539
130k
          delete lookup;
1540
130k
        }
1541
55.6k
      }
1542
704k
    }
1543
54.2k
  }
1544
1545
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1546
                            RemoteTabletPtr remote,
1547
732k
                            ProcessedTablesMap::mapped_type* processed_table) override {
1548
732k
    processed_table->emplace(loc.partition().partition_key_start(), remote);
1549
732k
  }
1550
1551
 private:
1552
54.3k
  void ProcessResponse(const Status& status) override {
1553
54.3k
    DoProcessResponse(status, resp_);
1554
54.3k
  }
1555
1556
54.3k
  Status ResponseStatus() override {
1557
54.3k
    return StatusFromResp(resp_);
1558
54.3k
  }
1559
1560
54.2k
  Status VerifyResponse() override {
1561
    // Note: if LookupByIdRpc response has no partition list version this means this response
1562
    // is from master that doesn't support tablet splitting and it is OK to treat it as version 0
1563
    // (and 0 return value of resp_.partition_list_version() is OK).
1564
54.2k
    const auto req_partition_list_version = partition_group_start_.partition_list_version;
1565
54.2k
    const auto resp_partition_list_version = resp_.partition_list_version();
1566
1567
54.2k
    const auto versions_formatter = [&] {
1568
0
      return Format(
1569
0
          "RPC: $0, response partition_list_version: $1", this->ToString(),
1570
0
          resp_partition_list_version);
1571
0
    };
1572
1573
54.2k
    if (resp_partition_list_version < req_partition_list_version) {
1574
      // This means an issue on master side, table partition list version shouldn't decrease on
1575
      // master.
1576
0
      const auto msg = Format("Ignoring response for obsolete RPC call. $0", versions_formatter());
1577
0
      LOG_WITH_PREFIX(DFATAL) << msg;
1578
0
      return STATUS(IllegalState, msg);
1579
0
    }
1580
54.2k
    if (resp_partition_list_version > req_partition_list_version) {
1581
      // This request is for partition_group_start calculated based on obsolete table partition
1582
      // list.
1583
0
      const auto msg = Format(
1584
0
          "Cached table partition list version is obsolete, refresh required. $0",
1585
0
          versions_formatter());
1586
0
      VLOG_WITH_PREFIX_AND_FUNC(3) << msg;
1587
0
      return STATUS(TryAgain, msg, ClientError(ClientErrorCode::kTablePartitionListIsStale));
1588
0
    }
1589
    // resp_partition_list_version == req_partition_list_version
1590
54.2k
    return Status::OK();
1591
54.2k
  }
1592
1593
54.2k
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1594
54.2k
    const auto it = meta_cache()->tables_.find(table()->id());
1595
54.2k
    if (it == meta_cache()->tables_.end()) {
1596
0
      return;
1597
0
    }
1598
54.2k
    auto& table_data = it->second;
1599
    // This should be guaranteed by ProcessTabletLocations before we get here:
1600
54.2k
    DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version)
1601
0
        << "table_data.partition_list->version: " << table_data.partition_list->version
1602
0
        << " partition_group_start_.partition_list_version: "
1603
0
        << partition_group_start_.partition_list_version;
1604
54.2k
    const auto lookup_by_group_iter = table_data.tablet_lookups_by_group.find(
1605
54.2k
        *partition_group_start_.key);
1606
54.2k
    TablePartitionLookup tablet_partition_lookup(table()->id(), partition_group_start_);
1607
54.2k
    if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) {
1608
54.2k
      lookup_by_group_iter->second.Finished(request_no(), tablet_partition_lookup, false);
1609
54.2k
    } else {
1610
0
      LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown partition group: "
1611
0
                << tablet_partition_lookup.ToString();
1612
0
    }
1613
54.2k
  }
1614
1615
57
  void NotifyFailure(const Status& status) override {
1616
57
    meta_cache()->LookupByKeyFailed(
1617
57
        table(), partition_group_start_, resp_.partition_list_version(), request_no(), status);
1618
57
  }
1619
1620
  Status ProcessTabletLocations(
1621
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1622
54.2k
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1623
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(2) << "partition_group_start: " << partition_group_start_.ToString();
1624
    // This condition is guaranteed by VerifyResponse function:
1625
54.2k
    CHECK(resp_.partition_list_version() == partition_group_start_.partition_list_version);
1626
1627
54.2k
    return meta_cache()->ProcessTabletLocations(locations, table_partition_list_version, this);
1628
54.2k
  }
1629
1630
  // Encoded partition group start key to lookup.
1631
  VersionedPartitionGroupStartKey partition_group_start_;
1632
1633
  // Request body.
1634
  GetTableLocationsRequestPB req_;
1635
1636
  // Response body.
1637
  GetTableLocationsResponsePB resp_;
1638
};
1639
1640
void MetaCache::LookupByKeyFailed(
1641
    const std::shared_ptr<const YBTable>& table,
1642
    const VersionedPartitionGroupStartKey& partition_group_start,
1643
    PartitionListVersion response_partition_list_version,
1644
57
    int64_t request_no, const Status& status) {
1645
57
  const auto req_partition_list_version = partition_group_start.partition_list_version;
1646
57
  
VLOG_WITH_PREFIX0
(1) << "Lookup for table " << table->id() << " and partition group start "
1647
0
                      << Slice(*partition_group_start.key).ToDebugHexString()
1648
0
                      << ", request partition list version: " << req_partition_list_version
1649
0
                      << ", response partition list version: " << response_partition_list_version
1650
0
                      << " failed with: " << status;
1651
1652
57
  CallbackNotifier notifier(status);
1653
57
  CoarseTimePoint max_deadline;
1654
57
  {
1655
57
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1656
57
    auto it = tables_.find(table->id());
1657
57
    if (it == tables_.end()) {
1658
0
      return;
1659
0
    }
1660
1661
57
    auto& table_data = it->second;
1662
57
    const auto table_data_partition_list_version = table_data.partition_list->version;
1663
57
    const auto versions_formatter = [&] {
1664
0
      return Format(
1665
0
          "MetaCache's table $0 partition list version: $1, stored in RPC call: $2, received: $3",
1666
0
          table->id(), table_data_partition_list_version,
1667
0
          req_partition_list_version, response_partition_list_version);
1668
0
    };
1669
1670
57
    if (table_data_partition_list_version < req_partition_list_version) {
1671
      // MetaCache partition list version is older than stored in LookupByKeyRpc for which we've
1672
      // received an answer.
1673
      // This shouldn't happen, because MetaCache partition list version for each table couldn't
1674
      // decrease and we store it inside LookupByKeyRpc when creating an RPC.
1675
0
      LOG_WITH_PREFIX(FATAL) << Format(
1676
0
          "Cached table partition list version is older than stored in RPC call. $0",
1677
0
          versions_formatter());
1678
57
    } else if (table_data_partition_list_version > req_partition_list_version) {
1679
      // MetaCache table  partition list version has updated since we've sent this RPC.
1680
      // We've already failed and cleaned all registered lookups for the table on MetaCache table
1681
      // partition list update, so we should ignore responses as well.
1682
0
      VLOG_WITH_PREFIX_AND_FUNC(3) << Format(
1683
0
          "Cached table partition list version is newer than stored in RPC call, ignoring "
1684
0
          "failure response. $0", versions_formatter());
1685
0
      return;
1686
0
    }
1687
1688
    // Since table_data_partition_list_version == req_partition_list_version,
1689
    // we will get tablet lookups for this exact RPC call there:
1690
57
    auto key_lookup_iterator = table_data.tablet_lookups_by_group.find(*partition_group_start.key);
1691
57
    if (key_lookup_iterator != table_data.tablet_lookups_by_group.end()) {
1692
57
      auto& lookup_data_group = key_lookup_iterator->second;
1693
57
      max_deadline = LookupFailed(status, request_no,
1694
57
                                  TablePartitionLookup(table->id(), partition_group_start),
1695
57
                                  &lookup_data_group, &notifier);
1696
57
    }
1697
57
  }
1698
1699
57
  if (max_deadline != CoarseTimePoint()) {
1700
10
    auto rpc = std::make_shared<LookupByKeyRpc>(
1701
10
        this, table, partition_group_start, request_no, max_deadline);
1702
10
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1703
10
  }
1704
57
}
1705
1706
void MetaCache::LookupFullTableFailed(const std::shared_ptr<const YBTable>& table,
1707
0
                                      int64_t request_no, const Status& status) {
1708
0
  VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " failed with: " << status;
1709
1710
0
  CallbackNotifier notifier(status);
1711
0
  CoarseTimePoint max_deadline;
1712
0
  {
1713
0
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1714
0
    auto it = tables_.find(table->id());
1715
0
    if (it == tables_.end()) {
1716
0
      return;
1717
0
    }
1718
1719
0
    max_deadline = LookupFailed(status, request_no, FullTableLookup(table->id()),
1720
0
                                &it->second.full_table_lookups, &notifier);
1721
0
  }
1722
1723
0
  if (max_deadline != CoarseTimePoint()) {
1724
0
    auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, max_deadline);
1725
0
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1726
0
  }
1727
0
}
1728
1729
void MetaCache::LookupByIdFailed(
1730
    const TabletId& tablet_id,
1731
    const std::shared_ptr<const YBTable>& table,
1732
    master::IncludeInactive include_inactive,
1733
    const boost::optional<PartitionListVersion>& response_partition_list_version,
1734
    int64_t request_no,
1735
377
    const Status& status) {
1736
377
  
VLOG_WITH_PREFIX1
(1) << "Lookup for tablet " << tablet_id << ", failed with: " << status1
;
1737
1738
377
  CallbackNotifier notifier(status);
1739
377
  CoarseTimePoint max_deadline;
1740
377
  {
1741
377
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1742
377
    if (status.IsNotFound() && 
response_partition_list_version.has_value()360
) {
1743
305
      auto tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
1744
305
      if (tablet) {
1745
285
        const auto tablet_last_known_table_partition_list_version =
1746
285
            tablet->GetLastKnownPartitionListVersion();
1747
285
        if (tablet_last_known_table_partition_list_version <
1748
285
            response_partition_list_version.value()) {
1749
2
          const auto msg_formatter = [&] {
1750
2
            return Format(
1751
2
                "Received table $0 ($1) partitions version: $2, last known by MetaCache for "
1752
2
                "tablet $3: $4",
1753
2
                table->id(), table->name(), response_partition_list_version,
1754
2
                tablet_id, tablet_last_known_table_partition_list_version);
1755
2
          };
1756
2
          
VLOG_WITH_PREFIX_AND_FUNC0
(3) << msg_formatter()0
;
1757
2
          notifier.SetStatus(STATUS(
1758
2
              TryAgain, msg_formatter(), ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1759
2
        }
1760
285
      }
1761
305
    }
1762
1763
377
    auto table_lookup_iterator = tablet_lookups_by_id_.find(tablet_id);
1764
377
    if (table_lookup_iterator != tablet_lookups_by_id_.end()) {
1765
377
      auto& lookup_data_group = table_lookup_iterator->second;
1766
377
      max_deadline = LookupFailed(status, request_no, TabletIdLookup(tablet_id),
1767
377
                                  &lookup_data_group, &notifier);
1768
377
    }
1769
377
  }
1770
1771
377
  if (max_deadline != CoarseTimePoint()) {
1772
5
    auto rpc = std::make_shared<LookupByIdRpc>(
1773
5
        this, tablet_id, table, include_inactive, request_no, max_deadline, 0);
1774
5
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1775
5
  }
1776
377
}
1777
1778
// Cache-only lookup of the tablet whose partition starts exactly at the given versioned
// partition start key. Returns nullptr when:
//  - the table is not cached;
//  - the cached partition list version differs from the version of the key;
//  - no tablet is cached under that partition start key;
//  - the cached tablet is stale;
//  - the tablet's partition has a non-empty end key that is not greater than the start key.
// Does no locking itself.
RemoteTabletPtr MetaCache::LookupTabletByKeyFastPathUnlocked(
    const TableId& table_id, const VersionedPartitionStartKey& versioned_partition_start_key) {
  auto table_it = tables_.find(table_id);
  if (PREDICT_FALSE(table_it == tables_.end())) {
    // Nothing is cached for this table.
    return nullptr;
  }

  const auto& table_data = table_it->second;

  if (PREDICT_FALSE(
          table_data.partition_list->version !=
          versioned_partition_start_key.partition_list_version)) {
    // The cached TableData::partition_list version differs from the partition_list_version
    // the partition start key was calculated with, so the cache can't be used.
    return nullptr;
  }

  const auto& partition_start_key = *versioned_partition_start_key.key;

  DCHECK_EQ(
      partition_start_key,
      *client::FindPartitionStart(table_data.partition_list, partition_start_key));
  const auto& tablets = table_data.tablets_by_partition;
  auto tablet_it = tablets.find(partition_start_key);
  if (PREDICT_FALSE(tablet_it == tablets.end())) {
    // No tablet is cached under this partition start key.
    return nullptr;
  }

  const auto& cached_tablet = tablet_it->second;

  // A stale entry has to be fetched again.
  if (cached_tablet->stale()) {
    return nullptr;
  }

  // The tablet matches if it has no end key, or if partition_start_key < partition end.
  const bool key_within_partition =
      cached_tablet->partition().partition_key_end().compare(partition_start_key) > 0 ||
      cached_tablet->partition().partition_key_end().empty();
  if (!key_within_partition) {
    return nullptr;
  }
  return cached_tablet;
}
1822
1823
// Cache-only lookup of all tablets of `table`. Returns boost::none when the table is not cached,
// when any cached tablet is stale, or when the cached tablet list is empty; otherwise returns a
// copy of the cached list. Does no locking itself.
boost::optional<std::vector<RemoteTabletPtr>> MetaCache::FastLookupAllTabletsUnlocked(
    const std::shared_ptr<const YBTable>& table) {
  auto table_it = tables_.find(table->id());
  if (PREDICT_FALSE(table_it == tables_.end())) {
    // Nothing is cached for this table.
    return boost::none;
  }

  auto result = std::vector<RemoteTabletPtr>();
  for (const auto& cached_tablet : table_it->second.all_tablets) {
    if (cached_tablet->stale()) {
      // A single stale tablet invalidates the whole answer.
      return boost::none;
    }
    result.push_back(cached_tablet);
  }

  if (result.empty()) {
    return boost::none;
  }
  return result;
}
1844
1845
// We disable thread safety analysis in this function due to manual conditional locking.
1846
RemoteTabletPtr MetaCache::FastLookupTabletByKeyUnlocked(
1847
23.3M
    const TableId& table_id, const VersionedPartitionStartKey& partition_start) {
1848
  // Fast path: lookup in the cache.
1849
23.3M
  auto result = LookupTabletByKeyFastPathUnlocked(table_id, partition_start);
1850
23.3M
  if (result && 
result->HasLeader()23.1M
) {
1851
23.1M
    
VLOG_WITH_PREFIX997
(5) << "Fast lookup: found tablet " << result->tablet_id()997
;
1852
23.1M
    return result;
1853
23.1M
  }
1854
1855
184k
  return nullptr;
1856
23.3M
}
1857
1858
// Overload selected for std::lock_guard: always reports true. The pointer is used only to pick
// the overload and is never dereferenced.
template <class MutexType>
bool IsUniqueLock(const std::lock_guard<MutexType>* /* lock */) {
  return true;
}
1862
1863
template <class Mutex>
1864
66.7k
bool IsUniqueLock(const SharedLock<Mutex>*) {
1865
66.7k
  return false;
1866
66.7k
}
1867
1868
// partition_group_start should be not nullptr and points to PartitionGroupStartKeyPtr that will be
1869
// initialized only if it is nullptr. This is an optimization to avoid recalculation of
1870
// partition_group_start in subsequent call of this function (see MetaCache::LookupTabletByKey).
1871
template <class Lock>
1872
bool MetaCache::DoLookupTabletByKey(
1873
    const std::shared_ptr<const YBTable>& table, const VersionedTablePartitionListPtr& partitions,
1874
    const PartitionKeyPtr& partition_start, CoarseTimePoint deadline,
1875
23.3M
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1876
23.3M
  DCHECK_ONLY_NOTNULL(partition_group_start);
1877
23.3M
  RemoteTabletPtr tablet;
1878
23.3M
  auto scope_exit = ScopeExit([callback, &tablet] {
1879
23.3M
    if (tablet) {
1880
23.1M
      (*callback)(tablet);
1881
23.1M
    }
1882
23.3M
  });
bool yb::client::internal::MetaCache::DoLookupTabletByKey<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)::'lambda'()::operator()() const
Line
Count
Source
1878
23.3M
  auto scope_exit = ScopeExit([callback, &tablet] {
1879
23.3M
    if (tablet) {
1880
23.1M
      (*callback)(tablet);
1881
23.1M
    }
1882
23.3M
  });
bool yb::client::internal::MetaCache::DoLookupTabletByKey<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)::'lambda'()::operator()() const
Line
Count
Source
1878
53.1k
  auto scope_exit = ScopeExit([callback, &tablet] {
1879
53.1k
    if (tablet) {
1880
0
      (*callback)(tablet);
1881
0
    }
1882
53.1k
  });
1883
23.3M
  int64_t request_no;
1884
23.3M
  {
1885
23.3M
    Lock lock(mutex_);
1886
23.3M
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1887
23.3M
    if (tablet) {
1888
23.1M
      return true;
1889
23.1M
    }
1890
1891
153k
    auto table_it = tables_.find(table->id());
1892
153k
    TableData* table_data;
1893
153k
    if (table_it == tables_.end()) {
1894
99.4k
      
VLOG_WITH_PREFIX_AND_FUNC147
(4) << Format(
1895
147
          "missed table_id $0", table->id());
1896
99.4k
      if (!IsUniqueLock(&lock)) {
1897
49.6k
        return false;
1898
49.6k
      }
1899
49.8k
      table_it = InitTableDataUnlocked(table->id(), partitions);
1900
49.8k
    }
1901
104k
    table_data = &table_it->second;
1902
1903
104k
    if (table_data->partition_list->version != partitions->version ||
1904
134k
        (PREDICT_FALSE(RandomActWithProbability(
1905
134k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1906
134k
         
table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0
)) {
1907
1
      (*callback)(STATUS(
1908
1
          TryAgain,
1909
1
          Format(
1910
1
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1911
1
              "refresh required",
1912
1
              table->ToString(), table_data->partition_list->version, partitions->version),
1913
1
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1914
1
      return true;
1915
1
    }
1916
1917
131k
    
if (104k
!*partition_group_start104k
) {
1918
131k
      *partition_group_start = client::FindPartitionStart(
1919
131k
          partitions, *partition_start, kPartitionGroupSize);
1920
131k
    }
1921
1922
104k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1923
104k
    LookupDataGroup* lookups_group;
1924
104k
    {
1925
104k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1926
104k
      if (lookups_group_it == tablet_lookups_by_group.end()) {
1927
56.3k
        if (!IsUniqueLock(&lock)) {
1928
3.31k
          return false;
1929
3.31k
        }
1930
53.0k
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1931
53.0k
      } else {
1932
47.7k
        lookups_group = &lookups_group_it->second;
1933
47.7k
      }
1934
104k
    }
1935
100k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1936
100k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1937
100k
    int64_t expected = 0;
1938
100k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1939
100k
            expected, request_no, std::memory_order_acq_rel)) {
1940
77.0k
      
VLOG_WITH_PREFIX_AND_FUNC1
(5)
1941
1
          << "Lookup is already running for table: " << table->ToString()
1942
1
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1943
1
          << ", partition_list_version: " << partitions->version
1944
1
          << ", request_no: " << expected;
1945
77.0k
      return true;
1946
77.0k
    }
1947
100k
  }
1948
1949
23.7k
  auto rpc = std::make_shared<LookupByKeyRpc>(
1950
23.7k
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1951
23.7k
      request_no, deadline);
1952
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4)
1953
18.4E
      << "Started lookup for table: " << table->ToString()
1954
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1955
18.4E
      << ", rpc: " << AsString(rpc);
1956
23.7k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1957
23.7k
  return true;
1958
100k
}
bool yb::client::internal::MetaCache::DoLookupTabletByKey<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)
Line
Count
Source
1875
23.3M
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1876
23.3M
  DCHECK_ONLY_NOTNULL(partition_group_start);
1877
23.3M
  RemoteTabletPtr tablet;
1878
23.3M
  auto scope_exit = ScopeExit([callback, &tablet] {
1879
23.3M
    if (tablet) {
1880
23.3M
      (*callback)(tablet);
1881
23.3M
    }
1882
23.3M
  });
1883
23.3M
  int64_t request_no;
1884
23.3M
  {
1885
23.3M
    Lock lock(mutex_);
1886
23.3M
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1887
23.3M
    if (tablet) {
1888
23.1M
      return true;
1889
23.1M
    }
1890
1891
100k
    auto table_it = tables_.find(table->id());
1892
100k
    TableData* table_data;
1893
100k
    if (table_it == tables_.end()) {
1894
49.6k
      
VLOG_WITH_PREFIX_AND_FUNC147
(4) << Format(
1895
147
          "missed table_id $0", table->id());
1896
49.6k
      if (!IsUniqueLock(&lock)) {
1897
49.6k
        return false;
1898
49.6k
      }
1899
83
      table_it = InitTableDataUnlocked(table->id(), partitions);
1900
83
    }
1901
51.1k
    table_data = &table_it->second;
1902
1903
51.1k
    if (table_data->partition_list->version != partitions->version ||
1904
81.6k
        (PREDICT_FALSE(RandomActWithProbability(
1905
81.6k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1906
81.6k
         
table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0
)) {
1907
1
      (*callback)(STATUS(
1908
1
          TryAgain,
1909
1
          Format(
1910
1
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1911
1
              "refresh required",
1912
1
              table->ToString(), table_data->partition_list->version, partitions->version),
1913
1
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1914
1
      return true;
1915
1
    }
1916
1917
81.6k
    
if (51.1k
!*partition_group_start51.1k
) {
1918
81.6k
      *partition_group_start = client::FindPartitionStart(
1919
81.6k
          partitions, *partition_start, kPartitionGroupSize);
1920
81.6k
    }
1921
1922
51.1k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1923
51.1k
    LookupDataGroup* lookups_group;
1924
51.1k
    {
1925
51.1k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1926
51.1k
      if (lookups_group_it == tablet_lookups_by_group.end()) {
1927
3.31k
        if (!IsUniqueLock(&lock)) {
1928
3.31k
          return false;
1929
3.31k
        }
1930
0
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1931
47.8k
      } else {
1932
47.8k
        lookups_group = &lookups_group_it->second;
1933
47.8k
      }
1934
51.1k
    }
1935
47.8k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1936
47.8k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1937
47.8k
    int64_t expected = 0;
1938
47.8k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1939
77.0k
            expected, request_no, std::memory_order_acq_rel)) {
1940
77.0k
      
VLOG_WITH_PREFIX_AND_FUNC1
(5)
1941
1
          << "Lookup is already running for table: " << table->ToString()
1942
1
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1943
1
          << ", partition_list_version: " << partitions->version
1944
1
          << ", request_no: " << expected;
1945
77.0k
      return true;
1946
77.0k
    }
1947
47.8k
  }
1948
1949
18.4E
  auto rpc = std::make_shared<LookupByKeyRpc>(
1950
18.4E
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1951
18.4E
      request_no, deadline);
1952
18.4E
  
VLOG_WITH_PREFIX_AND_FUNC18.4E
(4)
1953
18.4E
      << "Started lookup for table: " << table->ToString()
1954
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1955
18.4E
      << ", rpc: " << AsString(rpc);
1956
18.4E
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1957
18.4E
  return true;
1958
47.8k
}
bool yb::client::internal::MetaCache::DoLookupTabletByKey<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)
Line
Count
Source
1875
53.0k
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1876
53.0k
  DCHECK_ONLY_NOTNULL(partition_group_start);
1877
53.0k
  RemoteTabletPtr tablet;
1878
53.0k
  auto scope_exit = ScopeExit([callback, &tablet] {
1879
53.0k
    if (tablet) {
1880
53.0k
      (*callback)(tablet);
1881
53.0k
    }
1882
53.0k
  });
1883
53.0k
  int64_t request_no;
1884
53.0k
  {
1885
53.0k
    Lock lock(mutex_);
1886
53.0k
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1887
53.0k
    if (tablet) {
1888
0
      return true;
1889
0
    }
1890
1891
53.0k
    auto table_it = tables_.find(table->id());
1892
53.0k
    TableData* table_data;
1893
53.0k
    if (table_it == tables_.end()) {
1894
49.7k
      
VLOG_WITH_PREFIX_AND_FUNC0
(4) << Format(
1895
0
          "missed table_id $0", table->id());
1896
49.7k
      if (!IsUniqueLock(&lock)) {
1897
0
        return false;
1898
0
      }
1899
49.7k
      table_it = InitTableDataUnlocked(table->id(), partitions);
1900
49.7k
    }
1901
53.0k
    table_data = &table_it->second;
1902
1903
53.0k
    if (table_data->partition_list->version != partitions->version ||
1904
53.1k
        (PREDICT_FALSE(RandomActWithProbability(
1905
53.1k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1906
53.1k
         
table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0
)) {
1907
0
      (*callback)(STATUS(
1908
0
          TryAgain,
1909
0
          Format(
1910
0
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1911
0
              "refresh required",
1912
0
              table->ToString(), table_data->partition_list->version, partitions->version),
1913
0
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1914
0
      return true;
1915
0
    }
1916
1917
53.0k
    if (!*partition_group_start) {
1918
49.8k
      *partition_group_start = client::FindPartitionStart(
1919
49.8k
          partitions, *partition_start, kPartitionGroupSize);
1920
49.8k
    }
1921
1922
53.0k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1923
53.0k
    LookupDataGroup* lookups_group;
1924
53.0k
    {
1925
53.0k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1926
53.0k
      if (
lookups_group_it == tablet_lookups_by_group.end()53.0k
) {
1927
53.0k
        if (!IsUniqueLock(&lock)) {
1928
0
          return false;
1929
0
        }
1930
53.0k
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1931
18.4E
      } else {
1932
18.4E
        lookups_group = &lookups_group_it->second;
1933
18.4E
      }
1934
53.0k
    }
1935
53.0k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1936
53.0k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1937
53.0k
    int64_t expected = 0;
1938
53.0k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1939
53.0k
            expected, request_no, std::memory_order_acq_rel)) {
1940
76
      
VLOG_WITH_PREFIX_AND_FUNC0
(5)
1941
0
          << "Lookup is already running for table: " << table->ToString()
1942
0
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1943
0
          << ", partition_list_version: " << partitions->version
1944
0
          << ", request_no: " << expected;
1945
76
      return true;
1946
76
    }
1947
53.0k
  }
1948
1949
52.9k
  auto rpc = std::make_shared<LookupByKeyRpc>(
1950
52.9k
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1951
52.9k
      request_no, deadline);
1952
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4)
1953
18.4E
      << "Started lookup for table: " << table->ToString()
1954
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1955
18.4E
      << ", rpc: " << AsString(rpc);
1956
52.9k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1957
52.9k
  return true;
1958
53.0k
}
1959
1960
template <class Lock>
1961
bool MetaCache::DoLookupAllTablets(const std::shared_ptr<const YBTable>& table,
1962
                                   CoarseTimePoint deadline,
1963
0
                                   LookupTabletRangeCallback* callback) {
1964
0
  VLOG_WITH_PREFIX(3) << "DoLookupAllTablets() for table: " << table->ToString();
1965
0
  int64_t request_no;
1966
0
  {
1967
0
    Lock lock(mutex_);
1968
0
    if (PREDICT_TRUE(!FLAGS_TEST_force_master_lookup_all_tablets)) {
1969
0
      auto tablets = FastLookupAllTabletsUnlocked(table);
1970
0
      if (tablets.has_value()) {
1971
0
        VLOG_WITH_PREFIX(4) << "tablets has value";
1972
0
        (*callback)(*tablets);
1973
0
        return true;
1974
0
      }
1975
0
    }
1976
1977
0
    if (!IsUniqueLock(&lock)) {
1978
0
      return false;
1979
0
    }
1980
0
    auto table_it = tables_.find(table->id());
1981
0
    if (table_it == tables_.end()) {
1982
0
      table_it = InitTableDataUnlocked(table->id(), table->GetVersionedPartitions());
1983
0
    }
1984
0
    auto& table_data = table_it->second;
1985
1986
0
    auto& full_table_lookups = table_data.full_table_lookups;
1987
0
    full_table_lookups.lookups.Push(new LookupData(*callback, deadline, nullptr));
1988
0
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1989
0
    int64_t expected = 0;
1990
0
    if (!full_table_lookups.running_request_number.compare_exchange_strong(
1991
0
        expected, request_no, std::memory_order_acq_rel)) {
1992
0
      VLOG_WITH_PREFIX_AND_FUNC(5)
1993
0
          << "Lookup is already running for table: " << table->ToString();
1994
0
      return true;
1995
0
    }
1996
0
  }
1997
1998
0
  VLOG_WITH_PREFIX_AND_FUNC(4)
1999
0
      << "Start lookup for table: " << table->ToString();
2000
2001
0
  auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, deadline);
2002
0
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2003
0
  return true;
2004
0
}
Unexecuted instantiation: bool yb::client::internal::MetaCache::DoLookupAllTablets<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > const&)>*)
Unexecuted instantiation: bool yb::client::internal::MetaCache::DoLookupAllTablets<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > const&)>*)
2005
2006
// We disable thread safety analysis in this function due to manual conditional locking.
2007
void MetaCache::LookupTabletByKey(const std::shared_ptr<YBTable>& table,
2008
                                  const PartitionKey& partition_key,
2009
                                  CoarseTimePoint deadline,
2010
23.3M
                                  LookupTabletCallback callback) {
2011
23.3M
  if (table->ArePartitionsStale()) {
2012
19.2k
    table->RefreshPartitions(client_, [this, table, partition_key, deadline,
2013
19.2k
                              callback = std::move(callback)](const Status& status) {
2014
19.2k
      if (!status.ok()) {
2015
0
        callback(status);
2016
0
        return;
2017
0
      }
2018
19.2k
      InvalidateTableCache(*table);
2019
19.2k
      LookupTabletByKey(table, partition_key, deadline, std::move(callback));
2020
19.2k
    });
2021
19.2k
    return;
2022
19.2k
  }
2023
2024
23.3M
  const auto table_partition_list = table->GetVersionedPartitions();
2025
23.3M
  const auto partition_start = client::FindPartitionStart(table_partition_list, partition_key);
2026
23.3M
  
VLOG_WITH_PREFIX_AND_FUNC7.96k
(5) << "Table: " << table->ToString()
2027
7.96k
                    << ", table_partition_list: " << table_partition_list->ToString()
2028
7.96k
                    << ", partition_key: " << Slice(partition_key).ToDebugHexString()
2029
7.96k
                    << ", partition_start: " << Slice(*partition_start).ToDebugHexString();
2030
2031
23.3M
  PartitionGroupStartKeyPtr partition_group_start;
2032
23.3M
  if (DoLookupTabletByKey<SharedLock<std::shared_timed_mutex>>(
2033
23.3M
          table, table_partition_list, partition_start, deadline, &callback,
2034
23.3M
          &partition_group_start)) {
2035
23.2M
    return;
2036
23.2M
  }
2037
2038
51.2k
  bool result = DoLookupTabletByKey<std::lock_guard<std::shared_timed_mutex>>(
2039
51.2k
      table, table_partition_list, partition_start, deadline, &callback, &partition_group_start);
2040
18.4E
  LOG_IF(DFATAL, !result)
2041
18.4E
      << "Lookup was not started for table " << table->ToString()
2042
18.4E
      << ", partition_key: " << Slice(partition_key).ToDebugHexString();
2043
51.2k
}
2044
2045
void MetaCache::LookupAllTablets(const std::shared_ptr<const YBTable>& table,
2046
                                 CoarseTimePoint deadline,
2047
0
                                 LookupTabletRangeCallback callback) {
2048
  // We first want to check the cache in read-only mode, and only if we can't find anything
2049
  // do a lookup in write mode.
2050
0
  if (DoLookupAllTablets<SharedLock<std::shared_timed_mutex>>(table, deadline, &callback)) {
2051
0
    return;
2052
0
  }
2053
2054
0
  bool result = DoLookupAllTablets<std::lock_guard<std::shared_timed_mutex>>(
2055
0
      table, deadline, &callback);
2056
0
  LOG_IF(DFATAL, !result)
2057
0
      << "Full table lookup was not started for table " << table->ToString();
2058
0
}
2059
2060
1.95M
RemoteTabletPtr MetaCache::LookupTabletByIdFastPathUnlocked(const TabletId& tablet_id) {
2061
1.95M
  auto it = tablets_by_id_.find(tablet_id);
2062
1.95M
  if (it != tablets_by_id_.end()) {
2063
1.92M
    return it->second;
2064
1.92M
  }
2065
29.7k
  return nullptr;
2066
1.95M
}
2067
2068
template <class Lock>
2069
bool MetaCache::DoLookupTabletById(
2070
    const TabletId& tablet_id,
2071
    const std::shared_ptr<const YBTable>& table,
2072
    master::IncludeInactive include_inactive,
2073
    CoarseTimePoint deadline,
2074
    UseCache use_cache,
2075
1.95M
    LookupTabletCallback* callback) {
2076
1.95M
  RemoteTabletPtr tablet;
2077
1.95M
  auto scope_exit = ScopeExit([callback, &tablet] {
2078
1.95M
    if (tablet) {
2079
1.91M
      (*callback)(tablet);
2080
1.91M
    }
2081
1.95M
  });
bool yb::client::internal::MetaCache::DoLookupTabletById<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)::'lambda'()::operator()() const
Line
Count
Source
2077
1.94M
  auto scope_exit = ScopeExit([callback, &tablet] {
2078
1.94M
    if (tablet) {
2079
1.91M
      (*callback)(tablet);
2080
1.91M
    }
2081
1.94M
  });
bool yb::client::internal::MetaCache::DoLookupTabletById<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)::'lambda'()::operator()() const
Line
Count
Source
2077
13.7k
  auto scope_exit = ScopeExit([callback, &tablet] {
2078
13.7k
    if (tablet) {
2079
0
      (*callback)(tablet);
2080
0
    }
2081
13.7k
  });
2082
1.95M
  int64_t request_no;
2083
1.95M
  int64_t lookups_without_new_replicas = 0;
2084
1.95M
  {
2085
1.95M
    Lock lock(mutex_);
2086
2087
    // Fast path: lookup in the cache.
2088
1.95M
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2089
1.95M
    if (tablet) {
2090
1.93M
      
VLOG_WITH_PREFIX177
(5) << "Fast lookup: candidate tablet " << AsString(tablet)177
;
2091
1.93M
      if (use_cache && 
tablet->HasLeader()1.93M
) {
2092
        // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with
2093
        // tablet_id is found on all replicas.
2094
1.91M
        
VLOG_WITH_PREFIX184
(5) << "Fast lookup: found tablet " << tablet->tablet_id()184
;
2095
1.91M
        return true;
2096
1.91M
      }
2097
14.4k
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2098
14.4k
      tablet = nullptr;
2099
14.4k
    }
2100
2101
38.3k
    LookupDataGroup* lookup;
2102
38.3k
    {
2103
38.3k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2104
38.3k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2105
27.0k
        if (!IsUniqueLock(&lock)) {
2106
13.5k
          return false;
2107
13.5k
        }
2108
13.5k
        lookup = &tablet_lookups_by_id_[tablet_id];
2109
13.5k
      } else {
2110
11.2k
        lookup = &lookup_it->second;
2111
11.2k
      }
2112
38.3k
    }
2113
24.8k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2114
24.8k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2115
24.8k
    int64_t expected = 0;
2116
24.8k
    if (!lookup->running_request_number.compare_exchange_strong(
2117
24.8k
            expected, request_no, std::memory_order_acq_rel)) {
2118
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id;
2119
13.2k
      return true;
2120
13.2k
    }
2121
24.8k
  }
2122
2123
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no;
2124
2125
11.5k
  auto rpc = std::make_shared<LookupByIdRpc>(
2126
11.5k
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2127
11.5k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2128
11.5k
  return true;
2129
24.8k
}
bool yb::client::internal::MetaCache::DoLookupTabletById<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)
Line
Count
Source
2075
1.94M
    LookupTabletCallback* callback) {
2076
1.94M
  RemoteTabletPtr tablet;
2077
1.94M
  auto scope_exit = ScopeExit([callback, &tablet] {
2078
1.94M
    if (tablet) {
2079
1.94M
      (*callback)(tablet);
2080
1.94M
    }
2081
1.94M
  });
2082
1.94M
  int64_t request_no;
2083
1.94M
  int64_t lookups_without_new_replicas = 0;
2084
1.94M
  {
2085
1.94M
    Lock lock(mutex_);
2086
2087
    // Fast path: lookup in the cache.
2088
1.94M
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2089
1.94M
    if (tablet) {
2090
1.92M
      
VLOG_WITH_PREFIX177
(5) << "Fast lookup: candidate tablet " << AsString(tablet)177
;
2091
1.92M
      if (use_cache && 
tablet->HasLeader()1.92M
) {
2092
        // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with
2093
        // tablet_id is found on all replicas.
2094
1.91M
        
VLOG_WITH_PREFIX184
(5) << "Fast lookup: found tablet " << tablet->tablet_id()184
;
2095
1.91M
        return true;
2096
1.91M
      }
2097
13.5k
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2098
13.5k
      tablet = nullptr;
2099
13.5k
    }
2100
2101
24.6k
    LookupDataGroup* lookup;
2102
24.6k
    {
2103
24.6k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2104
24.6k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2105
13.5k
        if (
!IsUniqueLock(&lock)13.4k
) {
2106
13.5k
          return false;
2107
13.5k
        }
2108
18.4E
        lookup = &tablet_lookups_by_id_[tablet_id];
2109
18.4E
      } else {
2110
11.1k
        lookup = &lookup_it->second;
2111
11.1k
      }
2112
24.6k
    }
2113
11.1k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2114
11.1k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2115
11.1k
    int64_t expected = 0;
2116
11.1k
    if (!lookup->running_request_number.compare_exchange_strong(
2117
13.1k
            expected, request_no, std::memory_order_acq_rel)) {
2118
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id;
2119
13.1k
      return true;
2120
13.1k
    }
2121
11.1k
  }
2122
2123
18.4E
  
VLOG_WITH_PREFIX_AND_FUNC18.4E
(4) << "Start lookup for tablet " << tablet_id << ": " << request_no18.4E
;
2124
2125
18.4E
  auto rpc = std::make_shared<LookupByIdRpc>(
2126
18.4E
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2127
18.4E
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2128
18.4E
  return true;
2129
11.1k
}
bool yb::client::internal::MetaCache::DoLookupTabletById<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)
Line
Count
Source
2075
13.6k
    LookupTabletCallback* callback) {
2076
13.6k
  RemoteTabletPtr tablet;
2077
13.6k
  auto scope_exit = ScopeExit([callback, &tablet] {
2078
13.6k
    if (tablet) {
2079
13.6k
      (*callback)(tablet);
2080
13.6k
    }
2081
13.6k
  });
2082
13.6k
  int64_t request_no;
2083
13.6k
  int64_t lookups_without_new_replicas = 0;
2084
13.6k
  {
2085
13.6k
    Lock lock(mutex_);
2086
2087
    // Fast path: lookup in the cache.
2088
13.6k
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2089
13.6k
    if (tablet) {
2090
936
      
VLOG_WITH_PREFIX0
(5) << "Fast lookup: candidate tablet " << AsString(tablet)0
;
2091
936
      if (use_cache && 
tablet->HasLeader()769
) {
2092
        // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with
2093
        // tablet_id is found on all replicas.
2094
0
        VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id();
2095
0
        return true;
2096
0
      }
2097
936
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2098
936
      tablet = nullptr;
2099
936
    }
2100
2101
13.6k
    LookupDataGroup* lookup;
2102
13.6k
    {
2103
13.6k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2104
13.6k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2105
13.5k
        if (!IsUniqueLock(&lock)) {
2106
0
          return false;
2107
0
        }
2108
13.5k
        lookup = &tablet_lookups_by_id_[tablet_id];
2109
13.5k
      } else {
2110
110
        lookup = &lookup_it->second;
2111
110
      }
2112
13.6k
    }
2113
13.6k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2114
13.6k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2115
13.6k
    int64_t expected = 0;
2116
13.6k
    if (!lookup->running_request_number.compare_exchange_strong(
2117
13.6k
            expected, request_no, std::memory_order_acq_rel)) {
2118
129
      
VLOG_WITH_PREFIX_AND_FUNC0
(5) << "Lookup already running for tablet: " << tablet_id0
;
2119
129
      return true;
2120
129
    }
2121
13.6k
  }
2122
2123
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no;
2124
2125
13.5k
  auto rpc = std::make_shared<LookupByIdRpc>(
2126
13.5k
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2127
13.5k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2128
13.5k
  return true;
2129
13.6k
}
2130
2131
void MetaCache::LookupTabletById(const TabletId& tablet_id,
2132
                                 const std::shared_ptr<const YBTable>& table,
2133
                                 master::IncludeInactive include_inactive,
2134
                                 CoarseTimePoint deadline,
2135
                                 LookupTabletCallback callback,
2136
1.94M
                                 UseCache use_cache) {
2137
1.94M
  
VLOG_WITH_PREFIX_AND_FUNC804
(5) << "(" << tablet_id << ", " << use_cache << ")"804
;
2138
2139
1.94M
  if (DoLookupTabletById<SharedLock<decltype(mutex_)>>(
2140
1.94M
          tablet_id, table, include_inactive, deadline, use_cache, &callback)) {
2141
1.93M
    return;
2142
1.93M
  }
2143
2144
13.1k
  auto result = DoLookupTabletById<std::lock_guard<decltype(mutex_)>>(
2145
13.1k
      tablet_id, table, include_inactive, deadline, use_cache, &callback);
2146
18.4E
  LOG_IF(DFATAL, !result) << "Lookup was not started for tablet " << tablet_id;
2147
13.1k
}
2148
2149
void MetaCache::MarkTSFailed(RemoteTabletServer* ts,
2150
0
                             const Status& status) {
2151
0
  LOG_WITH_PREFIX(INFO) << "Marking tablet server " << ts->ToString() << " as failed.";
2152
0
  SharedLock<decltype(mutex_)> lock(mutex_);
2153
2154
0
  Status ts_status = status.CloneAndPrepend("TS failed");
2155
2156
  // TODO: replace with a ts->tablet multimap for faster lookup?
2157
0
  for (const auto& tablet : tablets_by_id_) {
2158
    // We just loop on all tablets; if a tablet does not have a replica on this
2159
    // TS, MarkReplicaFailed() returns false and we ignore the return value.
2160
0
    tablet.second->MarkReplicaFailed(ts, ts_status);
2161
0
  }
2162
0
}
2163
2164
69.4k
bool MetaCache::AcquireMasterLookupPermit() {
2165
69.4k
  return master_lookup_sem_.TryAcquire();
2166
69.4k
}
2167
2168
69.3k
void MetaCache::ReleaseMasterLookupPermit() {
2169
69.3k
  master_lookup_sem_.Release();
2170
69.3k
}
2171
2172
std::future<Result<internal::RemoteTabletPtr>> MetaCache::LookupTabletByKeyFuture(
2173
    const std::shared_ptr<YBTable>& table,
2174
    const PartitionKey& partition_key,
2175
0
    CoarseTimePoint deadline) {
2176
0
  return MakeFuture<Result<internal::RemoteTabletPtr>>([&](auto callback) {
2177
0
    this->LookupTabletByKey(table, partition_key, deadline, std::move(callback));
2178
0
  });
2179
0
}
2180
2181
122
LookupDataGroup::~LookupDataGroup() {
2182
122
  std::vector<LookupData*> leftovers;
2183
122
  while (auto* d = lookups.Pop()) {
2184
0
    leftovers.push_back(d);
2185
0
  }
2186
122
  if (!leftovers.empty()) {
2187
0
    LOG(DFATAL) << Format(
2188
0
        "Destructing LookupDataGroup($0), running_request_number: $1 with non empty lookups: $2",
2189
0
        static_cast<void*>(this), running_request_number, leftovers);
2190
0
  }
2191
122
}
2192
2193
void LookupDataGroup::Finished(
2194
69.3k
    int64_t request_no, const ToStringable& id, bool allow_absence) {
2195
69.3k
  int64_t expected = request_no;
2196
69.3k
  if (running_request_number.compare_exchange_strong(expected, 0, std::memory_order_acq_rel)) {
2197
69.3k
    max_completed_request_number = std::max(max_completed_request_number, request_no);
2198
69.3k
    
VLOG_WITH_FUNC0
(2) << "Finished lookup for " << id.ToString() << ", no: " << request_no0
;
2199
69.3k
    return;
2200
69.3k
  }
2201
2202
0
  if ((expected == 0 && max_completed_request_number <= request_no) && !allow_absence) {
2203
0
    LOG(DFATAL) << "Lookup was not running for " << id.ToString() << ", expected: " << request_no;
2204
0
    return;
2205
0
  }
2206
2207
0
  LOG(INFO)
2208
0
      << "Finished lookup for " << id.ToString() << ": " << request_no << ", while "
2209
0
      << expected << " was running, could happen during tablet split";
2210
0
}
2211
2212
TableData::TableData(const VersionedTablePartitionListPtr& partition_list_)
2213
640k
    : partition_list(partition_list_) {
2214
640k
  DCHECK_ONLY_NOTNULL(partition_list);
2215
640k
}
2216
2217
0
std::string VersionedPartitionStartKey::ToString() const {
2218
0
  return YB_STRUCT_TO_STRING(key, partition_list_version);
2219
0
}
2220
2221
9
std::string RemoteReplica::ToString() const {
2222
9
  return Format("$0 ($1, $2)",
2223
9
                ts->permanent_uuid(),
2224
9
                PeerRole_Name(role),
2225
9
                Failed() ? 
"FAILED"0
: "OK");
2226
9
}
2227
2228
} // namespace internal
2229
} // namespace client
2230
} // namespace yb