YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/meta_cache.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/meta_cache.h"
34
35
#include <stdint.h>
36
37
#include <atomic>
38
#include <list>
39
#include <memory>
40
#include <shared_mutex>
41
#include <string>
42
#include <unordered_map>
43
#include <unordered_set>
44
#include <vector>
45
46
#include <boost/optional/optional_io.hpp>
47
#include <glog/logging.h>
48
49
#include "yb/client/client.h"
50
#include "yb/client/client_error.h"
51
#include "yb/client/client_master_rpc.h"
52
#include "yb/client/client-internal.h"
53
#include "yb/client/schema.h"
54
#include "yb/client/table.h"
55
#include "yb/client/yb_table_name.h"
56
57
#include "yb/common/wire_protocol.h"
58
59
#include "yb/gutil/map-util.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/strings/substitute.h"
62
63
#include "yb/master/master_client.proxy.h"
64
65
#include "yb/rpc/rpc_fwd.h"
66
67
#include "yb/tserver/local_tablet_server.h"
68
#include "yb/tserver/tserver_service.proxy.h"
69
70
#include "yb/util/async_util.h"
71
#include "yb/util/atomic.h"
72
#include "yb/util/flag_tags.h"
73
#include "yb/util/locks.h"
74
#include "yb/util/logging.h"
75
#include "yb/util/metrics.h"
76
#include "yb/util/monotime.h"
77
#include "yb/util/net/dns_resolver.h"
78
#include "yb/util/net/net_util.h"
79
#include "yb/util/net/sockaddr.h"
80
#include "yb/util/random_util.h"
81
#include "yb/util/result.h"
82
#include "yb/util/scope_exit.h"
83
#include "yb/util/shared_lock.h"
84
#include "yb/util/status_format.h"
85
#include "yb/util/unique_lock.h"
86
87
using std::map;
88
using std::shared_ptr;
89
using strings::Substitute;
90
using namespace std::literals;
91
using namespace std::placeholders;
92
93
DEFINE_int32(max_concurrent_master_lookups, 500,
94
             "Maximum number of concurrent tablet location lookups from YB client to master");
95
96
DEFINE_test_flag(bool, verify_all_replicas_alive, false,
97
                 "If set, when a RemoteTablet object is destroyed, we will verify that all its "
98
                 "replicas are not marked as failed");
99
100
DEFINE_int32(retry_failed_replica_ms, 60 * 1000,
101
             "Time in milliseconds to wait for before retrying a failed replica");
102
103
DEFINE_int64(meta_cache_lookup_throttling_step_ms, 5,
104
             "Step to increment delay between calls during lookup throttling.");
105
106
DEFINE_int64(meta_cache_lookup_throttling_max_delay_ms, 1000,
107
             "Max delay between calls during lookup throttling.");
108
109
DEFINE_test_flag(bool, force_master_lookup_all_tablets, false,
110
                 "If set, force the client to go to the master for all tablet lookup "
111
                 "instead of reading from cache.");
112
113
DEFINE_test_flag(double, simulate_lookup_timeout_probability, 0,
114
                 "If set, mark an RPC as failed and force retry on the first attempt.");
115
DEFINE_test_flag(double, simulate_lookup_partition_list_mismatch_probability, 0,
116
                 "Probability for simulating the partition list mismatch error on tablet lookup.");
117
118
METRIC_DEFINE_coarse_histogram(
119
  server, dns_resolve_latency_during_init_proxy,
120
  "yb.client.MetaCache.InitProxy DNS Resolve",
121
  yb::MetricUnit::kMicroseconds,
122
  "Microseconds spent resolving DNS requests during MetaCache::InitProxy");
123
124
namespace yb {
125
126
using consensus::RaftPeerPB;
127
using master::GetTableLocationsRequestPB;
128
using master::GetTableLocationsResponsePB;
129
using master::TabletLocationsPB;
130
using master::TabletLocationsPB_ReplicaPB;
131
using master::TSInfoPB;
132
using rpc::Messenger;
133
using rpc::Rpc;
134
using tablet::RaftGroupStatePB;
135
using tserver::LocalTabletServer;
136
using tserver::TabletServerServiceProxy;
137
using tserver::TabletServerForwardServiceProxy;
138
139
namespace client {
140
141
namespace internal {
142
143
using ProcessedTablesMap =
144
    std::unordered_map<TableId, std::unordered_map<PartitionKey, RemoteTabletPtr>>;
145
146
namespace {
147
148
// We join tablet partitions to groups, so tablet state info in one group requested with single
149
// RPC call to master.
150
// kPartitionGroupSize defines size of this group.
151
#ifdef NDEBUG
152
const size_t kPartitionGroupSize = 64;
153
#else
154
const size_t kPartitionGroupSize = 4;
155
#endif
156
157
std::atomic<int64_t> lookup_serial_{1};
158
159
} // namespace
160
161
0
int64_t TEST_GetLookupSerial() {
162
0
  return lookup_serial_.load(std::memory_order_acquire);
163
0
}
164
165
////////////////////////////////////////////////////////////
166
167
RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb)
168
16.9k
    : uuid_(pb.permanent_uuid()) {
169
16.9k
  Update(pb);
170
16.9k
}
171
172
RemoteTabletServer::RemoteTabletServer(const string& uuid,
173
                                       const shared_ptr<TabletServerServiceProxy>& proxy,
174
                                       const LocalTabletServer* local_tserver)
175
    : uuid_(uuid),
176
      proxy_(proxy),
177
5.48k
      local_tserver_(local_tserver) {
178
0
  LOG_IF(DFATAL, proxy && !IsLocal()) << "Local tserver has non-local proxy";
179
5.48k
}
180
181
235
RemoteTabletServer::~RemoteTabletServer() = default;
182
183
7.84M
Status RemoteTabletServer::InitProxy(YBClient* client) {
184
7.84M
  {
185
7.84M
    SharedLock<rw_spinlock> lock(mutex_);
186
187
7.84M
    if (proxy_) {
188
      // Already have a proxy created.
189
7.84M
      return Status::OK();
190
7.84M
    }
191
596
  }
192
193
596
  std::lock_guard<rw_spinlock> lock(mutex_);
194
195
596
  if (proxy_) {
196
    // Already have a proxy created.
197
5
    return Status::OK();
198
5
  }
199
200
7.76k
  if (!dns_resolve_histogram_) {
201
7.76k
    auto metric_entity = client->metric_entity();
202
7.76k
    if (metric_entity) {
203
7.69k
      dns_resolve_histogram_ = METRIC_dns_resolve_latency_during_init_proxy.Instantiate(
204
7.69k
          metric_entity);
205
7.69k
    }
206
7.76k
  }
207
208
  // TODO: if the TS advertises multiple host/ports, pick the right one
209
  // based on some kind of policy. For now, just always use the first.
210
591
  auto hostport = HostPortFromPB(DesiredHostPort(
211
591
      public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_,
212
591
      client->data_->cloud_info_pb_));
213
591
  CHECK(!hostport.host().empty());
214
591
  ScopedDnsTracker dns_tracker(dns_resolve_histogram_.get());
215
591
  proxy_.reset(new TabletServerServiceProxy(client->data_->proxy_cache_.get(), hostport));
216
591
  proxy_endpoint_ = hostport;
217
218
591
  return Status::OK();
219
591
}
220
221
219k
void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
222
219k
  CHECK_EQ(pb.permanent_uuid(), uuid_);
223
224
219k
  std::lock_guard<rw_spinlock> lock(mutex_);
225
219k
  private_rpc_hostports_ = pb.private_rpc_addresses();
226
219k
  public_rpc_hostports_ = pb.broadcast_addresses();
227
219k
  cloud_info_pb_ = pb.cloud_info();
228
219k
  capabilities_.assign(pb.capabilities().begin(), pb.capabilities().end());
229
219k
  std::sort(capabilities_.begin(), capabilities_.end());
230
219k
}
231
232
5.72M
bool RemoteTabletServer::IsLocal() const {
233
5.72M
  return local_tserver_ != nullptr;
234
5.72M
}
235
236
804k
const std::string& RemoteTabletServer::permanent_uuid() const {
237
804k
  return uuid_;
238
804k
}
239
240
86.4k
const CloudInfoPB& RemoteTabletServer::cloud_info() const {
241
86.4k
  return cloud_info_pb_;
242
86.4k
}
243
244
const google::protobuf::RepeatedPtrField<HostPortPB>&
245
159
    RemoteTabletServer::public_rpc_hostports() const {
246
159
  return public_rpc_hostports_;
247
159
}
248
249
const google::protobuf::RepeatedPtrField<HostPortPB>&
250
159
    RemoteTabletServer::private_rpc_hostports() const {
251
159
  return private_rpc_hostports_;
252
159
}
253
254
7.84M
shared_ptr<TabletServerServiceProxy> RemoteTabletServer::proxy() const {
255
7.84M
  SharedLock<rw_spinlock> lock(mutex_);
256
7.84M
  return proxy_;
257
7.84M
}
258
259
0
::yb::HostPort RemoteTabletServer::ProxyEndpoint() const {
260
0
  std::shared_lock<rw_spinlock> lock(mutex_);
261
0
  return proxy_endpoint_;
262
0
}
263
264
181k
string RemoteTabletServer::ToString() const {
265
181k
  string ret = "{ uuid: " + uuid_;
266
181k
  SharedLock<rw_spinlock> lock(mutex_);
267
181k
  if (!private_rpc_hostports_.empty()) {
268
181k
    ret += Format(" private: $0", private_rpc_hostports_);
269
181k
  }
270
181k
  if (!public_rpc_hostports_.empty()) {
271
0
    ret += Format(" public: $0", public_rpc_hostports_);
272
0
  }
273
181k
  ret += Format(" cloud_info: $0", cloud_info_pb_);
274
181k
  return ret;
275
181k
}
276
277
21.0k
bool RemoteTabletServer::HasHostFrom(const std::unordered_set<std::string>& hosts) const {
278
21.0k
  SharedLock<rw_spinlock> lock(mutex_);
279
21.0k
  for (const auto& hp : private_rpc_hostports_) {
280
21.0k
    if (hosts.count(hp.host())) {
281
0
      return true;
282
0
    }
283
21.0k
  }
284
21.0k
  for (const auto& hp : public_rpc_hostports_) {
285
0
    if (hosts.count(hp.host())) {
286
0
      return true;
287
0
    }
288
0
  }
289
21.0k
  return false;
290
21.0k
}
291
292
6.21M
bool RemoteTabletServer::HasCapability(CapabilityId capability) const {
293
6.21M
  SharedLock<rw_spinlock> lock(mutex_);
294
6.21M
  return std::binary_search(capabilities_.begin(), capabilities_.end(), capability);
295
6.21M
}
296
297
0
std::string ReplicasCount::ToString() {
298
0
  return Format(
299
0
      " live replicas $0, read replicas $1, expected live replicas $2, expected read replicas $3",
300
0
      num_alive_live_replicas, num_alive_read_replicas,
301
0
      expected_live_replicas, expected_read_replicas);
302
0
}
303
304
////////////////////////////////////////////////////////////
305
306
RemoteTablet::RemoteTablet(std::string tablet_id,
307
                           Partition partition,
308
                           boost::optional<PartitionListVersion> partition_list_version,
309
                           uint64 split_depth,
310
                           const TabletId& split_parent_tablet_id)
311
    : tablet_id_(std::move(tablet_id)),
312
      log_prefix_(Format("T $0: ", tablet_id_)),
313
      partition_(std::move(partition)),
314
      partition_list_version_(partition_list_version),
315
      split_depth_(split_depth),
316
      split_parent_tablet_id_(split_parent_tablet_id),
317
70.6k
      stale_(false) {
318
70.6k
}
319
320
160
RemoteTablet::~RemoteTablet() {
321
160
  if (PREDICT_FALSE(FLAGS_TEST_verify_all_replicas_alive)) {
322
    // Let's verify that none of the replicas are marked as failed. The test should always wait
323
    // enough time so that the lookup cache can be refreshed after force_lookup_cache_refresh_secs.
324
0
    for (const auto& replica : replicas_) {
325
0
      if (replica.Failed()) {
326
0
        LOG_WITH_PREFIX(FATAL) << "Remote tablet server " << replica.ts->ToString()
327
0
                               << " with role " << PeerRole_Name(replica.role)
328
0
                               << " is marked as failed";
329
0
      }
330
0
    }
331
0
  }
332
160
}
333
334
void RemoteTablet::Refresh(
335
    const TabletServerMap& tservers,
336
353k
    const google::protobuf::RepeatedPtrField<TabletLocationsPB_ReplicaPB>& replicas) {
337
  // Adopt the data from the successful response.
338
353k
  std::lock_guard<rw_spinlock> lock(mutex_);
339
353k
  std::vector<std::string> old_uuids;
340
353k
  old_uuids.reserve(replicas_.size());
341
730k
  for (const auto& replica : replicas_) {
342
730k
    old_uuids.push_back(replica.ts->permanent_uuid());
343
730k
  }
344
353k
  std::sort(old_uuids.begin(), old_uuids.end());
345
353k
  replicas_.clear();
346
353k
  bool has_new_replica = false;
347
938k
  for (const TabletLocationsPB_ReplicaPB& r : replicas) {
348
938k
    auto it = tservers.find(r.ts_info().permanent_uuid());
349
938k
    CHECK(it != tservers.end());
350
938k
    replicas_.emplace_back(it->second.get(), r.role());
351
938k
    has_new_replica =
352
938k
        has_new_replica ||
353
800k
        !std::binary_search(old_uuids.begin(), old_uuids.end(), r.ts_info().permanent_uuid());
354
938k
  }
355
353k
  if (has_new_replica) {
356
71.3k
    lookups_without_new_replicas_ = 0;
357
282k
  } else {
358
282k
    ++lookups_without_new_replicas_;
359
282k
  }
360
353k
  stale_ = false;
361
353k
  refresh_time_.store(MonoTime::Now(), std::memory_order_release);
362
353k
}
363
364
32
void RemoteTablet::MarkStale() {
365
32
  std::lock_guard<rw_spinlock> lock(mutex_);
366
32
  stale_ = true;
367
32
}
368
369
11.0M
bool RemoteTablet::stale() const {
370
11.0M
  SharedLock<rw_spinlock> lock(mutex_);
371
11.0M
  return stale_;
372
11.0M
}
373
374
19
void RemoteTablet::MarkAsSplit() {
375
19
  std::lock_guard<rw_spinlock> lock(mutex_);
376
19
  is_split_ = true;
377
19
}
378
379
6.00M
bool RemoteTablet::is_split() const {
380
6.00M
  SharedLock<rw_spinlock> lock(mutex_);
381
6.00M
  return is_split_;
382
6.00M
}
383
384
4.92k
bool RemoteTablet::MarkReplicaFailed(RemoteTabletServer *ts, const Status& status) {
385
4.92k
  std::lock_guard<rw_spinlock> lock(mutex_);
386
11
  VLOG_WITH_PREFIX(2) << "Current remote replicas in meta cache: "
387
11
                      << ReplicasAsStringUnlocked() << ". Replica " << ts->ToString()
388
11
                      << " has failed: " << status.ToString();
389
10.1k
  for (RemoteReplica& rep : replicas_) {
390
10.1k
    if (rep.ts == ts) {
391
4.91k
      rep.MarkFailed();
392
4.91k
      return true;
393
4.91k
    }
394
10.1k
  }
395
15
  return false;
396
4.92k
}
397
398
1.69k
int RemoteTablet::GetNumFailedReplicas() const {
399
1.69k
  int failed = 0;
400
1.69k
  SharedLock<rw_spinlock> lock(mutex_);
401
10.1k
  for (const RemoteReplica& rep : replicas_) {
402
10.1k
    if (rep.Failed()) {
403
19
      failed++;
404
19
    }
405
10.1k
  }
406
1.69k
  return failed;
407
1.69k
}
408
409
1.79k
bool RemoteTablet::IsReplicasCountConsistent() const {
410
1.79k
  return replicas_count_.load(std::memory_order_acquire).IsReplicasCountConsistent();
411
1.79k
}
412
413
0
string RemoteTablet::ReplicasCountToString() const {
414
0
  return replicas_count_.load(std::memory_order_acquire).ToString();
415
0
}
416
417
353k
void RemoteTablet::SetExpectedReplicas(int expected_live_replicas, int expected_read_replicas) {
418
353k
  ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire);
419
353k
  for (;;) {
420
353k
    ReplicasCount new_replicas_count = old_replicas_count;
421
353k
    new_replicas_count.SetExpectedReplicas(expected_live_replicas, expected_read_replicas);
422
353k
    if (replicas_count_.compare_exchange_weak(
423
353k
        old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) {
424
353k
      break;
425
353k
    }
426
353k
  }
427
353k
}
428
429
230k
void RemoteTablet::SetAliveReplicas(int alive_live_replicas, int alive_read_replicas) {
430
230k
  ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire);
431
230k
  for (;;) {
432
230k
    ReplicasCount new_replicas_count = old_replicas_count;
433
230k
    new_replicas_count.SetAliveReplicas(alive_live_replicas, alive_read_replicas);
434
230k
    if (replicas_count_.compare_exchange_weak(
435
230k
        old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) {
436
230k
      break;
437
230k
    }
438
230k
  }
439
230k
}
440
441
19.4M
RemoteTabletServer* RemoteTablet::LeaderTServer() const {
442
19.4M
  SharedLock<rw_spinlock> lock(mutex_);
443
37.0M
  for (const RemoteReplica& replica : replicas_) {
444
37.0M
    if (!replica.Failed() && replica.role == PeerRole::LEADER) {
445
19.3M
      return replica.ts;
446
19.3M
    }
447
37.0M
  }
448
41.0k
  return nullptr;
449
19.4M
}
450
451
12.1M
bool RemoteTablet::HasLeader() const {
452
12.1M
  return LeaderTServer() != nullptr;
453
12.1M
}
454
455
void RemoteTablet::GetRemoteTabletServers(
456
230k
    std::vector<RemoteTabletServer*>* servers, IncludeFailedReplicas include_failed_replicas) {
457
230k
  DCHECK(servers->empty());
458
230k
  struct ReplicaUpdate {
459
230k
    RemoteReplica* replica;
460
230k
    tablet::RaftGroupStatePB new_state;
461
230k
    bool clear_failed;
462
230k
  };
463
230k
  std::vector<ReplicaUpdate> replica_updates;
464
230k
  {
465
230k
    SharedLock<rw_spinlock> lock(mutex_);
466
230k
    int num_alive_live_replicas = 0;
467
230k
    int num_alive_read_replicas = 0;
468
697k
    for (RemoteReplica& replica : replicas_) {
469
697k
      if (replica.Failed()) {
470
30.0k
        if (include_failed_replicas) {
471
1
          servers->push_back(replica.ts);
472
1
          continue;
473
1
        }
474
30.0k
        ReplicaUpdate replica_update = {&replica, RaftGroupStatePB::UNKNOWN, false};
475
18
        VLOG_WITH_PREFIX(4)
476
18
            << "Replica " << replica.ts->ToString()
477
18
            << " failed, state: " << RaftGroupStatePB_Name(replica.state)
478
18
            << ", is local: " << replica.ts->IsLocal()
479
18
            << ", time since failure: " << (MonoTime::Now() - replica.last_failed_time);
480
30.0k
        switch (replica.state) {
481
29.9k
          case RaftGroupStatePB::UNKNOWN: FALLTHROUGH_INTENDED;
482
29.9k
          case RaftGroupStatePB::NOT_STARTED: FALLTHROUGH_INTENDED;
483
29.9k
          case RaftGroupStatePB::BOOTSTRAPPING: FALLTHROUGH_INTENDED;
484
29.9k
          case RaftGroupStatePB::RUNNING:
485
            // These are non-terminal states from which we may retry. Check and update a failed local
486
            // replica's current state. For a remote replica, just wait for some time before retrying.
487
29.9k
            if (replica.ts->IsLocal()) {
488
954
              tserver::GetTabletStatusRequestPB req;
489
954
              tserver::GetTabletStatusResponsePB resp;
490
954
              req.set_tablet_id(tablet_id_);
491
954
              const Status status =
492
954
                  CHECK_NOTNULL(replica.ts->local_tserver())->GetTabletStatus(&req, &resp);
493
954
              if (!status.ok() || resp.has_error()) {
494
523
                LOG_WITH_PREFIX(ERROR)
495
523
                    << "Received error from GetTabletStatus: "
496
523
                    << (!status.ok() ? status : StatusFromPB(resp.error().status()));
497
523
                continue;
498
523
              }
499
500
431
              DCHECK_EQ(resp.tablet_status().tablet_id(), tablet_id_);
501
0
              VLOG_WITH_PREFIX(3) << "GetTabletStatus returned status: "
502
0
                                  << tablet::RaftGroupStatePB_Name(resp.tablet_status().state())
503
0
                                  << " for replica " << replica.ts->ToString();
504
431
              replica_update.new_state = resp.tablet_status().state();
505
431
              if (replica_update.new_state != tablet::RaftGroupStatePB::RUNNING) {
506
40
                if (replica_update.new_state != replica.state) {
507
                  // Cannot update the replica here directly because we only hold a shared lock on the mutex.
508
40
                  replica_updates.push_back(replica_update); // Update only state
509
40
                }
510
40
                continue;
511
40
              }
512
391
              if (!replica.ts->local_tserver()->LeaderAndReady(
513
391
                      tablet_id_, /* allow_stale */ true)) {
514
                // Should continue here because otherwise the failed state would be cleared.
515
391
                continue;
516
391
              }
517
29.0k
            } else if ((MonoTime::Now() - replica.last_failed_time) <
518
28.9k
                       FLAGS_retry_failed_replica_ms * 1ms) {
519
28.9k
              continue;
520
28.9k
            }
521
51
            break;
522
0
          case RaftGroupStatePB::FAILED: FALLTHROUGH_INTENDED;
523
15
          case RaftGroupStatePB::QUIESCING: FALLTHROUGH_INTENDED;
524
50
          case RaftGroupStatePB::SHUTDOWN:
525
            // These are terminal states, so we won't retry.
526
50
            continue;
527
5
        }
528
529
5
        VLOG_WITH_PREFIX(3) << "Changing state of replica " << replica.ts->ToString()
530
0
                            << " from failed to not failed";
531
5
        replica_update.clear_failed = true;
532
        // Cannot update the replica here directly because we only hold a shared lock on the mutex.
533
5
        replica_updates.push_back(replica_update);
534
667k
      } else {
535
667k
        if (replica.role == PeerRole::READ_REPLICA) {
536
5.32k
          num_alive_read_replicas++;
537
662k
        } else if (replica.role == PeerRole::FOLLOWER || replica.role == PeerRole::LEADER) {
538
658k
          num_alive_live_replicas++;
539
658k
        }
540
667k
      }
541
667k
      servers->push_back(replica.ts);
542
667k
    }
543
230k
    SetAliveReplicas(num_alive_live_replicas, num_alive_read_replicas);
544
230k
  }
545
230k
  if (!replica_updates.empty()) {
546
45
    std::lock_guard<rw_spinlock> lock(mutex_);
547
45
    for (const auto& update : replica_updates) {
548
45
      if (update.new_state != RaftGroupStatePB::UNKNOWN) {
549
40
        update.replica->state = update.new_state;
550
40
      }
551
45
      if (update.clear_failed) {
552
5
        update.replica->ClearFailed();
553
5
      }
554
45
    }
555
45
  }
556
230k
}
557
558
939
bool RemoteTablet::MarkTServerAsLeader(const RemoteTabletServer* server) {
559
939
  bool found = false;
560
939
  std::lock_guard<rw_spinlock> lock(mutex_);
561
2.94k
  for (RemoteReplica& replica : replicas_) {
562
2.94k
    if (replica.ts == server) {
563
939
      replica.role = PeerRole::LEADER;
564
939
      found = true;
565
2.00k
    } else if (replica.role == PeerRole::LEADER) {
566
183
      replica.role = PeerRole::FOLLOWER;
567
183
    }
568
2.94k
  }
569
0
  VLOG_WITH_PREFIX(3) << "Latest replicas: " << ReplicasAsStringUnlocked();
570
0
  VLOG_IF_WITH_PREFIX(3, !found) << "Specified server not found: " << server->ToString()
571
0
                                 << ". Replicas: " << ReplicasAsStringUnlocked();
572
939
  return found;
573
939
}
574
575
1.56k
void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) {
576
1.56k
  bool found = false;
577
1.56k
  std::lock_guard<rw_spinlock> lock(mutex_);
578
4.70k
  for (RemoteReplica& replica : replicas_) {
579
4.70k
    if (replica.ts == server) {
580
1.56k
      replica.role = PeerRole::FOLLOWER;
581
1.56k
      found = true;
582
1.56k
    }
583
4.70k
  }
584
0
  VLOG_WITH_PREFIX(3) << "Latest replicas: " << ReplicasAsStringUnlocked();
585
0
  DCHECK(found) << "Tablet " << tablet_id_ << ": Specified server not found: "
586
0
                << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked();
587
1.56k
}
588
589
4
std::string RemoteTablet::ReplicasAsString() const {
590
4
  SharedLock<rw_spinlock> lock(mutex_);
591
4
  return ReplicasAsStringUnlocked();
592
4
}
593
594
4
std::string RemoteTablet::ReplicasAsStringUnlocked() const {
595
4
  DCHECK(mutex_.is_locked());
596
4
  string replicas_str;
597
13
  for (const RemoteReplica& rep : replicas_) {
598
13
    if (!replicas_str.empty()) replicas_str += ", ";
599
13
    replicas_str += rep.ToString();
600
13
  }
601
4
  return replicas_str;
602
4
}
603
604
8.20k
std::string RemoteTablet::ToString() const {
605
8.20k
  return YB_CLASS_TO_STRING(tablet_id, partition, partition_list_version, split_depth);
606
8.20k
}
607
608
276
PartitionListVersion RemoteTablet::GetLastKnownPartitionListVersion() const {
609
276
  SharedLock<rw_spinlock> lock(mutex_);
610
276
  return last_known_partition_list_version_;
611
276
}
612
613
void RemoteTablet::MakeLastKnownPartitionListVersionAtLeast(
614
340k
    PartitionListVersion partition_list_version) {
615
340k
  std::lock_guard<rw_spinlock> lock(mutex_);
616
340k
  last_known_partition_list_version_ =
617
340k
      std::max(last_known_partition_list_version_, partition_list_version);
618
340k
}
619
620
96.1k
void LookupCallbackVisitor::operator()(const LookupTabletCallback& tablet_callback) const {
621
96.1k
  if (error_status_) {
622
2.26k
    tablet_callback(*error_status_);
623
2.26k
    return;
624
2.26k
  }
625
93.8k
  auto remote_tablet = boost::get<RemoteTabletPtr>(param_);
626
93.8k
  if (remote_tablet == nullptr) {
627
0
    static const Status error_status = STATUS(
628
0
        TryAgain, "Tablet for requested partition is not yet running",
629
0
        ClientError(ClientErrorCode::kTabletNotYetRunning));
630
0
    tablet_callback(error_status);
631
0
    return;
632
0
  }
633
93.8k
  tablet_callback(remote_tablet);
634
93.8k
}
635
636
void LookupCallbackVisitor::operator()(
637
0
    const LookupTabletRangeCallback& tablet_range_callback) const {
638
0
  if (error_status_) {
639
0
    tablet_range_callback(*error_status_);
640
0
    return;
641
0
  }
642
0
  auto result = boost::get<std::vector<RemoteTabletPtr>>(param_);
643
0
  tablet_range_callback(result);
644
0
}
645
646
////////////////////////////////////////////////////////////
647
648
MetaCache::MetaCache(YBClient* client)
649
  : client_(client),
650
    master_lookup_sem_(FLAGS_max_concurrent_master_lookups),
651
22.9k
    log_prefix_(Format("MetaCache($0): ", static_cast<void*>(this))) {
652
22.9k
}
653
654
2.11k
MetaCache::~MetaCache() {
655
2.11k
}
656
657
void MetaCache::SetLocalTabletServer(const string& permanent_uuid,
658
                                     const shared_ptr<TabletServerServiceProxy>& proxy,
659
5.48k
                                     const LocalTabletServer* local_tserver) {
660
5.48k
  const auto entry = ts_cache_.emplace(permanent_uuid,
661
5.48k
                                       std::make_unique<RemoteTabletServer>(permanent_uuid,
662
5.48k
                                                                            proxy,
663
5.48k
                                                                            local_tserver));
664
5.48k
  CHECK(entry.second);
665
5.48k
  local_tserver_ = entry.first->second.get();
666
5.48k
}
667
668
219k
void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) {
669
219k
  const std::string& permanent_uuid = pb.permanent_uuid();
670
219k
  auto it = ts_cache_.find(permanent_uuid);
671
219k
  if (it != ts_cache_.end()) {
672
202k
    it->second->Update(pb);
673
202k
    return;
674
202k
  }
675
676
16.9k
  VLOG_WITH_PREFIX(1) << "Client caching new TabletServer " << permanent_uuid;
677
16.9k
  CHECK(ts_cache_.emplace(permanent_uuid, std::make_unique<RemoteTabletServer>(pb)).second);
678
16.9k
}
679
680
// A (table, partition_key) --> tablet lookup. May be in-flight to a master, or
681
// may be handled locally.
682
//
683
// Keeps a reference on the owning metacache while alive.
684
class LookupRpc : public internal::ClientMasterRpcBase, public RequestCleanup {
685
 public:
686
  LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
687
            const std::shared_ptr<const YBTable>& table,
688
            int64_t request_no,
689
            CoarseTimePoint deadline);
690
691
  virtual ~LookupRpc();
692
693
  void SendRpc() override;
694
695
160k
  MetaCache* meta_cache() { return meta_cache_.get(); }
696
0
  YBClient* client() const { return meta_cache_->client_; }
697
698
  virtual void NotifyFailure(const Status& status) = 0;
699
700
  template <class Response>
701
  void DoProcessResponse(const Status& status, const Response& resp);
702
703
  // Subclasses can override VerifyResponse for implementing additional response checks. Called
704
  // from Finished if there are no errors passed in response.
705
9.03k
  virtual CHECKED_STATUS VerifyResponse() { return Status::OK(); }
706
707
56.8k
  int64_t request_no() const {
708
56.8k
    return request_no_;
709
56.8k
  }
710
711
155k
  const std::shared_ptr<const YBTable>& table() const {
712
155k
    return table_;
713
155k
  }
714
715
  // When we receive a response for a key lookup or full-table RPC, add the callbacks to be fired
717
  // to the to_notify set corresponding to the RPC type. Modify the tables map by removing lookups
718
  // as we add them to the to_notify set.
718
  virtual void AddCallbacksToBeNotified(
719
      const ProcessedTablesMap& processed_tables,
720
      std::unordered_map<TableId, TableData>* tables,
721
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) = 0;
722
723
  // When looking up by key or full table, update the processed table with the returned location.
724
  virtual void UpdateProcessedTable(const TabletLocationsPB& loc,
725
                                    RemoteTabletPtr remote,
726
                                    ProcessedTablesMap::mapped_type* processed_table) = 0;
727
728
 private:
729
  virtual CHECKED_STATUS ProcessTabletLocations(
730
     const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
731
     boost::optional<PartitionListVersion> table_partition_list_version) = 0;
732
733
  const int64_t request_no_;
734
735
  // Pointer back to the tablet cache. Populated with location information
736
  // if the lookup finishes successfully.
737
  //
738
  // When the RPC is destroyed, a master lookup permit is returned to the
739
  // cache if one was acquired in the first place.
740
  scoped_refptr<MetaCache> meta_cache_;
741
742
  // Table to look up. Optional in the case of lookup by ID; if specified, it is used to check whether the table
743
  // partitions are stale.
744
  const std::shared_ptr<const YBTable> table_;
745
746
  // Whether this lookup has acquired a master lookup permit.
747
  bool has_permit_ = false;
748
};
749
750
LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
751
                     const std::shared_ptr<const YBTable>& table,
752
                     int64_t request_no,
753
                     CoarseTimePoint deadline)
754
    : ClientMasterRpcBase(meta_cache->client_, deadline),
755
      request_no_(request_no),
756
      meta_cache_(meta_cache),
757
56.6k
      table_(table) {
758
56.6k
  DCHECK(deadline != CoarseTimePoint());
759
56.6k
}
760
761
56.6k
LookupRpc::~LookupRpc() {
762
56.6k
  if (has_permit_) {
763
56.6k
    meta_cache_->ReleaseMasterLookupPermit();
764
56.6k
  }
765
56.6k
}
766
767
54.7k
void LookupRpc::SendRpc() {
768
54.7k
  if (!has_permit_) {
769
54.7k
    has_permit_ = meta_cache_->AcquireMasterLookupPermit();
770
54.7k
  }
771
54.7k
  if (!has_permit_) {
772
    // Couldn't get a permit, try again in a little while.
773
0
    ScheduleRetry(STATUS(TryAgain, "Client has too many outstanding requests to the master"));
774
0
    return;
775
0
  }
776
777
  // See YBClient::Data::SyncLeaderMasterRpc().
778
54.7k
  auto now = CoarseMonoClock::Now();
779
54.7k
  if (retrier().deadline() < now) {
780
0
    Finished(STATUS_FORMAT(TimedOut, "Timed out after deadline expired, passed: $0",
781
0
                           MonoDelta(now - retrier().start())));
782
0
    return;
783
0
  }
784
54.7k
  mutable_retrier()->PrepareController();
785
786
54.7k
  ClientMasterRpcBase::SendRpc();
787
54.7k
}
788
789
namespace {
790
791
9.03k
CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTabletLocationsResponsePB& resp) {
792
8.70k
  return resp.errors_size() > 0 ? StatusFromPB(resp.errors(0).status()) : Status::OK();
793
9.03k
}
794
795
47.5k
CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTableLocationsResponsePB& resp) {
796
  // There are no per-tablet lookup errors inside GetTableLocationsResponsePB.
797
47.5k
  return Status::OK();
798
47.5k
}
799
800
template <class Response>
801
56.5k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
802
56.5k
  return resp.has_partition_list_version()
803
48.7k
             ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())
804
7.79k
             : boost::none;
805
56.5k
}
meta_cache.cc:_ZN2yb6client8internal12_GLOBAL__N_123GetPartitionListVersionINS_6master27GetTableLocationsResponsePBEEEN5boost8optionalIjEERKT_
Line
Count
Source
801
47.5k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
802
47.5k
  return resp.has_partition_list_version()
803
47.5k
             ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())
804
1
             : boost::none;
805
47.5k
}
meta_cache.cc:_ZN2yb6client8internal12_GLOBAL__N_123GetPartitionListVersionINS_6master28GetTabletLocationsResponsePBEEEN5boost8optionalIjEERKT_
Line
Count
Source
801
9.04k
boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) {
802
9.04k
  return resp.has_partition_list_version()
803
1.24k
             ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())
804
7.79k
             : boost::none;
805
9.04k
}
806
807
} // namespace
808
809
template <class Response>
810
56.6k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
811
56.6k
  auto new_status = status;
812
56.6k
  if (new_status.ok()) {
813
56.5k
    new_status = VerifyResponse();
814
56.5k
  }
815
816
  // Prefer response failures over no tablets found.
817
56.6k
  if (new_status.ok()) {
818
56.5k
    new_status = GetFirstErrorForTabletById(resp);
819
56.5k
  }
820
821
56.6k
  if (new_status.ok() && resp.tablet_locations_size() == 0) {
822
0
    new_status = STATUS(NotFound, "No such tablet found");
823
0
  }
824
825
56.6k
  if (new_status.ok()) {
826
56.2k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
827
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
828
0
      NotifyFailure(s);
829
0
      return;
830
0
    }
831
56.2k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
832
56.2k
  }
833
56.6k
  if (!new_status.ok()) {
834
265
    YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << new_status;
835
421
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
836
421
    NotifyFailure(new_status);
837
421
  }
838
56.6k
}
_ZN2yb6client8internal9LookupRpc17DoProcessResponseINS_6master27GetTableLocationsResponsePBEEEvRKNS_6StatusERKT_
Line
Count
Source
810
47.5k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
811
47.5k
  auto new_status = status;
812
47.5k
  if (new_status.ok()) {
813
47.5k
    new_status = VerifyResponse();
814
47.5k
  }
815
816
  // Prefer response failures over no tablets found.
817
47.5k
  if (new_status.ok()) {
818
47.5k
    new_status = GetFirstErrorForTabletById(resp);
819
47.5k
  }
820
821
47.5k
  if (new_status.ok() && resp.tablet_locations_size() == 0) {
822
0
    new_status = STATUS(NotFound, "No such tablet found");
823
0
  }
824
825
47.5k
  if (new_status.ok()) {
826
47.5k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
827
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
828
0
      NotifyFailure(s);
829
0
      return;
830
0
    }
831
47.5k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
832
47.5k
  }
833
47.5k
  if (!new_status.ok()) {
834
21
    YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << new_status;
835
85
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
836
85
    NotifyFailure(new_status);
837
85
  }
838
47.5k
}
_ZN2yb6client8internal9LookupRpc17DoProcessResponseINS_6master28GetTabletLocationsResponsePBEEEvRKNS_6StatusERKT_
Line
Count
Source
810
9.04k
void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) {
811
9.04k
  auto new_status = status;
812
9.04k
  if (new_status.ok()) {
813
9.03k
    new_status = VerifyResponse();
814
9.03k
  }
815
816
  // Prefer response failures over no tablets found.
817
9.04k
  if (new_status.ok()) {
818
9.03k
    new_status = GetFirstErrorForTabletById(resp);
819
9.03k
  }
820
821
9.04k
  if (new_status.ok() && resp.tablet_locations_size() == 0) {
822
0
    new_status = STATUS(NotFound, "No such tablet found");
823
0
  }
824
825
9.04k
  if (new_status.ok()) {
826
8.70k
    if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) {
827
0
      const auto s = STATUS(TimedOut, "Simulate timeout for test.");
828
0
      NotifyFailure(s);
829
0
      return;
830
0
    }
831
8.70k
    new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp));
832
8.70k
  }
833
9.04k
  if (!new_status.ok()) {
834
244
    YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << new_status;
835
336
    new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
836
336
    NotifyFailure(new_status);
837
336
  }
838
9.04k
}
839
840
namespace {
841
842
Status CheckTabletLocations(
843
56.2k
    const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations) {
844
56.2k
  const std::string* prev_partition_end = nullptr;
845
74.3k
  for (const TabletLocationsPB& loc : locations) {
846
74.3k
    if (prev_partition_end && *prev_partition_end > loc.partition().partition_key_start()) {
847
0
      LOG(DFATAL) << "There should be no overlaps in tablet partitions and they should be sorted "
848
0
                  << "by partition_key_start. Prev partition end: "
849
0
                  << Slice(*prev_partition_end).ToDebugHexString() << ", current partition start: "
850
0
                  << Slice(loc.partition().partition_key_start()).ToDebugHexString()
851
0
                  << ". Tablet locations: " << [&locations] {
852
0
                       std::string result;
853
0
                       for (auto& loc : locations) {
854
0
                         result += "\n  " + AsString(loc);
855
0
                       }
856
0
                       return result;
857
0
                     }();
858
0
      return STATUS(IllegalState, "Wrong order or overlaps in partitions");
859
0
    }
860
74.3k
    prev_partition_end = &loc.partition().partition_key_end();
861
74.3k
  }
862
56.2k
  return Status::OK();
863
56.2k
}
864
865
class TabletIdLookup : public ToStringable {
866
 public:
867
9.04k
  explicit TabletIdLookup(const TabletId& tablet_id) : tablet_id_(tablet_id) {}
868
869
0
  std::string ToString() const override {
870
0
    return Format("Tablet: $0", tablet_id_);
871
0
  }
872
873
 private:
874
  const TabletId& tablet_id_;
875
};
876
877
class TablePartitionLookup : public ToStringable {
878
 public:
879
  explicit TablePartitionLookup(
880
      const TableId& table_id, const VersionedPartitionGroupStartKey& partition_group_start)
881
47.5k
      : table_id_(table_id), partition_group_start_(partition_group_start) {}
882
883
0
  std::string ToString() const override {
884
0
    return Format("Table: $0, partition: $1, partition list version: $2",
885
0
                  table_id_, Slice(*partition_group_start_.key).ToDebugHexString(),
886
0
                  partition_group_start_.partition_list_version);
887
0
  }
888
889
 private:
890
  const TableId& table_id_;
891
  const VersionedPartitionGroupStartKey partition_group_start_;
892
};
893
894
class FullTableLookup : public ToStringable {
895
 public:
896
  explicit FullTableLookup(const TableId& table_id)
897
0
      : table_id_(table_id) {}
898
899
0
  std::string ToString() const override {
900
0
    return Format("FullTableLookup: $0", table_id_);
901
0
  }
902
 private:
903
  const TableId& table_id_;
904
};
905
906
} // namespace
907
908
Status MetaCache::ProcessTabletLocations(
909
    const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
910
56.2k
    boost::optional<PartitionListVersion> table_partition_list_version, LookupRpc* lookup_rpc) {
911
0
  if (VLOG_IS_ON(2)) {
912
0
    VLOG_WITH_PREFIX_AND_FUNC(2) << "lookup_rpc: " << AsString(lookup_rpc);
913
0
    for (const auto& loc : locations) {
914
0
      for (const auto& table_id : loc.table_ids()) {
915
0
        VLOG_WITH_PREFIX_AND_FUNC(2) << loc.tablet_id() << ", " << table_id;
916
0
      }
917
0
    }
918
0
    VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(locations);
919
0
  }
920
921
56.2k
  RETURN_NOT_OK(CheckTabletLocations(locations));
922
923
56.2k
  std::vector<std::pair<LookupCallback, LookupCallbackVisitor>> to_notify;
924
56.2k
  {
925
56.2k
    std::lock_guard<decltype(mutex_)> lock(mutex_);
926
56.2k
    ProcessedTablesMap processed_tables;
927
928
74.3k
    for (const TabletLocationsPB& loc : locations) {
929
74.3k
      const std::string& tablet_id = loc.tablet_id();
930
      // Next, update the tablet caches.
931
74.3k
      RemoteTabletPtr remote = FindPtrOrNull(tablets_by_id_, tablet_id);
932
933
      // First, update the tserver cache, needed for the Refresh calls below.
934
219k
      for (const TabletLocationsPB_ReplicaPB& r : loc.replicas()) {
935
219k
        UpdateTabletServerUnlocked(r.ts_info());
936
219k
      }
937
938
74.3k
      VersionedTablePartitionListPtr colocated_table_partition_list;
939
74.3k
      if (loc.table_ids_size() > 1 && lookup_rpc && lookup_rpc->table()) {
940
        // When table_ids_size() == 1 we only receive info for the single table from the master
941
        // and we already have TableData initialized for it (this is done before sending an RPC to
942
        // the master). And when table_ids_size() > 1, it means we got a response to a lookup RPC for a
943
        // co-located table and we can re-use TableData::partition_list from the table that was
944
        // requested by MetaCache::LookupTabletByKey caller for other tables co-located with this
945
        // one (since all co-located tables sharing the same set of tablets have the same table
946
        // partition list and now we have list of them returned by the master).
947
499
        const auto lookup_table_it = tables_.find(lookup_rpc->table()->id());
948
499
        if (lookup_table_it != tables_.end()) {
949
499
          colocated_table_partition_list = lookup_table_it->second.partition_list;
950
0
        } else {
951
          // We don't want to crash the server in that case for production, since this is not a
952
          // correctness issue, but gives some performance degradation on first lookups for
953
          // co-located tables.
954
          // But we do want it to crash in debug, so we can more reliably catch this if it happens.
955
0
          LOG_WITH_PREFIX(DFATAL) << Format(
956
0
              "Internal error: got response for lookup RPC for co-located table, but MetaCache "
957
0
              "table data wasn't initialized with partition list for this table. RPC: $0",
958
0
              AsString(lookup_rpc));
959
0
        }
960
499
      }
961
962
353k
      for (const std::string& table_id : loc.table_ids()) {
963
353k
        auto& processed_table = processed_tables[table_id];
964
353k
        std::map<PartitionKey, RemoteTabletPtr>* tablets_by_key = nullptr;
965
966
353k
        auto table_it = tables_.find(table_id);
967
353k
        if (table_it == tables_.end() && loc.table_ids_size() > 1 &&
968
270k
            colocated_table_partition_list) {
969
265k
          table_it = InitTableDataUnlocked(table_id, colocated_table_partition_list);
970
265k
        }
971
353k
        if (table_it != tables_.end()) {
972
340k
          auto& table_data = table_it->second;
973
974
1
          const auto msg_formatter = [&] {
975
1
            return Format(
976
1
                "Received table $0 partitions version: $1, MetaCache's table partitions version: "
977
1
                "$2",
978
1
                table_id, table_partition_list_version, table_data.partition_list->version);
979
1
          };
980
0
          VLOG_WITH_PREFIX_AND_FUNC(4) << msg_formatter();
981
340k
          if (table_partition_list_version.has_value()) {
982
340k
            if (table_partition_list_version.get() != table_data.partition_list->version) {
983
1
              return STATUS(
984
1
                  TryAgain, msg_formatter(),
985
1
                  ClientError(ClientErrorCode::kTablePartitionListIsStale));
986
1
            }
987
            // We need to guarantee that table_data.tablets_by_partition cache corresponds to
988
            // table_data.partition_list (see comments for TableData::partitions).
989
            // So, we don't update tablets_by_partition cache if we don't know table partitions
990
            // version for both response and TableData.
991
            // This can only happen for those LookupTabletById requests that don't specify a table,
992
            // because they don't care about partitions changing.
993
340k
            tablets_by_key = &table_data.tablets_by_partition;
994
340k
          }
995
340k
        }
996
997
353k
        if (remote) {
998
          // Partition should not have changed.
999
282k
          DCHECK_EQ(loc.partition().partition_key_start(),
1000
282k
                    remote->partition().partition_key_start());
1001
282k
          DCHECK_EQ(loc.partition().partition_key_end(),
1002
282k
                    remote->partition().partition_key_end());
1003
1004
          // For colocated tables, RemoteTablet already exists because it was processed
1005
          // in a previous iteration of the for loop (for loc.table_ids()).
1006
          // We need to add this tablet to the current table's tablets_by_key map.
1007
282k
          if (tablets_by_key) {
1008
277k
            (*tablets_by_key)[remote->partition().partition_key_start()] = remote;
1009
277k
          }
1010
1011
0
          VLOG_WITH_PREFIX(5) << "Refreshing tablet " << tablet_id << ": "
1012
0
                              << loc.ShortDebugString();
1013
70.6k
        } else {
1014
0
          VLOG_WITH_PREFIX(5) << "Caching tablet " << tablet_id << ": " << loc.ShortDebugString();
1015
1016
70.6k
          Partition partition;
1017
70.6k
          Partition::FromPB(loc.partition(), &partition);
1018
70.6k
          remote = new RemoteTablet(
1019
70.6k
              tablet_id, partition, table_partition_list_version, loc.split_depth(),
1020
70.6k
              loc.split_parent_tablet_id());
1021
1022
70.6k
          CHECK(tablets_by_id_.emplace(tablet_id, remote).second);
1023
70.6k
          if (tablets_by_key) {
1024
62.7k
            auto emplace_result = tablets_by_key->emplace(partition.partition_key_start(), remote);
1025
62.7k
            if (!emplace_result.second) {
1026
12
              const auto& old_tablet = emplace_result.first->second;
1027
12
              if (old_tablet->split_depth() < remote->split_depth()) {
1028
                // Only replace with tablet of higher split_depth.
1029
12
                emplace_result.first->second = remote;
1030
0
              } else {
1031
                // If split_depth is the same - it should be the same tablet.
1032
0
                if (old_tablet->split_depth() == loc.split_depth()
1033
0
                    && old_tablet->tablet_id() != tablet_id) {
1034
0
                  const auto error_msg = Format(
1035
0
                      "Can't replace tablet $0 with $1 at partition_key_start $2, split_depth $3",
1036
0
                      old_tablet->tablet_id(), tablet_id, loc.partition().partition_key_start(),
1037
0
                      old_tablet->split_depth());
1038
0
                  LOG_WITH_PREFIX(DFATAL) << error_msg;
1039
                  // Just skip updating this tablet for release build.
1040
0
                }
1041
0
              }
1042
12
            }
1043
62.7k
          }
1044
70.6k
          MaybeUpdateClientRequests(*remote);
1045
70.6k
        }
1046
353k
        remote->Refresh(ts_cache_, loc.replicas());
1047
353k
        remote->SetExpectedReplicas(loc.expected_live_replicas(), loc.expected_read_replicas());
1048
353k
        if (table_partition_list_version.has_value()) {
1049
340k
          remote->MakeLastKnownPartitionListVersionAtLeast(*table_partition_list_version);
1050
340k
        }
1051
353k
        if (lookup_rpc) {
1052
352k
          lookup_rpc->UpdateProcessedTable(loc, remote, &processed_table);
1053
352k
        }
1054
353k
      }
1055
1056
74.3k
      auto it = tablet_lookups_by_id_.find(tablet_id);
1057
74.3k
      if (it != tablet_lookups_by_id_.end()) {
1058
33.0k
        while (auto* lookup = it->second.lookups.Pop()) {
1059
23.5k
          to_notify.emplace_back(std::move(lookup->callback),
1060
23.5k
                                 LookupCallbackVisitor(remote));
1061
23.5k
          delete lookup;
1062
23.5k
        }
1063
9.58k
      }
1064
74.3k
    }
1065
56.2k
    if (lookup_rpc) {
1066
56.2k
      lookup_rpc->AddCallbacksToBeNotified(processed_tables, &tables_, &to_notify);
1067
56.2k
      lookup_rpc->CleanupRequest();
1068
56.2k
    }
1069
56.2k
  }
1070
1071
93.8k
  for (const auto& callback_and_param : to_notify) {
1072
93.8k
    boost::apply_visitor(callback_and_param.second, callback_and_param.first);
1073
93.8k
  }
1074
1075
56.2k
  return Status::OK();
1076
56.2k
}
1077
1078
70.6k
void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) {
1079
0
  VLOG_WITH_PREFIX_AND_FUNC(2) << "Tablet: " << tablet.tablet_id()
1080
0
                    << " split parent: " << tablet.split_parent_tablet_id();
1081
70.6k
  if (tablet.split_parent_tablet_id().empty()) {
1082
0
    VLOG_WITH_PREFIX(2) << "Tablet " << tablet.tablet_id() << " is not a result of split";
1083
70.5k
    return;
1084
70.5k
  }
1085
  // TODO: MetaCache is a friend of Client, and tablet_requests_mutex_ and tablet_requests_ are
1086
  // public members of YBClient::Data. Consider refactoring that.
1087
34
  std::lock_guard<simple_spinlock> request_lock(client_->data_->tablet_requests_mutex_);
1088
34
  auto& tablet_requests = client_->data_->tablet_requests_;
1089
34
  const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id());
1090
34
  if (requests_it == tablet_requests.end()) {
1091
0
    VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for tablet "
1092
0
                        << tablet.split_parent_tablet_id();
1093
    // This can happen if the client wasn't active (for example, the node was partitioned away)
1094
    // during the sequence of splits that resulted in `tablet` creation, so we don't have info
1095
    // about the split parent of `tablet`.
1096
    // In this case we set request_id_seq to a special value and will reset it on getting a
1097
    // "request id is less than min" error. We will use the min request ID plus 2^24 (there won't be
1098
    // 2^24 client requests in progress from the same client to the same tablet, so this is safe
1099
    // to do).
1100
10
    tablet_requests.emplace(
1101
10
        tablet.tablet_id(),
1102
10
        YBClient::Data::TabletRequests {
1103
10
            .request_id_seq = kInitializeFromMinRunning
1104
10
        });
1105
10
    return;
1106
10
  }
1107
24
  VLOG_WITH_PREFIX(2) << "Setting request_id_seq for tablet " << tablet.tablet_id()
1108
0
                      << " from tablet " << tablet.split_parent_tablet_id() << " to "
1109
0
                      << requests_it->second.request_id_seq;
1110
24
  tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq;
1111
24
}
1112
1113
std::unordered_map<TableId, TableData>::iterator MetaCache::InitTableDataUnlocked(
1114
309k
    const TableId& table_id, const VersionedTablePartitionListPtr& partitions) {
1115
0
  VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
1116
0
      "MetaCache initializing TableData ($0 tables) for table $1: $2, "
1117
0
      "partition_list_version: $3",
1118
0
      tables_.size(), table_id, tables_.count(table_id), partitions->version);
1119
309k
  return tables_.emplace(
1120
309k
      std::piecewise_construct, std::forward_as_tuple(table_id),
1121
309k
      std::forward_as_tuple(partitions)).first;
1122
309k
}
1123
1124
16.0k
void MetaCache::InvalidateTableCache(const YBTable& table) {
1125
16.0k
  const auto& table_id = table.id();
1126
16.0k
  const auto table_partition_list = table.GetVersionedPartitions();
1127
0
  VLOG_WITH_PREFIX_AND_FUNC(1) << Format(
1128
0
      "table: $0, table.partition_list.version: $1", table_id, table_partition_list->version);
1129
1130
16.0k
  std::vector<LookupCallback> to_notify;
1131
1132
16.0k
  auto invalidate_needed = [this, &table_id, &table_partition_list](const auto& it) {
1133
16.0k
    const auto table_data_partition_list_version = it->second.partition_list->version;
1134
0
    VLOG_WITH_PREFIX(1) << Format(
1135
0
        "tables_[$0].partition_list.version: $1", table_id, table_data_partition_list_version);
1136
    // We only need to invalidate the table cache if its partition list version is older than the
1138
    // table's version.
1138
16.0k
    return table_data_partition_list_version < table_partition_list->version;
1139
16.0k
  };
1140
1141
16.0k
  {
1142
16.0k
    SharedLock<decltype(mutex_)> lock(mutex_);
1143
16.0k
    auto it = tables_.find(table_id);
1144
16.0k
    if (it != tables_.end()) {
1145
16.0k
      if (!invalidate_needed(it)) {
1146
16.0k
        return;
1147
16.0k
      }
1148
12
    }
1149
12
  }
1150
1151
12
  {
1152
12
    UniqueLock<decltype(mutex_)> lock(mutex_);
1153
12
    auto it = tables_.find(table_id);
1154
12
    if (it != tables_.end()) {
1155
12
      if (!invalidate_needed(it)) {
1156
0
        return;
1157
0
      }
1158
0
    } else {
1159
0
      it = InitTableDataUnlocked(table_id, table_partition_list);
1160
      // Nothing to invalidate, we have just initialized it for the first time.
1161
0
      return;
1162
0
    }
1163
1164
12
    auto& table_data = it->second;
1165
1166
    // Some partitions could be mapped to tablets that have been split and we need to re-fetch
1167
    // info about tablets serving partitions.
1168
32
    for (auto& tablet : table_data.tablets_by_partition) {
1169
32
      tablet.second->MarkStale();
1170
32
    }
1171
1172
0
    for (auto& tablet : table_data.all_tablets) {
1173
0
      tablet->MarkStale();
1174
0
    }
1175
    // TODO(tsplit): Optimize to retry only necessary lookups inside ProcessTabletLocations,
1176
    // detect which need to be retried by GetTableLocationsResponsePB.partition_list_version.
1177
14
    for (auto& group_lookups : table_data.tablet_lookups_by_group) {
1178
14
      while (auto* lookup = group_lookups.second.lookups.Pop()) {
1179
0
        to_notify.push_back(std::move(lookup->callback));
1180
0
        delete lookup;
1181
0
      }
1182
14
    }
1183
12
    table_data.tablet_lookups_by_group.clear();
1184
1185
    // Only update partitions here after invalidating TableData cache to avoid inconsistencies.
1186
    // See https://github.com/yugabyte/yugabyte-db/issues/6890.
1187
12
    table_data.partition_list = table_partition_list;
1188
12
  }
1189
0
  for (const auto& callback : to_notify) {
1190
0
    const auto s = STATUS_EC_FORMAT(
1191
0
        TryAgain, ClientError(ClientErrorCode::kMetaCacheInvalidated),
1192
0
        "MetaCache for table $0 has been invalidated.", table_id);
1193
0
    boost::apply_visitor(LookupCallbackVisitor(s), callback);
1194
0
  }
1195
12
}
1196
1197
class MetaCache::CallbackNotifier {
1198
 public:
1199
421
  explicit CallbackNotifier(const Status& status) : status_(status) {}
1200
1201
2.26k
  void Add(LookupCallback&& callback) {
1202
2.26k
    callbacks_.push_back(std::move(callback));
1203
2.26k
  }
1204
1205
2
  void SetStatus(const Status& status) {
1206
2
    status_ = status;
1207
2
  }
1208
1209
421
  ~CallbackNotifier() {
1210
2.26k
    for (const auto& callback : callbacks_) {
1211
2.26k
      boost::apply_visitor(LookupCallbackVisitor(status_), callback);
1212
2.26k
    }
1213
421
  }
1214
 private:
1215
  std::vector<LookupCallback> callbacks_;
1216
  Status status_;
1217
};
1218
1219
CoarseTimePoint MetaCache::LookupFailed(
1220
    const Status& status, int64_t request_no, const ToStringable& lookup_id,
1221
    LookupDataGroup* lookup_data_group,
1222
421
    CallbackNotifier* notifier) {
1223
421
  std::vector<LookupData*> retry;
1224
421
  auto now = CoarseMonoClock::Now();
1225
421
  CoarseTimePoint max_deadline;
1226
1227
3.17k
  while (auto* lookup = lookup_data_group->lookups.Pop()) {
1228
2.75k
    if (!status.IsTimedOut() || lookup->deadline <= now) {
1229
2.26k
      notifier->Add(std::move(lookup->callback));
1230
2.26k
      delete lookup;
1231
488
    } else {
1232
488
      max_deadline = std::max(max_deadline, lookup->deadline);
1233
488
      retry.push_back(lookup);
1234
488
    }
1235
2.75k
  }
1236
1237
421
  if (retry.empty()) {
1238
416
    lookup_data_group->Finished(request_no, lookup_id);
1239
5
  } else {
1240
488
    for (auto* lookup : retry) {
1241
488
      lookup_data_group->lookups.Push(lookup);
1242
488
    }
1243
5
  }
1244
1245
421
  return max_deadline;
1246
421
}
1247
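// --- Editor's note: illustrative sketch, not part of meta_cache.cc --------------
// LookupFailed above splits the queued lookups into two groups: lookups that should
// fail now (non-timeout errors, or deadlines that have already passed) go to the
// CallbackNotifier, while timed-out lookups whose deadlines are still in the future
// are pushed back so the caller can retry them up to the latest remaining deadline.
// A minimal standalone sketch of that partitioning (types and names are illustrative):
#include <algorithm>
#include <chrono>
#include <utility>
#include <vector>

struct PendingLookupSketch {
  std::chrono::steady_clock::time_point deadline;
};

// Returns the latest deadline among lookups that should be retried; moves the rest
// into *to_fail. A default-constructed time_point means "nothing left to retry".
std::chrono::steady_clock::time_point PartitionOnFailureSketch(
    bool failure_is_timeout,
    std::vector<PendingLookupSketch>* pending,
    std::vector<PendingLookupSketch>* to_fail) {
  const auto now = std::chrono::steady_clock::now();
  std::chrono::steady_clock::time_point max_deadline{};
  std::vector<PendingLookupSketch> retry;
  for (const auto& lookup : *pending) {
    if (!failure_is_timeout || lookup.deadline <= now) {
      to_fail->push_back(lookup);      // Notify this lookup with the failure status.
    } else {
      max_deadline = std::max(max_deadline, lookup.deadline);
      retry.push_back(lookup);         // Still has time left; keep it queued for retry.
    }
  }
  *pending = std::move(retry);
  return max_deadline;
}
// --- End editor's note -----------------------------------------------------------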
1248
class LookupByIdRpc : public LookupRpc {
1249
 public:
1250
  LookupByIdRpc(const scoped_refptr<MetaCache>& meta_cache,
1251
                const TabletId& tablet_id,
1252
                const std::shared_ptr<const YBTable>& table,
1253
                master::IncludeInactive include_inactive,
1254
                int64_t request_no,
1255
                CoarseTimePoint deadline,
1256
                int64_t lookups_without_new_replicas)
1257
      : LookupRpc(meta_cache, table, request_no, deadline),
1258
        tablet_id_(tablet_id),
1259
9.08k
        include_inactive_(include_inactive) {
1260
9.08k
    if (lookups_without_new_replicas != 0) {
1261
810
      send_delay_ = std::min(
1262
810
          lookups_without_new_replicas * FLAGS_meta_cache_lookup_throttling_step_ms,
1263
810
          FLAGS_meta_cache_lookup_throttling_max_delay_ms) * 1ms;
1264
810
    }
1265
9.08k
  }
1266
1267
674
  std::string ToString() const override {
1268
674
    return Format("LookupByIdRpc(tablet: $0, num_attempts: $1)", tablet_id_, num_attempts());
1269
674
  }
1270
1271
9.91k
  void SendRpc() override {
1272
9.91k
    if (send_delay_) {
1273
810
      auto delay = send_delay_;
1274
810
      send_delay_ = MonoDelta();
1275
810
      auto status = mutable_retrier()->DelayedRetry(this, Status::OK(), delay);
1276
810
      if (!status.ok()) {
1277
0
        Finished(status);
1278
0
      }
1279
810
      return;
1280
810
    }
1281
1282
9.10k
    LookupRpc::SendRpc();
1283
9.10k
  }
1284
1285
9.10k
  void CallRemoteMethod() override {
1286
    // Fill out the request.
1287
9.10k
    req_.clear_tablet_ids();
1288
9.10k
    req_.add_tablet_ids(tablet_id_);
1289
9.10k
    if (table()) {
1290
1.31k
      req_.set_table_id(table()->id());
1291
1.31k
    }
1292
9.10k
    req_.set_include_inactive(include_inactive_);
1293
1294
9.10k
    master_client_proxy()->GetTabletLocationsAsync(
1295
9.10k
        req_, &resp_, mutable_retrier()->mutable_controller(),
1296
9.10k
        std::bind(&LookupByIdRpc::Finished, this, Status::OK()));
1297
9.10k
  }
1298
1299
  void AddCallbacksToBeNotified(
1300
    const ProcessedTablesMap& processed_tables,
1301
    std::unordered_map<TableId, TableData>* tables,
1302
8.70k
    std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1303
    // Nothing to do when looking up by id.
1304
8.70k
    return;
1305
8.70k
  }
1306
1307
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1308
                            RemoteTabletPtr remote,
1309
14.4k
                            ProcessedTablesMap::mapped_type* processed_table) override {
1310
14.4k
    return;
1311
14.4k
  }
1312
1313
 private:
1314
9.04k
  void ProcessResponse(const Status& status) override {
1315
9.04k
    DoProcessResponse(status, resp_);
1316
9.04k
  }
1317
1318
9.11k
  Status ResponseStatus() override {
1319
9.11k
    return StatusFromResp(resp_);
1320
9.11k
  }
1321
1322
8.70k
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1323
8.70k
    auto& lookups = meta_cache()->tablet_lookups_by_id_;
1324
8.70k
    auto it = lookups.find(tablet_id_);
1325
8.70k
    TabletIdLookup tablet_id_lookup(tablet_id_);
1326
8.70k
    if (it != lookups.end()) {
1327
8.70k
      it->second.Finished(request_no(), tablet_id_lookup);
1328
0
    } else {
1329
0
      LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown tablet: "
1330
0
                            << tablet_id_lookup.ToString();
1331
0
    }
1332
8.70k
  }
1333
1334
336
  void NotifyFailure(const Status& status) override {
1335
336
    meta_cache()->LookupByIdFailed(
1336
336
        tablet_id_, table(), include_inactive_,
1337
336
        GetPartitionListVersion(resp_), request_no(), status);
1338
336
  }
1339
1340
  Status ProcessTabletLocations(
1341
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1342
8.70k
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1343
8.70k
    return meta_cache()->ProcessTabletLocations(
1344
8.70k
        locations, table_partition_list_version, this);
1345
8.70k
  }
1346
1347
  // Tablet to look up.
1348
  const TabletId tablet_id_;
1349
1350
  // Whether or not to look up inactive (hidden) tablets.
1351
  master::IncludeInactive include_inactive_;
1352
1353
  // Request body.
1354
  master::GetTabletLocationsRequestPB req_;
1355
1356
  // Response body.
1357
  master::GetTabletLocationsResponsePB resp_;
1358
1359
  MonoDelta send_delay_;
1360
};
1361
1362
class LookupFullTableRpc : public LookupRpc {
1363
 public:
1364
  LookupFullTableRpc(const scoped_refptr<MetaCache>& meta_cache,
1365
                     const std::shared_ptr<const YBTable>& table,
1366
                     int64_t request_no,
1367
                     CoarseTimePoint deadline)
1368
0
      : LookupRpc(meta_cache, table, request_no, deadline) {
1369
0
  }
1370
1371
0
  std::string ToString() const override {
1372
0
    return Format("LookupFullTableRpc($0, $1)", table()->name(), num_attempts());
1373
0
  }
1374
1375
0
  void CallRemoteMethod() override {
1376
    // Fill out the request.
1377
0
    req_.mutable_table()->set_table_id(table()->id());
1378
    // The end partition key is left unset intentionally so that we'll prefetch
1379
    // some additional tablets.
1380
0
    master_client_proxy()->GetTableLocationsAsync(
1381
0
        req_, &resp_, mutable_retrier()->mutable_controller(),
1382
0
        std::bind(&LookupFullTableRpc::Finished, this, Status::OK()));
1383
0
  }
1384
1385
  void AddCallbacksToBeNotified(
1386
      const ProcessedTablesMap& processed_tables,
1387
      std::unordered_map<TableId, TableData>* tables,
1388
0
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1389
0
    for (const auto& processed_table : processed_tables) {
1390
      // Handle tablet range
1391
0
      const auto it = tables->find(processed_table.first);
1392
0
      if (it == tables->end()) {
1393
0
        continue;
1394
0
      }
1395
0
      auto& table_data = it->second;
1396
0
      auto& full_table_lookups = table_data.full_table_lookups;
1397
0
      while (auto* lookup = full_table_lookups.lookups.Pop()) {
1398
0
        std::vector<internal::RemoteTabletPtr> remote_tablets;
1399
0
        for (const auto& entry : processed_table.second) {
1400
0
          remote_tablets.push_back(entry.second);
1401
0
        }
1402
0
        table_data.all_tablets = remote_tablets;
1403
0
        to_notify->emplace_back(std::move(lookup->callback),
1404
0
                                LookupCallbackVisitor(std::move(remote_tablets)));
1405
0
        delete lookup;
1406
0
      }
1407
0
    }
1408
0
  }
1409
1410
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1411
                            RemoteTabletPtr remote,
1412
0
                            ProcessedTablesMap::mapped_type* processed_table) override {
1413
0
    processed_table->emplace(loc.partition().partition_key_start(), remote);
1414
0
  }
1415
1416
 private:
1417
0
  void ProcessResponse(const Status& status) override {
1418
0
    DoProcessResponse(status, resp_);
1419
0
  }
1420
1421
0
  Status ResponseStatus() override {
1422
0
    return StatusFromResp(resp_);
1423
0
  }
1424
1425
0
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1426
0
    const auto it = meta_cache()->tables_.find(table()->id());
1427
0
    if (it == meta_cache()->tables_.end()) {
1428
0
      return;
1429
0
    }
1430
0
    auto& table_data = it->second;
1431
0
    auto full_table_lookup = FullTableLookup(table()->id());
1432
0
    table_data.full_table_lookups.Finished(request_no(), full_table_lookup, false);
1433
0
  }
1434
1435
0
  void NotifyFailure(const Status& status) override {
1436
0
    meta_cache()->LookupFullTableFailed(table(), request_no(), status);
1437
0
  }
1438
1439
  Status ProcessTabletLocations(
1440
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1441
0
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1442
0
    return meta_cache()->ProcessTabletLocations(
1443
0
        locations, table_partition_list_version, this);
1444
0
  }
1445
1446
  // Request body.
1447
  GetTableLocationsRequestPB req_;
1448
1449
  // Response body.
1450
  GetTableLocationsResponsePB resp_;
1451
};
1452
1453
class LookupByKeyRpc : public LookupRpc {
1454
 public:
1455
  LookupByKeyRpc(const scoped_refptr<MetaCache>& meta_cache,
1456
                 const std::shared_ptr<const YBTable>& table,
1457
                 const VersionedPartitionGroupStartKey& partition_group_start,
1458
                 int64_t request_no,
1459
                 CoarseTimePoint deadline)
1460
      : LookupRpc(meta_cache, table, request_no, deadline),
1461
47.5k
        partition_group_start_(partition_group_start) {
1462
47.5k
  }
1463
1464
253
  std::string ToString() const override {
1465
253
    return Format(
1466
253
        "GetTableLocations { table_name: $0, table_id: $1, partition_start_key: $2, "
1467
253
        "partition_list_version: $3, "
1468
253
        "request_no: $4, num_attempts: $5 }",
1469
253
        table()->name(),
1470
253
        table()->id(),
1471
253
        table()->partition_schema().PartitionKeyDebugString(
1472
253
            *partition_group_start_.key, internal::GetSchema(table()->schema())),
1473
253
        partition_group_start_.partition_list_version,
1474
253
        request_no(),
1475
253
        num_attempts());
1476
253
  }
1477
1478
0
  const string& table_id() const { return table()->id(); }
1479
1480
47.5k
  void CallRemoteMethod() override {
1481
    // Fill out the request.
1482
47.5k
    req_.mutable_table()->set_table_id(table()->id());
1483
47.5k
    req_.set_partition_key_start(*partition_group_start_.key);
1484
47.5k
    req_.set_max_returned_locations(kPartitionGroupSize);
1485
1486
    // The end partition key is left unset intentionally so that we'll prefetch
1487
    // some additional tablets.
1488
47.5k
    master_client_proxy()->GetTableLocationsAsync(
1489
47.5k
        req_, &resp_, mutable_retrier()->mutable_controller(),
1490
47.5k
        std::bind(&LookupByKeyRpc::Finished, this, Status::OK()));
1491
47.5k
  }
1492
1493
  void AddCallbacksToBeNotified(
1494
      const ProcessedTablesMap& processed_tables,
1495
      std::unordered_map<TableId, TableData>* tables,
1496
47.5k
      std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override {
1497
320k
    for (const auto& processed_table : processed_tables) {
1498
320k
      const auto table_it = tables->find(processed_table.first);
1499
320k
      if (table_it == tables->end()) {
1500
0
        continue;
1501
0
      }
1502
320k
      auto& table_data = table_it->second;
1503
      // This should be guaranteed by ProcessTabletLocations before we get here:
1504
0
      DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version)
1505
0
          << "table_data.partition_list->version: " << table_data.partition_list->version
1506
0
          << " partition_group_start_.partition_list_version: "
1507
0
          << partition_group_start_.partition_list_version;
1508
320k
      const auto lookup_by_group_iter =
1509
320k
          table_data.tablet_lookups_by_group.find(*partition_group_start_.key);
1510
320k
      if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) {
1511
0
        VLOG_WITH_PREFIX_AND_FUNC(4)
1512
0
            << "Checking tablet_lookups_by_group for partition_group_start: "
1513
0
            << Slice(*partition_group_start_.key).ToDebugHexString();
1514
47.5k
        auto& lookups_group = lookup_by_group_iter->second;
1515
117k
        while (auto* lookup = lookups_group.lookups.Pop()) {
1516
70.3k
          auto remote_it = processed_table.second.find(*lookup->partition_start);
1517
70.3k
          auto lookup_visitor = LookupCallbackVisitor(
1518
70.3k
              remote_it != processed_table.second.end() ? remote_it->second : nullptr);
1519
70.3k
          to_notify->emplace_back(std::move(lookup->callback), lookup_visitor);
1520
70.3k
          delete lookup;
1521
70.3k
        }
1522
47.5k
      }
1523
320k
    }
1524
47.5k
  }
1525
1526
  void UpdateProcessedTable(const TabletLocationsPB& loc,
1527
                            RemoteTabletPtr remote,
1528
338k
                            ProcessedTablesMap::mapped_type* processed_table) override {
1529
338k
    processed_table->emplace(loc.partition().partition_key_start(), remote);
1530
338k
  }
1531
1532
 private:
1533
47.5k
  void ProcessResponse(const Status& status) override {
1534
47.5k
    DoProcessResponse(status, resp_);
1535
47.5k
  }
1536
1537
47.6k
  Status ResponseStatus() override {
1538
47.6k
    return StatusFromResp(resp_);
1539
47.6k
  }
1540
1541
47.5k
  Status VerifyResponse() override {
1542
    // Note: if the response has no partition list version, it comes from a master that doesn't
1543
    // support tablet splitting and it is OK to treat it as version 0
1544
    // (so a 0 return value of resp_.partition_list_version() is OK).
1545
47.5k
    const auto req_partition_list_version = partition_group_start_.partition_list_version;
1546
47.5k
    const auto resp_partition_list_version = resp_.partition_list_version();
1547
1548
0
    const auto versions_formatter = [&] {
1549
0
      return Format(
1550
0
          "RPC: $0, response partition_list_version: $1", this->ToString(),
1551
0
          resp_partition_list_version);
1552
0
    };
1553
1554
47.5k
    if (resp_partition_list_version < req_partition_list_version) {
1555
      // This indicates an issue on the master side: the table partition list version should never
1556
      // decrease on the master.
1557
0
      const auto msg = Format("Ignoring response for obsolete RPC call. $0", versions_formatter());
1558
0
      LOG_WITH_PREFIX(DFATAL) << msg;
1559
0
      return STATUS(IllegalState, msg);
1560
0
    }
1561
47.5k
    if (resp_partition_list_version > req_partition_list_version) {
1562
      // This request is for a partition_group_start calculated from an obsolete table partition
1563
      // list.
1564
0
      const auto msg = Format(
1565
0
          "Cached table partition list version is obsolete, refresh required. $0",
1566
0
          versions_formatter());
1567
0
      VLOG_WITH_PREFIX_AND_FUNC(3) << msg;
1568
0
      return STATUS(TryAgain, msg, ClientError(ClientErrorCode::kTablePartitionListIsStale));
1569
0
    }
1570
    // resp_partition_list_version == req_partition_list_version
1571
47.5k
    return Status::OK();
1572
47.5k
  }
1573
1574
47.5k
  void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS {
1575
47.5k
    const auto it = meta_cache()->tables_.find(table()->id());
1576
47.5k
    if (it == meta_cache()->tables_.end()) {
1577
0
      return;
1578
0
    }
1579
47.5k
    auto& table_data = it->second;
1580
    // This should be guaranteed by ProcessTabletLocations before we get here:
1581
0
    DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version)
1582
0
        << "table_data.partition_list->version: " << table_data.partition_list->version
1583
0
        << " partition_group_start_.partition_list_version: "
1584
0
        << partition_group_start_.partition_list_version;
1585
47.5k
    const auto lookup_by_group_iter = table_data.tablet_lookups_by_group.find(
1586
47.5k
        *partition_group_start_.key);
1587
47.5k
    TablePartitionLookup tablet_partition_lookup(table()->id(), partition_group_start_);
1588
47.5k
    if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) {
1589
47.5k
      lookup_by_group_iter->second.Finished(request_no(), tablet_partition_lookup, false);
1590
0
    } else {
1591
0
      LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown partition group: "
1592
0
                << tablet_partition_lookup.ToString();
1593
0
    }
1594
47.5k
  }
1595
1596
85
  void NotifyFailure(const Status& status) override {
1597
85
    meta_cache()->LookupByKeyFailed(
1598
85
        table(), partition_group_start_, resp_.partition_list_version(), request_no(), status);
1599
85
  }
1600
1601
  Status ProcessTabletLocations(
1602
      const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations,
1603
47.5k
      boost::optional<PartitionListVersion> table_partition_list_version) override {
1604
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(2) << "partition_group_start: " << partition_group_start_.ToString();
1605
    // This condition is guaranteed by VerifyResponse function:
1606
47.5k
    CHECK(resp_.partition_list_version() == partition_group_start_.partition_list_version);
1607
1608
47.5k
    return meta_cache()->ProcessTabletLocations(locations, table_partition_list_version, this);
1609
47.5k
  }
1610
1611
  // Encoded partition group start key to look up.
1612
  VersionedPartitionGroupStartKey partition_group_start_;
1613
1614
  // Request body.
1615
  GetTableLocationsRequestPB req_;
1616
1617
  // Response body.
1618
  GetTableLocationsResponsePB resp_;
1619
};
1620
1621
void MetaCache::LookupByKeyFailed(
1622
    const std::shared_ptr<const YBTable>& table,
1623
    const VersionedPartitionGroupStartKey& partition_group_start,
1624
    PartitionListVersion response_partition_list_version,
1625
85
    int64_t request_no, const Status& status) {
1626
85
  const auto req_partition_list_version = partition_group_start.partition_list_version;
1627
0
  VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " and partition group start "
1628
0
                      << Slice(*partition_group_start.key).ToDebugHexString()
1629
0
                      << ", request partition list version: " << req_partition_list_version
1630
0
                      << ", response partition list version: " << response_partition_list_version
1631
0
                      << " failed with: " << status;
1632
1633
85
  CallbackNotifier notifier(status);
1634
85
  CoarseTimePoint max_deadline;
1635
85
  {
1636
85
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1637
85
    auto it = tables_.find(table->id());
1638
85
    if (it == tables_.end()) {
1639
0
      return;
1640
0
    }
1641
1642
85
    auto& table_data = it->second;
1643
85
    const auto table_data_partition_list_version = table_data.partition_list->version;
1644
0
    const auto versions_formatter = [&] {
1645
0
      return Format(
1646
0
          "MetaCache's table $0 partition list version: $1, stored in RPC call: $2, received: $3",
1647
0
          table->id(), table_data_partition_list_version,
1648
0
          req_partition_list_version, response_partition_list_version);
1649
0
    };
1650
1651
85
    if (table_data_partition_list_version < req_partition_list_version) {
1652
      // MetaCache partition list version is older than stored in LookupByKeyRpc for which we've
1653
      // received an answer.
1654
      // This shouldn't happen, because MetaCache partition list version for each table couldn't
1655
      // decrease and we store it inside LookupByKeyRpc when creating an RPC.
1656
0
      LOG_WITH_PREFIX(FATAL) << Format(
1657
0
          "Cached table partition list version is older than stored in RPC call. $0",
1658
0
          versions_formatter());
1659
85
    } else if (table_data_partition_list_version > req_partition_list_version) {
1660
      // The MetaCache table partition list version has been updated since we sent this RPC.
1661
      // We already failed and cleaned up all registered lookups for the table when the MetaCache
1662
      // partition list was updated, so we should ignore this response as well.
1663
0
      VLOG_WITH_PREFIX_AND_FUNC(3) << Format(
1664
0
          "Cached table partition list version is newer than stored in RPC call, ignoring "
1665
0
          "failure response. $0", versions_formatter());
1666
0
      return;
1667
0
    }
1668
1669
    // Since table_data_partition_list_version == req_partition_list_version,
1670
    // we will find the tablet lookups registered for this exact RPC call here:
1671
85
    auto key_lookup_iterator = table_data.tablet_lookups_by_group.find(*partition_group_start.key);
1672
85
    if (key_lookup_iterator != table_data.tablet_lookups_by_group.end()) {
1673
85
      auto& lookup_data_group = key_lookup_iterator->second;
1674
85
      max_deadline = LookupFailed(status, request_no,
1675
85
                                  TablePartitionLookup(table->id(), partition_group_start),
1676
85
                                  &lookup_data_group, &notifier);
1677
85
    }
1678
85
  }
1679
1680
85
  if (max_deadline != CoarseTimePoint()) {
1681
5
    auto rpc = std::make_shared<LookupByKeyRpc>(
1682
5
        this, table, partition_group_start, request_no, max_deadline);
1683
5
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1684
5
  }
1685
85
}
1686
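// --- Editor's note: illustrative sketch, not part of meta_cache.cc --------------
// LookupByKeyFailed above compares the partition list version cached in MetaCache
// with the version captured when the LookupByKeyRpc was created: an older cached
// version violates an invariant (it can never decrease), a newer cached version
// means the table cache was already invalidated and the failure response can be
// dropped, and only an equal version leads to failing/retrying the queued lookups.
// A minimal sketch of that classification (enum and function names are illustrative):
#include <cstdint>

enum class StaleResponseAction { kInvariantViolation, kIgnoreResponse, kProcessFailure };

StaleResponseAction ClassifyFailureSketch(uint64_t cached_version, uint64_t rpc_version) {
  if (cached_version < rpc_version) {
    return StaleResponseAction::kInvariantViolation;  // Corresponds to LOG(FATAL/DFATAL) above.
  }
  if (cached_version > rpc_version) {
    return StaleResponseAction::kIgnoreResponse;      // Lookups were already failed elsewhere.
  }
  return StaleResponseAction::kProcessFailure;        // Fail or retry the queued lookups.
}
// --- End editor's note -----------------------------------------------------------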
1687
void MetaCache::LookupFullTableFailed(const std::shared_ptr<const YBTable>& table,
1688
0
                                      int64_t request_no, const Status& status) {
1689
0
  VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " failed with: " << status;
1690
1691
0
  CallbackNotifier notifier(status);
1692
0
  CoarseTimePoint max_deadline;
1693
0
  {
1694
0
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1695
0
    auto it = tables_.find(table->id());
1696
0
    if (it == tables_.end()) {
1697
0
      return;
1698
0
    }
1699
1700
0
    max_deadline = LookupFailed(status, request_no, FullTableLookup(table->id()),
1701
0
                                &it->second.full_table_lookups, &notifier);
1702
0
  }
1703
1704
0
  if (max_deadline != CoarseTimePoint()) {
1705
0
    auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, max_deadline);
1706
0
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1707
0
  }
1708
0
}
1709
1710
void MetaCache::LookupByIdFailed(
1711
    const TabletId& tablet_id,
1712
    const std::shared_ptr<const YBTable>& table,
1713
    master::IncludeInactive include_inactive,
1714
    const boost::optional<PartitionListVersion>& response_partition_list_version,
1715
    int64_t request_no,
1716
336
    const Status& status) {
1717
0
  VLOG_WITH_PREFIX(1) << "Lookup for tablet " << tablet_id << ", failed with: " << status;
1718
1719
336
  CallbackNotifier notifier(status);
1720
336
  CoarseTimePoint max_deadline;
1721
336
  {
1722
336
    std::lock_guard<decltype(mutex_)> lock(mutex_);
1723
336
    if (status.IsNotFound() && response_partition_list_version.has_value()) {
1724
276
      auto tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
1725
276
      if (tablet) {
1726
276
        const auto tablet_last_known_table_partition_list_version =
1727
276
            tablet->GetLastKnownPartitionListVersion();
1728
276
        if (tablet_last_known_table_partition_list_version <
1729
2
            response_partition_list_version.value()) {
1730
2
          const auto msg_formatter = [&] {
1731
2
            return Format(
1732
2
                "Received table $0 ($1) partitions version: $2, last known by MetaCache for "
1733
2
                "tablet $3: $4",
1734
2
                table->id(), table->name(), response_partition_list_version,
1735
2
                tablet_id, tablet_last_known_table_partition_list_version);
1736
2
          };
1737
0
          VLOG_WITH_PREFIX_AND_FUNC(3) << msg_formatter();
1738
2
          notifier.SetStatus(STATUS(
1739
2
              TryAgain, msg_formatter(), ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1740
2
        }
1741
276
      }
1742
276
    }
1743
1744
336
    auto table_lookup_iterator = tablet_lookups_by_id_.find(tablet_id);
1745
336
    if (table_lookup_iterator != tablet_lookups_by_id_.end()) {
1746
336
      auto& lookup_data_group = table_lookup_iterator->second;
1747
336
      max_deadline = LookupFailed(status, request_no, TabletIdLookup(tablet_id),
1748
336
                                  &lookup_data_group, &notifier);
1749
336
    }
1750
336
  }
1751
1752
336
  if (max_deadline != CoarseTimePoint()) {
1753
0
    auto rpc = std::make_shared<LookupByIdRpc>(
1754
0
        this, tablet_id, table, include_inactive, request_no, max_deadline, 0);
1755
0
    client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1756
0
  }
1757
336
}
1758
1759
RemoteTabletPtr MetaCache::LookupTabletByKeyFastPathUnlocked(
1760
11.1M
    const TableId& table_id, const VersionedPartitionStartKey& versioned_partition_start_key) {
1761
11.1M
  auto it = tables_.find(table_id);
1762
11.1M
  if (PREDICT_FALSE(it == tables_.end())) {
1763
    // No cache available for this table.
1764
88.5k
    return nullptr;
1765
88.5k
  }
1766
1767
11.0M
  const auto& table_data = it->second;
1768
1769
11.0M
  if (PREDICT_FALSE(
1770
11.0M
          table_data.partition_list->version !=
1771
0
          versioned_partition_start_key.partition_list_version)) {
1772
    // The cached TableData::partition_list version does not match the partition_list_version used
1773
    // to calculate partition_key_start, so we can't use the cache.
1774
0
    return nullptr;
1775
0
  }
1776
1777
11.0M
  const auto& partition_start_key = *versioned_partition_start_key.key;
1778
1779
11.0M
  DCHECK_EQ(
1780
11.0M
      partition_start_key,
1781
11.0M
      *client::FindPartitionStart(table_data.partition_list, partition_start_key));
1782
11.0M
  auto tablet_it = table_data.tablets_by_partition.find(partition_start_key);
1783
11.0M
  if (PREDICT_FALSE(tablet_it == it->second.tablets_by_partition.end())) {
1784
    // No tablets with a start partition key lower than 'partition_key'.
1785
23.7k
    return nullptr;
1786
23.7k
  }
1787
1788
11.0M
  const auto& result = tablet_it->second;
1789
1790
  // Stale entries must be re-fetched.
1791
11.0M
  if (result->stale()) {
1792
476
    return nullptr;
1793
476
  }
1794
1795
11.0M
  if (result->partition().partition_key_end().compare(partition_start_key) > 0 ||
1796
11.0M
      result->partition().partition_key_end().empty()) {
1797
    // partition_start_key < partition.end OR tablet does not end.
1798
11.0M
    return result;
1799
11.0M
  }
1800
1801
9.02k
  return nullptr;
1802
9.02k
}
1803
1804
boost::optional<std::vector<RemoteTabletPtr>> MetaCache::FastLookupAllTabletsUnlocked(
1805
0
    const std::shared_ptr<const YBTable>& table) {
1806
0
  auto tablets = std::vector<RemoteTabletPtr>();
1807
0
  auto it = tables_.find(table->id());
1808
0
  if (PREDICT_FALSE(it == tables_.end())) {
1809
    // No cache available for this table.
1810
0
    return boost::none;
1811
0
  }
1812
1813
0
  for (const auto& tablet : it->second.all_tablets) {
1814
0
    if (tablet->stale()) {
1815
0
      return boost::none;
1816
0
    }
1817
0
    tablets.push_back(tablet);
1818
0
  }
1819
1820
0
  if (tablets.empty()) {
1821
0
    return boost::none;
1822
0
  }
1823
0
  return tablets;
1824
0
}
1825
1826
// We disable thread safety analysis in this function due to manual conditional locking.
1827
RemoteTabletPtr MetaCache::FastLookupTabletByKeyUnlocked(
1828
11.1M
    const TableId& table_id, const VersionedPartitionStartKey& partition_start) {
1829
  // Fast path: lookup in the cache.
1830
11.1M
  auto result = LookupTabletByKeyFastPathUnlocked(table_id, partition_start);
1831
11.1M
  if (result && result->HasLeader()) {
1832
18.4E
    VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << result->tablet_id();
1833
11.0M
    return result;
1834
11.0M
  }
1835
1836
112k
  return nullptr;
1837
112k
}
1838
1839
template <class Mutex>
1840
99.4k
bool IsUniqueLock(const std::lock_guard<Mutex>*) {
1841
99.4k
  return true;
1842
99.4k
}
1843
1844
template <class Mutex>
1845
55.1k
bool IsUniqueLock(const SharedLock<Mutex>*) {
1846
55.1k
  return false;
1847
55.1k
}
1848
1849
// partition_group_start must not be nullptr; it points to a PartitionGroupStartKeyPtr that will be
1851
// initialized only if the pointee is still nullptr. This is an optimization to avoid recalculating
1852
// partition_group_start in a subsequent call of this function (see MetaCache::LookupTabletByKey).
1852
template <class Lock>
1853
bool MetaCache::DoLookupTabletByKey(
1854
    const std::shared_ptr<const YBTable>& table, const VersionedTablePartitionListPtr& partitions,
1855
    const PartitionKeyPtr& partition_start, CoarseTimePoint deadline,
1856
11.1M
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1857
11.1M
  DCHECK_ONLY_NOTNULL(partition_group_start);
1858
11.1M
  RemoteTabletPtr tablet;
1859
11.1M
  auto scope_exit = ScopeExit([callback, &tablet] {
1860
11.1M
    if (tablet) {
1861
11.0M
      (*callback)(tablet);
1862
11.0M
    }
1863
11.1M
  });
_ZZN2yb6client8internal9MetaCache19DoLookupTabletByKeyINS_10SharedLockINSt3__118shared_timed_mutexEEEEEbRKNS5_10shared_ptrIKNS0_7YBTableEEERKNS8_IKNS0_27VersionedTablePartitionListEEERKNS8_IKNS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEEENS5_6chrono10time_pointINS_15CoarseMonoClockENST_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEEPNS5_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEPSQ_ENKUlvE_clEv
Line
Count
Source
1859
11.1M
  auto scope_exit = ScopeExit([callback, &tablet] {
1860
11.1M
    if (tablet) {
1861
11.0M
      (*callback)(tablet);
1862
11.0M
    }
1863
11.1M
  });
_ZZN2yb6client8internal9MetaCache19DoLookupTabletByKeyINSt3__110lock_guardINS4_18shared_timed_mutexEEEEEbRKNS4_10shared_ptrIKNS0_7YBTableEEERKNS8_IKNS0_27VersionedTablePartitionListEEERKNS8_IKNS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEENS4_6chrono10time_pointINS_15CoarseMonoClockENST_8durationIxNS4_5ratioILl1ELl1000000000EEEEEEEPNS4_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEPSQ_ENKUlvE_clEv
Line
Count
Source
1859
46.9k
  auto scope_exit = ScopeExit([callback, &tablet] {
1860
46.9k
    if (tablet) {
1861
5
      (*callback)(tablet);
1862
5
    }
1863
46.9k
  });
1864
11.1M
  int64_t request_no;
1865
11.1M
  {
1866
11.1M
    Lock lock(mutex_);
1867
11.1M
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1868
11.1M
    if (tablet) {
1869
11.0M
      return true;
1870
11.0M
    }
1871
1872
102k
    auto table_it = tables_.find(table->id());
1873
102k
    TableData* table_data;
1874
102k
    if (table_it == tables_.end()) {
1875
51
      VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
1876
51
          "missed table_id $0", table->id());
1877
88.5k
      if (!IsUniqueLock(&lock)) {
1878
44.0k
        return false;
1879
44.0k
      }
1880
44.4k
      table_it = InitTableDataUnlocked(table->id(), partitions);
1881
44.4k
    }
1882
58.7k
    table_data = &table_it->second;
1883
1884
58.7k
    if (table_data->partition_list->version != partitions->version ||
1885
74.6k
        (PREDICT_FALSE(RandomActWithProbability(
1886
74.6k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1887
0
         table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE)) {
1888
0
      (*callback)(STATUS(
1889
0
          TryAgain,
1890
0
          Format(
1891
0
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1892
0
              "refresh required",
1893
0
              table->ToString(), table_data->partition_list->version, partitions->version),
1894
0
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1895
0
      return true;
1896
0
    }
1897
1898
72.0k
    if (!*partition_group_start) {
1899
72.0k
      *partition_group_start = client::FindPartitionStart(
1900
72.0k
          partitions, *partition_start, kPartitionGroupSize);
1901
72.0k
    }
1902
1903
58.7k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1904
58.7k
    LookupDataGroup* lookups_group;
1905
58.7k
    {
1906
58.7k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1907
58.7k
      if (lookups_group_it == tablet_lookups_by_group.end()) {
1908
49.5k
        if (!IsUniqueLock(&lock)) {
1909
2.64k
          return false;
1910
2.64k
        }
1911
46.9k
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1912
9.18k
      } else {
1913
9.18k
        lookups_group = &lookups_group_it->second;
1914
9.18k
      }
1915
58.7k
    }
1916
56.1k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1917
56.1k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1918
56.1k
    int64_t expected = 0;
1919
56.1k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1920
24.3k
            expected, request_no, std::memory_order_acq_rel)) {
1921
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5)
1922
18.4E
          << "Lookup is already running for table: " << table->ToString()
1923
18.4E
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1924
18.4E
          << ", partition_list_version: " << partitions->version
1925
18.4E
          << ", request_no: " << expected;
1926
24.3k
      return true;
1927
24.3k
    }
1928
31.8k
  }
1929
1930
31.8k
  auto rpc = std::make_shared<LookupByKeyRpc>(
1931
31.8k
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1932
31.8k
      request_no, deadline);
1933
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4)
1934
18.4E
      << "Started lookup for table: " << table->ToString()
1935
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1936
18.4E
      << ", rpc: " << AsString(rpc);
1937
31.8k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1938
31.8k
  return true;
1939
31.8k
}
_ZN2yb6client8internal9MetaCache19DoLookupTabletByKeyINS_10SharedLockINSt3__118shared_timed_mutexEEEEEbRKNS5_10shared_ptrIKNS0_7YBTableEEERKNS8_IKNS0_27VersionedTablePartitionListEEERKNS8_IKNS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEEENS5_6chrono10time_pointINS_15CoarseMonoClockENST_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEEPNS5_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEPSQ_
Line
Count
Source
1856
11.0M
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1857
11.0M
  DCHECK_ONLY_NOTNULL(partition_group_start);
1858
11.0M
  RemoteTabletPtr tablet;
1859
11.0M
  auto scope_exit = ScopeExit([callback, &tablet] {
1860
11.0M
    if (tablet) {
1861
11.0M
      (*callback)(tablet);
1862
11.0M
    }
1863
11.0M
  });
1864
11.0M
  int64_t request_no;
1865
11.0M
  {
1866
11.0M
    Lock lock(mutex_);
1867
11.0M
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1868
11.0M
    if (tablet) {
1869
11.0M
      return true;
1870
11.0M
    }
1871
1872
55.9k
    auto table_it = tables_.find(table->id());
1873
55.9k
    TableData* table_data;
1874
55.9k
    if (table_it == tables_.end()) {
1875
51
      VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
1876
51
          "missed table_id $0", table->id());
1877
44.2k
      if (!IsUniqueLock(&lock)) {
1878
44.0k
        return false;
1879
44.0k
      }
1880
168
      table_it = InitTableDataUnlocked(table->id(), partitions);
1881
168
    }
1882
11.8k
    table_data = &table_it->second;
1883
1884
11.8k
    if (table_data->partition_list->version != partitions->version ||
1885
27.7k
        (PREDICT_FALSE(RandomActWithProbability(
1886
27.7k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1887
0
         table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE)) {
1888
0
      (*callback)(STATUS(
1889
0
          TryAgain,
1890
0
          Format(
1891
0
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1892
0
              "refresh required",
1893
0
              table->ToString(), table_data->partition_list->version, partitions->version),
1894
0
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1895
0
      return true;
1896
0
    }
1897
1898
27.6k
    if (!*partition_group_start) {
1899
27.6k
      *partition_group_start = client::FindPartitionStart(
1900
27.6k
          partitions, *partition_start, kPartitionGroupSize);
1901
27.6k
    }
1902
1903
11.8k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1904
11.8k
    LookupDataGroup* lookups_group;
1905
11.8k
    {
1906
11.8k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1907
11.8k
      if (lookups_group_it == tablet_lookups_by_group.end()) {
1908
2.64k
        if (!IsUniqueLock(&lock)) {
1909
2.64k
          return false;
1910
2.64k
        }
1911
0
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1912
9.21k
      } else {
1913
9.21k
        lookups_group = &lookups_group_it->second;
1914
9.21k
      }
1915
11.8k
    }
1916
9.21k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1917
9.21k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1918
9.21k
    int64_t expected = 0;
1919
9.21k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1920
24.2k
            expected, request_no, std::memory_order_acq_rel)) {
1921
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5)
1922
18.4E
          << "Lookup is already running for table: " << table->ToString()
1923
18.4E
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1924
18.4E
          << ", partition_list_version: " << partitions->version
1925
18.4E
          << ", request_no: " << expected;
1926
24.2k
      return true;
1927
24.2k
    }
1928
18.4E
  }
1929
1930
18.4E
  auto rpc = std::make_shared<LookupByKeyRpc>(
1931
18.4E
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1932
18.4E
      request_no, deadline);
1933
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4)
1934
18.4E
      << "Started lookup for table: " << table->ToString()
1935
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1936
18.4E
      << ", rpc: " << AsString(rpc);
1937
18.4E
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1938
18.4E
  return true;
1939
18.4E
}
_ZN2yb6client8internal9MetaCache19DoLookupTabletByKeyINSt3__110lock_guardINS4_18shared_timed_mutexEEEEEbRKNS4_10shared_ptrIKNS0_7YBTableEEERKNS8_IKNS0_27VersionedTablePartitionListEEERKNS8_IKNS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEENS4_6chrono10time_pointINS_15CoarseMonoClockENST_8durationIxNS4_5ratioILl1ELl1000000000EEEEEEEPNS4_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEPSQ_
Line
Count
Source
1856
46.9k
    LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) {
1857
46.9k
  DCHECK_ONLY_NOTNULL(partition_group_start);
1858
46.9k
  RemoteTabletPtr tablet;
1859
46.9k
  auto scope_exit = ScopeExit([callback, &tablet] {
1860
46.9k
    if (tablet) {
1861
46.9k
      (*callback)(tablet);
1862
46.9k
    }
1863
46.9k
  });
1864
46.9k
  int64_t request_no;
1865
46.9k
  {
1866
46.9k
    Lock lock(mutex_);
1867
46.9k
    tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version});
1868
46.9k
    if (tablet) {
1869
5
      return true;
1870
5
    }
1871
1872
46.9k
    auto table_it = tables_.find(table->id());
1873
46.9k
    TableData* table_data;
1874
46.9k
    if (table_it == tables_.end()) {
1875
0
      VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
1876
0
          "missed table_id $0", table->id());
1877
44.2k
      if (!IsUniqueLock(&lock)) {
1878
0
        return false;
1879
0
      }
1880
44.2k
      table_it = InitTableDataUnlocked(table->id(), partitions);
1881
44.2k
    }
1882
46.9k
    table_data = &table_it->second;
1883
1884
46.9k
    if (table_data->partition_list->version != partitions->version ||
1885
46.9k
        (PREDICT_FALSE(RandomActWithProbability(
1886
46.9k
            FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) &&
1887
0
         table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE)) {
1888
0
      (*callback)(STATUS(
1889
0
          TryAgain,
1890
0
          Format(
1891
0
              "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, "
1892
0
              "refresh required",
1893
0
              table->ToString(), table_data->partition_list->version, partitions->version),
1894
0
          ClientError(ClientErrorCode::kTablePartitionListIsStale)));
1895
0
      return true;
1896
0
    }
1897
1898
46.9k
    if (!*partition_group_start) {
1899
44.3k
      *partition_group_start = client::FindPartitionStart(
1900
44.3k
          partitions, *partition_start, kPartitionGroupSize);
1901
44.3k
    }
1902
1903
46.9k
    auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group;
1904
46.9k
    LookupDataGroup* lookups_group;
1905
46.9k
    {
1906
46.9k
      auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start);
1907
46.9k
      if (lookups_group_it == tablet_lookups_by_group.end()) {
1908
46.9k
        if (!IsUniqueLock(&lock)) {
1909
0
          return false;
1910
0
        }
1911
46.9k
        lookups_group = &tablet_lookups_by_group[**partition_group_start];
1912
18.4E
      } else {
1913
18.4E
        lookups_group = &lookups_group_it->second;
1914
18.4E
      }
1915
46.9k
    }
1916
46.9k
    lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start));
1917
46.9k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1918
46.9k
    int64_t expected = 0;
1919
46.9k
    if (!lookups_group->running_request_number.compare_exchange_strong(
1920
52
            expected, request_no, std::memory_order_acq_rel)) {
1921
0
      VLOG_WITH_PREFIX_AND_FUNC(5)
1922
0
          << "Lookup is already running for table: " << table->ToString()
1923
0
          << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1924
0
          << ", partition_list_version: " << partitions->version
1925
0
          << ", request_no: " << expected;
1926
52
      return true;
1927
52
    }
1928
46.8k
  }
1929
1930
46.8k
  auto rpc = std::make_shared<LookupByKeyRpc>(
1931
46.8k
      this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version},
1932
46.8k
      request_no, deadline);
1933
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4)
1934
18.4E
      << "Started lookup for table: " << table->ToString()
1935
18.4E
      << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString()
1936
18.4E
      << ", rpc: " << AsString(rpc);
1937
46.8k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1938
46.8k
  return true;
1939
46.8k
}
1940
1941
template <class Lock>
1942
bool MetaCache::DoLookupAllTablets(const std::shared_ptr<const YBTable>& table,
1943
                                   CoarseTimePoint deadline,
1944
0
                                   LookupTabletRangeCallback* callback) {
1945
0
  VLOG_WITH_PREFIX(3) << "DoLookupAllTablets() for table: " << table->ToString();
1946
0
  int64_t request_no;
1947
0
  {
1948
0
    Lock lock(mutex_);
1949
0
    if (PREDICT_TRUE(!FLAGS_TEST_force_master_lookup_all_tablets)) {
1950
0
      auto tablets = FastLookupAllTabletsUnlocked(table);
1951
0
      if (tablets.has_value()) {
1952
0
        VLOG_WITH_PREFIX(4) << "tablets has value";
1953
0
        (*callback)(*tablets);
1954
0
        return true;
1955
0
      }
1956
0
    }
1957
1958
0
    if (!IsUniqueLock(&lock)) {
1959
0
      return false;
1960
0
    }
1961
0
    auto table_it = tables_.find(table->id());
1962
0
    if (table_it == tables_.end()) {
1963
0
      table_it = InitTableDataUnlocked(table->id(), table->GetVersionedPartitions());
1964
0
    }
1965
0
    auto& table_data = table_it->second;
1966
1967
0
    auto& full_table_lookups = table_data.full_table_lookups;
1968
0
    full_table_lookups.lookups.Push(new LookupData(*callback, deadline, nullptr));
1969
0
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
1970
0
    int64_t expected = 0;
1971
0
    if (!full_table_lookups.running_request_number.compare_exchange_strong(
1972
0
        expected, request_no, std::memory_order_acq_rel)) {
1973
0
      VLOG_WITH_PREFIX_AND_FUNC(5)
1974
0
          << "Lookup is already running for table: " << table->ToString();
1975
0
      return true;
1976
0
    }
1977
0
  }
1978
1979
0
  VLOG_WITH_PREFIX_AND_FUNC(4)
1980
0
      << "Start lookup for table: " << table->ToString();
1981
1982
0
  auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, deadline);
1983
0
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
1984
0
  return true;
1985
0
}
Unexecuted instantiation: _ZN2yb6client8internal9MetaCache18DoLookupAllTabletsINS_10SharedLockINSt3__118shared_timed_mutexEEEEEbRKNS5_10shared_ptrIKNS0_7YBTableEEENS5_6chrono10time_pointINS_15CoarseMonoClockENSE_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEEPNS5_8functionIFvRKNS_6ResultINS5_6vectorI13scoped_refptrINS1_12RemoteTabletEENS5_9allocatorISR_EEEEEEEEE
Unexecuted instantiation: _ZN2yb6client8internal9MetaCache18DoLookupAllTabletsINSt3__110lock_guardINS4_18shared_timed_mutexEEEEEbRKNS4_10shared_ptrIKNS0_7YBTableEEENS4_6chrono10time_pointINS_15CoarseMonoClockENSE_8durationIxNS4_5ratioILl1ELl1000000000EEEEEEEPNS4_8functionIFvRKNS_6ResultINS4_6vectorI13scoped_refptrINS1_12RemoteTabletEENS4_9allocatorISR_EEEEEEEEE
1986
1987
// We disable thread safety analysis in this function due to manual conditional locking.
1988
void MetaCache::LookupTabletByKey(const std::shared_ptr<YBTable>& table,
1989
                                  const PartitionKey& partition_key,
1990
                                  CoarseTimePoint deadline,
1991
11.0M
                                  LookupTabletCallback callback) {
1992
11.0M
  if (table->ArePartitionsStale()) {
1993
16.0k
    table->RefreshPartitions(client_, [this, table, partition_key, deadline,
1994
16.0k
                              callback = std::move(callback)](const Status& status) {
1995
16.0k
      if (!status.ok()) {
1996
0
        callback(status);
1997
0
        return;
1998
0
      }
1999
16.0k
      InvalidateTableCache(*table);
2000
16.0k
      LookupTabletByKey(table, partition_key, deadline, std::move(callback));
2001
16.0k
    });
2002
16.0k
    return;
2003
16.0k
  }
2004
2005
11.0M
  const auto table_partition_list = table->GetVersionedPartitions();
2006
11.0M
  const auto partition_start = client::FindPartitionStart(table_partition_list, partition_key);
2007
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(5) << "Table: " << table->ToString()
2008
18.4E
                    << ", table_partition_list: " << table_partition_list->ToString()
2009
18.4E
                    << ", partition_key: " << Slice(partition_key).ToDebugHexString()
2010
18.4E
                    << ", partition_start: " << Slice(*partition_start).ToDebugHexString();
2011
2012
11.0M
  PartitionGroupStartKeyPtr partition_group_start;
2013
11.0M
  if (DoLookupTabletByKey<SharedLock<std::shared_timed_mutex>>(
2014
11.0M
          table, table_partition_list, partition_start, deadline, &callback,
2015
11.0M
          &partition_group_start)) {
2016
11.0M
    return;
2017
11.0M
  }
2018
2019
13.7k
  bool result = DoLookupTabletByKey<std::lock_guard<std::shared_timed_mutex>>(
2020
13.7k
      table, table_partition_list, partition_start, deadline, &callback, &partition_group_start);
2021
18.4E
  LOG_IF(DFATAL, !result)
2022
18.4E
      << "Lookup was not started for table " << table->ToString()
2023
18.4E
      << ", partition_key: " << Slice(partition_key).ToDebugHexString();
2024
13.7k
}
2025
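// --- Editor's note: illustrative sketch, not part of meta_cache.cc --------------
// LookupTabletByKey above runs the same templated lookup twice: first with a shared
// lock (read-only fast path), and only if that pass reports it would need to mutate
// shared state, again with an exclusive lock. A minimal standalone sketch of that
// dispatch, mirroring the IsUniqueLock overload trick with std::shared_mutex
// (class, member, and function names are illustrative assumptions):
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

class TwoPassLookupSketch {
 public:
  int GetOrCreate(const std::string& key) {
    int value = 0;
    if (DoLookup<std::shared_lock<std::shared_mutex>>(key, &value)) {
      return value;
    }
    // The shared pass could not satisfy the request without mutation; retry exclusively.
    DoLookup<std::unique_lock<std::shared_mutex>>(key, &value);
    return value;
  }

 private:
  static bool IsUnique(const std::unique_lock<std::shared_mutex>*) { return true; }
  static bool IsUnique(const std::shared_lock<std::shared_mutex>*) { return false; }

  template <class Lock>
  bool DoLookup(const std::string& key, int* out) {
    Lock lock(mutex_);
    auto it = cache_.find(key);
    if (it != cache_.end()) {
      *out = it->second;
      return true;                 // Fast path: found under either lock type.
    }
    if (!IsUnique(&lock)) {
      return false;                // Caller must retry with the exclusive lock.
    }
    const int value = static_cast<int>(cache_.size());
    cache_.emplace(key, value);    // Safe to mutate: we hold the exclusive lock.
    *out = value;
    return true;
  }

  std::shared_mutex mutex_;
  std::unordered_map<std::string, int> cache_;
};
// --- End editor's note -----------------------------------------------------------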
2026
void MetaCache::LookupAllTablets(const std::shared_ptr<const YBTable>& table,
2027
                                 CoarseTimePoint deadline,
2028
0
                                 LookupTabletRangeCallback callback) {
2029
  // We first want to check the cache in read-only mode, and only if we can't find anything
2030
  // do a lookup in write mode.
2031
0
  if (DoLookupAllTablets<SharedLock<std::shared_timed_mutex>>(table, deadline, &callback)) {
2032
0
    return;
2033
0
  }
2034
2035
0
  bool result = DoLookupAllTablets<std::lock_guard<std::shared_timed_mutex>>(
2036
0
      table, deadline, &callback);
2037
0
  LOG_IF(DFATAL, !result)
2038
0
      << "Full table lookup was not started for table " << table->ToString();
2039
0
}
2040
2041
1.18M
RemoteTabletPtr MetaCache::LookupTabletByIdFastPathUnlocked(const TabletId& tablet_id) {
2042
1.18M
  auto it = tablets_by_id_.find(tablet_id);
2043
1.18M
  if (it != tablets_by_id_.end()) {
2044
1.16M
    return it->second;
2045
1.16M
  }
2046
18.0k
  return nullptr;
2047
18.0k
}
2048
2049
template <class Lock>
2050
bool MetaCache::DoLookupTabletById(
2051
    const TabletId& tablet_id,
2052
    const std::shared_ptr<const YBTable>& table,
2053
    master::IncludeInactive include_inactive,
2054
    CoarseTimePoint deadline,
2055
    UseCache use_cache,
2056
1.17M
    LookupTabletCallback* callback) {
2057
1.17M
  RemoteTabletPtr tablet;
2058
1.18M
  auto scope_exit = ScopeExit([callback, &tablet] {
2059
1.18M
    if (tablet) {
2060
1.14M
      (*callback)(tablet);
2061
1.14M
    }
2062
1.18M
  });
_ZZN2yb6client8internal9MetaCache18DoLookupTabletByIdINS_10SharedLockINSt3__118shared_timed_mutexEEEEEbRKNS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEERKNS5_10shared_ptrIKNS0_7YBTableEEENS_17StronglyTypedBoolINS_6master19IncludeInactive_TagEEENS5_6chrono10time_pointINS_15CoarseMonoClockENSQ_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEENSM_INS0_12UseCache_TagEEEPNS5_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEENKUlvE_clEv
Line
Count
Source
2058
1.17M
  auto scope_exit = ScopeExit([callback, &tablet] {
2059
1.17M
    if (tablet) {
2060
1.14M
      (*callback)(tablet);
2061
1.14M
    }
2062
1.17M
  });
_ZZN2yb6client8internal9MetaCache18DoLookupTabletByIdINSt3__110lock_guardINS4_18shared_timed_mutexEEEEEbRKNS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKNS4_10shared_ptrIKNS0_7YBTableEEENS_17StronglyTypedBoolINS_6master19IncludeInactive_TagEEENS4_6chrono10time_pointINS_15CoarseMonoClockENSQ_8durationIxNS4_5ratioILl1ELl1000000000EEEEEEENSM_INS0_12UseCache_TagEEEPNS4_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEEENKUlvE_clEv
Line
Count
Source
2058
8.31k
  auto scope_exit = ScopeExit([callback, &tablet] {
2059
8.31k
    if (tablet) {
2060
0
      (*callback)(tablet);
2061
0
    }
2062
8.31k
  });
2063
1.17M
  int64_t request_no;
2064
1.17M
  int64_t lookups_without_new_replicas = 0;
2065
1.17M
  {
2066
1.17M
    Lock lock(mutex_);
2067
2068
    // Fast path: lookup in the cache.
2069
1.17M
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2070
1.17M
    if (tablet) {
2071
126
      VLOG_WITH_PREFIX(5) << "Fast lookup: candidate tablet " << AsString(tablet);
2072
1.16M
      if (use_cache && tablet->HasLeader()) {
2073
        // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with
2074
        // tablet_id is found on all replicas.
2075
74
        VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id();
2076
1.14M
        return true;
2077
1.14M
      }
2078
15.8k
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2079
15.8k
      tablet = nullptr;
2080
15.8k
    }
2081
2082
30.4k
    LookupDataGroup* lookup;
2083
30.4k
    {
2084
30.4k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2085
30.4k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2086
16.3k
        if (!IsUniqueLock(&lock)) {
2087
8.22k
          return false;
2088
8.22k
        }
2089
8.14k
        lookup = &tablet_lookups_by_id_[tablet_id];
2090
14.1k
      } else {
2091
14.1k
        lookup = &lookup_it->second;
2092
14.1k
      }
2093
30.4k
    }
2094
22.2k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2095
22.2k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2096
22.2k
    int64_t expected = 0;
2097
22.2k
    if (!lookup->running_request_number.compare_exchange_strong(
2098
15.1k
            expected, request_no, std::memory_order_acq_rel)) {
2099
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id;
2100
15.1k
      return true;
2101
15.1k
    }
2102
7.09k
  }
2103
2104
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no;
2105
2106
7.09k
  auto rpc = std::make_shared<LookupByIdRpc>(
2107
7.09k
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2108
7.09k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2109
7.09k
  return true;
2110
7.09k
}
_ZN2yb6client8internal9MetaCache18DoLookupTabletByIdINS_10SharedLockINSt3__118shared_timed_mutexEEEEEbRKNS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEERKNS5_10shared_ptrIKNS0_7YBTableEEENS_17StronglyTypedBoolINS_6master19IncludeInactive_TagEEENS5_6chrono10time_pointINS_15CoarseMonoClockENSQ_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEENSM_INS0_12UseCache_TagEEEPNS5_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEE
Line
Count
Source
2056
1.17M
    LookupTabletCallback* callback) {
2057
1.17M
  RemoteTabletPtr tablet;
2058
1.17M
  auto scope_exit = ScopeExit([callback, &tablet] {
2059
1.17M
    if (tablet) {
2060
1.17M
      (*callback)(tablet);
2061
1.17M
    }
2062
1.17M
  });
2063
1.17M
  int64_t request_no;
2064
1.17M
  int64_t lookups_without_new_replicas = 0;
2065
1.17M
  {
2066
1.17M
    Lock lock(mutex_);
2067
2068
    // Fast path: lookup in the cache.
2069
1.17M
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2070
1.17M
    if (tablet) {
2071
126
      VLOG_WITH_PREFIX(5) << "Fast lookup: candidate tablet " << AsString(tablet);
2072
1.16M
      if (use_cache && tablet->HasLeader()) {
2073
        // The tablet->HasLeader() check makes MetaCache send an RPC to the master in case the
2074
        // tablet with tablet_id is not found on any of the replicas.
2075
74
        VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id();
2076
1.14M
        return true;
2077
1.14M
      }
2078
15.2k
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2079
15.2k
      tablet = nullptr;
2080
15.2k
    }
2081
2082
22.1k
    LookupDataGroup* lookup;
2083
22.1k
    {
2084
22.1k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2085
22.1k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2086
8.22k
        if (!IsUniqueLock(&lock)) {
2087
8.22k
          return false;
2088
8.22k
        }
2089
18.4E
        lookup = &tablet_lookups_by_id_[tablet_id];
2090
14.0k
      } else {
2091
14.0k
        lookup = &lookup_it->second;
2092
14.0k
      }
2093
22.1k
    }
2094
13.9k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2095
13.9k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2096
13.9k
    int64_t expected = 0;
2097
13.9k
    if (!lookup->running_request_number.compare_exchange_strong(
2098
15.0k
            expected, request_no, std::memory_order_acq_rel)) {
2099
18.4E
      VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id;
2100
15.0k
      return true;
2101
15.0k
    }
2102
18.4E
  }
2103
2104
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no;
2105
2106
18.4E
  auto rpc = std::make_shared<LookupByIdRpc>(
2107
18.4E
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2108
18.4E
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2109
18.4E
  return true;
2110
18.4E
}
_ZN2yb6client8internal9MetaCache18DoLookupTabletByIdINSt3__110lock_guardINS4_18shared_timed_mutexEEEEEbRKNS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEERKNS4_10shared_ptrIKNS0_7YBTableEEENS_17StronglyTypedBoolINS_6master19IncludeInactive_TagEEENS4_6chrono10time_pointINS_15CoarseMonoClockENSQ_8durationIxNS4_5ratioILl1ELl1000000000EEEEEEENSM_INS0_12UseCache_TagEEEPNS4_8functionIFvRKNS_6ResultI13scoped_refptrINS1_12RemoteTabletEEEEEEE
Line
Count
Source
2056
8.28k
    LookupTabletCallback* callback) {
2057
8.28k
  RemoteTabletPtr tablet;
2058
8.28k
  auto scope_exit = ScopeExit([callback, &tablet] {
2059
8.28k
    if (tablet) {
2060
8.28k
      (*callback)(tablet);
2061
8.28k
    }
2062
8.28k
  });
2063
8.28k
  int64_t request_no;
2064
8.28k
  int64_t lookups_without_new_replicas = 0;
2065
8.28k
  {
2066
8.28k
    Lock lock(mutex_);
2067
2068
    // Fast path: lookup in the cache.
2069
8.28k
    tablet = LookupTabletByIdFastPathUnlocked(tablet_id);
2070
8.28k
    if (tablet) {
2071
0
      VLOG_WITH_PREFIX(5) << "Fast lookup: candidate tablet " << AsString(tablet);
2072
615
      if (use_cache && tablet->HasLeader()) {
2073
        // The tablet->HasLeader() check makes MetaCache send an RPC to the master in case no
2074
        // tablet with tablet_id was found on any replica.
2075
0
        VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id();
2076
0
        return true;
2077
0
      }
2078
615
      lookups_without_new_replicas = tablet->lookups_without_new_replicas();
2079
615
      tablet = nullptr;
2080
615
    }
2081
2082
8.28k
    LookupDataGroup* lookup;
2083
8.28k
    {
2084
8.28k
      auto lookup_it = tablet_lookups_by_id_.find(tablet_id);
2085
8.28k
      if (lookup_it == tablet_lookups_by_id_.end()) {
2086
8.22k
        if (!IsUniqueLock(&lock)) {
2087
0
          return false;
2088
0
        }
2089
8.22k
        lookup = &tablet_lookups_by_id_[tablet_id];
2090
59
      } else {
2091
59
        lookup = &lookup_it->second;
2092
59
      }
2093
8.28k
    }
2094
8.28k
    lookup->lookups.Push(new LookupData(*callback, deadline, nullptr));
2095
8.28k
    request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
2096
8.28k
    int64_t expected = 0;
2097
8.28k
    if (!lookup->running_request_number.compare_exchange_strong(
2098
89
            expected, request_no, std::memory_order_acq_rel)) {
2099
0
      VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id;
2100
89
      return true;
2101
89
    }
2102
8.19k
  }
2103
2104
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no;
2105
2106
8.19k
  auto rpc = std::make_shared<LookupByIdRpc>(
2107
8.19k
      this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas);
2108
8.19k
  client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle());
2109
8.19k
  return true;
2110
8.19k
}
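The two mangled symbols above name the per-instantiation listings: DoLookupTabletById instantiated with SharedLock<std::shared_timed_mutex> and with std::lock_guard<std::shared_timed_mutex>, matching the explicit template arguments passed by LookupTabletById below. The function coalesces concurrent lookups for the same tablet: each caller pushes its LookupData onto the tablet's LookupDataGroup, takes a fresh request number from lookup_serial_, and tries to compare-and-swap running_request_number from 0 to that number. Only the caller whose CAS succeeds builds and registers a LookupByIdRpc; everyone else returns immediately and is completed when the single in-flight RPC finishes. Below is a minimal sketch of that coalescing idea, assuming hypothetical LookupGroup and RegisterLookup names rather than the real MetaCache types.

// Hedged sketch of request coalescing (hypothetical types, not the actual MetaCache):
// many callers register a callback, but only the first one "owns" the in-flight
// request and should start the RPC.
#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct LookupGroup {
  std::atomic<int64_t> running_request_number{0};
  std::vector<std::function<void(int)>> waiters;  // guarded by an external mutex in real code
};

// Returns true if this caller must start the actual lookup RPC.
bool RegisterLookup(LookupGroup* group, int64_t my_request_no,
                    std::function<void(int)> callback) {
  group->waiters.push_back(std::move(callback));
  int64_t expected = 0;
  // Only one caller can swap 0 -> my_request_no; everyone else observes a
  // non-zero value, meaning a lookup is already in flight.
  return group->running_request_number.compare_exchange_strong(
      expected, my_request_no, std::memory_order_acq_rel);
}

int main() {
  LookupGroup group;
  bool first = RegisterLookup(&group, 1, [](int v) { std::cout << "a got " << v << "\n"; });
  bool second = RegisterLookup(&group, 2, [](int v) { std::cout << "b got " << v << "\n"; });
  std::cout << first << " " << second << "\n";  // prints "1 0": only the first caller starts the RPC
}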
2111
2112
void MetaCache::LookupTabletById(const TabletId& tablet_id,
2113
                                 const std::shared_ptr<const YBTable>& table,
2114
                                 master::IncludeInactive include_inactive,
2115
                                 CoarseTimePoint deadline,
2116
                                 LookupTabletCallback callback,
2117
1.17M
                                 UseCache use_cache) {
2118
1.02k
  VLOG_WITH_PREFIX_AND_FUNC(5) << "(" << tablet_id << ", " << use_cache << ")";
2119
2120
1.17M
  if (DoLookupTabletById<SharedLock<decltype(mutex_)>>(
2121
1.16M
          tablet_id, table, include_inactive, deadline, use_cache, &callback)) {
2122
1.16M
    return;
2123
1.16M
  }
2124
2125
7.73k
  auto result = DoLookupTabletById<std::lock_guard<decltype(mutex_)>>(
2126
7.73k
      tablet_id, table, include_inactive, deadline, use_cache, &callback);
2127
18.4E
  LOG_IF(DFATAL, !result) << "Lookup was not started for tablet " << tablet_id;
2128
7.73k
}
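LookupTabletById runs DoLookupTabletById at most twice: first under SharedLock, where the cache hit and the "lookup already running" cases complete without ever taking the exclusive lock, and again under std::lock_guard only when the shared-locked attempt returned false because a new LookupDataGroup entry would have to be inserted (the IsUniqueLock check above). This keeps the hot cache-hit path on the read lock. Below is a minimal sketch of that optimistic shared-lock / exclusive-retry pattern, assuming a hypothetical Cache class rather than the real MetaCache state; note that the real code re-runs the whole lookup body under the exclusive lock, which is why DoLookupTabletById is templated on the lock type.

// Hedged sketch of "try under a shared lock, retry under an exclusive lock"
// (hypothetical Cache type, not the real MetaCache).
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>

class Cache {
 public:
  int GetOrCompute(const std::string& key) {
    // Fast path: most lookups hit the cache and only need the shared (read) lock.
    if (auto value = TryGet<std::shared_lock<std::shared_timed_mutex>>(key)) {
      return *value;
    }
    // Slow path: take the exclusive lock so we are allowed to insert a new entry.
    std::lock_guard<std::shared_timed_mutex> lock(mutex_);
    auto it = entries_.find(key);
    if (it == entries_.end()) {
      it = entries_.emplace(key, static_cast<int>(key.size())).first;  // stand-in "compute"
    }
    return it->second;
  }

 private:
  template <class Lock>
  std::optional<int> TryGet(const std::string& key) {
    Lock lock(mutex_);
    auto it = entries_.find(key);
    if (it == entries_.end()) {
      return std::nullopt;  // would need to mutate the map; caller retries exclusively
    }
    return it->second;
  }

  std::shared_timed_mutex mutex_;
  std::map<std::string, int> entries_;
};

int main() {
  Cache cache;
  // The second call takes only the fast path because the first call populated the entry.
  return cache.GetOrCompute("tablet-1") == cache.GetOrCompute("tablet-1") ? 0 : 1;
}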
2129
2130
void MetaCache::MarkTSFailed(RemoteTabletServer* ts,
2131
0
                             const Status& status) {
2132
0
  LOG_WITH_PREFIX(INFO) << "Marking tablet server " << ts->ToString() << " as failed.";
2133
0
  SharedLock<decltype(mutex_)> lock(mutex_);
2134
2135
0
  Status ts_status = status.CloneAndPrepend("TS failed");
2136
2137
  // TODO: replace with a ts->tablet multimap for faster lookup?
2138
0
  for (const auto& tablet : tablets_by_id_) {
2139
    // We just loop on all tablets; if a tablet does not have a replica on this
2140
    // TS, MarkReplicaFailed() returns false and we ignore the return value.
2141
0
    tablet.second->MarkReplicaFailed(ts, ts_status);
2142
0
  }
2143
0
}
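MarkTSFailed walks every tablet in tablets_by_id_ and relies on MarkReplicaFailed() to ignore tablets without a replica on the failed server, so each failure costs a pass over the whole cache; the TODO in the source suggests a ts-to-tablet multimap instead. A hedged sketch of what such a secondary index might look like, using a hypothetical TabletIndex type that is not part of MetaCache:

// Hedged sketch of the ts -> tablets index hinted at by the TODO above
// (hypothetical TabletIndex; not part of the actual MetaCache).
#include <string>
#include <unordered_map>
#include <unordered_set>

class TabletIndex {
 public:
  void AddReplica(const std::string& ts_uuid, const std::string& tablet_id) {
    tablets_by_ts_[ts_uuid].insert(tablet_id);
  }

  // On TS failure, only the tablets known to have a replica on that server
  // need to be visited, instead of every cached tablet.
  const std::unordered_set<std::string>& TabletsOn(const std::string& ts_uuid) const {
    static const std::unordered_set<std::string> kEmpty;
    auto it = tablets_by_ts_.find(ts_uuid);
    return it == tablets_by_ts_.end() ? kEmpty : it->second;
  }

 private:
  std::unordered_map<std::string, std::unordered_set<std::string>> tablets_by_ts_;
};

int main() {
  TabletIndex index;
  index.AddReplica("ts-1", "tablet-a");
  return index.TabletsOn("ts-1").count("tablet-a") == 1 ? 0 : 1;
}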
2144
2145
56.6k
bool MetaCache::AcquireMasterLookupPermit() {
2146
56.6k
  return master_lookup_sem_.TryAcquire();
2147
56.6k
}
2148
2149
56.5k
void MetaCache::ReleaseMasterLookupPermit() {
2150
56.5k
  master_lookup_sem_.Release();
2151
56.5k
}
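AcquireMasterLookupPermit and ReleaseMasterLookupPermit throttle how many tablet-location RPCs may be outstanding against the master; TryAcquire is non-blocking, so a caller that fails to get a permit can decide how to proceed rather than stalling. A small sketch of pairing the acquire and release in an RAII guard so a permit cannot leak on an early return; Semaphore and PermitGuard here are hypothetical stand-ins, and MetaCache itself calls the acquire/release methods explicitly:

// Hedged sketch: RAII pairing of TryAcquire()/Release() (hypothetical types).
#include <atomic>

class Semaphore {
 public:
  explicit Semaphore(int permits) : permits_(permits) {}
  bool TryAcquire() {
    int cur = permits_.load(std::memory_order_acquire);
    while (cur > 0) {
      if (permits_.compare_exchange_weak(cur, cur - 1, std::memory_order_acq_rel)) return true;
    }
    return false;  // no permit available; caller decides what to do
  }
  void Release() { permits_.fetch_add(1, std::memory_order_acq_rel); }

 private:
  std::atomic<int> permits_;
};

class PermitGuard {
 public:
  explicit PermitGuard(Semaphore* sem) : sem_(sem), acquired_(sem->TryAcquire()) {}
  ~PermitGuard() { if (acquired_) sem_->Release(); }  // release only what was acquired
  bool acquired() const { return acquired_; }

 private:
  Semaphore* sem_;
  bool acquired_;
};

int main() {
  Semaphore sem(1);
  PermitGuard first(&sem);
  PermitGuard second(&sem);
  return first.acquired() && !second.acquired() ? 0 : 1;
}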
2152
2153
std::future<Result<internal::RemoteTabletPtr>> MetaCache::LookupTabletByKeyFuture(
2154
    const std::shared_ptr<YBTable>& table,
2155
    const PartitionKey& partition_key,
2156
0
    CoarseTimePoint deadline) {
2157
0
  return MakeFuture<Result<internal::RemoteTabletPtr>>([&](auto callback) {
2158
0
    this->LookupTabletByKey(table, partition_key, deadline, std::move(callback));
2159
0
  });
2160
0
}
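LookupTabletByKeyFuture adapts the callback-based LookupTabletByKey into a std::future by handing MakeFuture a lambda that receives the completion callback and starts the lookup. The sketch below shows the general promise/future adapter this pattern relies on, written against a hypothetical make_future helper and AsyncLookup function rather than yb's actual MakeFuture utility:

// Hedged sketch of turning a callback-style async API into a std::future.
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>

template <class T, class Starter>
std::future<T> make_future(Starter&& start) {
  auto promise = std::make_shared<std::promise<T>>();
  auto future = promise->get_future();
  // The starter receives the completion callback and kicks off the async work.
  start([promise](T value) { promise->set_value(std::move(value)); });
  return future;
}

// Stand-in for a callback-based lookup such as LookupTabletByKey.
void AsyncLookup(const std::string& key, std::function<void(std::string)> callback) {
  std::thread([key, callback] { callback("tablet-for-" + key); }).detach();
}

int main() {
  auto fut = make_future<std::string>(
      [](auto callback) { AsyncLookup("k1", std::move(callback)); });
  return fut.get().empty() ? 1 : 0;  // blocks until the callback fires
}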
2161
2162
153
LookupDataGroup::~LookupDataGroup() {
2163
153
  std::vector<LookupData*> leftovers;
2164
153
  while (auto* d = lookups.Pop()) {
2165
0
    leftovers.push_back(d);
2166
0
  }
2167
153
  if (!leftovers.empty()) {
2168
0
    LOG(DFATAL) << Format(
2169
0
        "Destructing LookupDataGroup($0), running_request_number: $1 with non empty lookups: $2",
2170
0
        static_cast<void*>(this), running_request_number, leftovers);
2171
0
  }
2172
153
}
2173
2174
void LookupDataGroup::Finished(
2175
56.6k
    int64_t request_no, const ToStringable& id, bool allow_absence) {
2176
56.6k
  int64_t expected = request_no;
2177
56.6k
  if (running_request_number.compare_exchange_strong(expected, 0, std::memory_order_acq_rel)) {
2178
56.6k
    max_completed_request_number = std::max(max_completed_request_number, request_no);
2179
0
    VLOG_WITH_FUNC(2) << "Finished lookup for " << id.ToString() << ", no: " << request_no;
2180
56.6k
    return;
2181
56.6k
  }
2182
2183
0
  if ((expected == 0 && max_completed_request_number <= request_no) && !allow_absence) {
2184
0
    LOG(DFATAL) << "Lookup was not running for " << id.ToString() << ", expected: " << request_no;
2185
0
    return;
2186
0
  }
2187
2188
0
  LOG(INFO)
2189
0
      << "Finished lookup for " << id.ToString() << ": " << request_no << ", while "
2190
0
      << expected << " was running, could happen during tablet split";
2191
0
}
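LookupDataGroup::Finished is the release side of the coalescing described earlier: the completing RPC tries to compare-and-swap running_request_number from its own request_no back to 0, and only on success records max_completed_request_number. If the CAS fails, either no lookup was registered at all (a DFATAL unless allow_absence) or a different request number is already running, which is tolerated because a tablet split can start a newer lookup before the older one reports completion. A minimal sketch of that "finish only if still the owner" step, using a hypothetical FinishLookup function:

// Hedged sketch of the release side (hypothetical FinishLookup; not the real LookupDataGroup).
#include <atomic>
#include <cstdint>

// Returns true if this request was the registered owner and the group is now idle.
bool FinishLookup(std::atomic<int64_t>* running_request_number, int64_t request_no) {
  int64_t expected = request_no;
  // If a newer lookup already replaced us, the CAS fails and the newer request keeps ownership.
  return running_request_number->compare_exchange_strong(
      expected, 0, std::memory_order_acq_rel);
}

int main() {
  std::atomic<int64_t> running{7};
  return FinishLookup(&running, 7) && !FinishLookup(&running, 7) ? 0 : 1;
}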
2192
2193
TableData::TableData(const VersionedTablePartitionListPtr& partition_list_)
2194
309k
    : partition_list(partition_list_) {
2195
309k
  DCHECK_ONLY_NOTNULL(partition_list);
2196
309k
}
2197
2198
0
std::string VersionedPartitionStartKey::ToString() const {
2199
0
  return YB_STRUCT_TO_STRING(key, partition_list_version);
2200
0
}
2201
2202
13
std::string RemoteReplica::ToString() const {
2203
13
  return Format("$0 ($1, $2)",
2204
13
                ts->permanent_uuid(),
2205
13
                PeerRole_Name(role),
2206
13
                Failed() ? "FAILED" : "OK");
2207
13
}
2208
2209
} // namespace internal
2210
} // namespace client
2211
} // namespace yb