/Users/deen/code/yugabyte-db/src/yb/client/meta_cache.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/client/meta_cache.h" |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <atomic> |
38 | | #include <list> |
39 | | #include <memory> |
40 | | #include <shared_mutex> |
41 | | #include <string> |
42 | | #include <unordered_map> |
43 | | #include <unordered_set> |
44 | | #include <vector> |
45 | | |
46 | | #include <boost/optional/optional_io.hpp> |
47 | | #include <glog/logging.h> |
48 | | |
49 | | #include "yb/client/client.h" |
50 | | #include "yb/client/client_error.h" |
51 | | #include "yb/client/client_master_rpc.h" |
52 | | #include "yb/client/client-internal.h" |
53 | | #include "yb/client/schema.h" |
54 | | #include "yb/client/table.h" |
55 | | #include "yb/client/yb_table_name.h" |
56 | | |
57 | | #include "yb/common/wire_protocol.h" |
58 | | |
59 | | #include "yb/gutil/map-util.h" |
60 | | #include "yb/gutil/ref_counted.h" |
61 | | #include "yb/gutil/strings/substitute.h" |
62 | | |
63 | | #include "yb/master/master_client.proxy.h" |
64 | | |
65 | | #include "yb/rpc/rpc_fwd.h" |
66 | | |
67 | | #include "yb/tserver/local_tablet_server.h" |
68 | | #include "yb/tserver/tserver_service.proxy.h" |
69 | | |
70 | | #include "yb/util/async_util.h" |
71 | | #include "yb/util/atomic.h" |
72 | | #include "yb/util/flag_tags.h" |
73 | | #include "yb/util/locks.h" |
74 | | #include "yb/util/logging.h" |
75 | | #include "yb/util/metrics.h" |
76 | | #include "yb/util/monotime.h" |
77 | | #include "yb/util/net/dns_resolver.h" |
78 | | #include "yb/util/net/net_util.h" |
79 | | #include "yb/util/net/sockaddr.h" |
80 | | #include "yb/util/random_util.h" |
81 | | #include "yb/util/result.h" |
82 | | #include "yb/util/scope_exit.h" |
83 | | #include "yb/util/shared_lock.h" |
84 | | #include "yb/util/status_format.h" |
85 | | #include "yb/util/unique_lock.h" |
86 | | |
87 | | using std::map; |
88 | | using std::shared_ptr; |
89 | | using strings::Substitute; |
90 | | using namespace std::literals; |
91 | | using namespace std::placeholders; |
92 | | |
93 | | DEFINE_int32(max_concurrent_master_lookups, 500, |
94 | | "Maximum number of concurrent tablet location lookups from YB client to master"); |
95 | | |
96 | | DEFINE_test_flag(bool, verify_all_replicas_alive, false, |
97 | | "If set, when a RemoteTablet object is destroyed, we will verify that all its " |
98 | | "replicas are not marked as failed"); |
99 | | |
100 | | DEFINE_int32(retry_failed_replica_ms, 60 * 1000, |
101 | | "Time in milliseconds to wait for before retrying a failed replica"); |
102 | | |
103 | | DEFINE_int64(meta_cache_lookup_throttling_step_ms, 5, |
104 | | "Step to increment delay between calls during lookup throttling."); |
105 | | |
106 | | DEFINE_int64(meta_cache_lookup_throttling_max_delay_ms, 1000, |
107 | | "Max delay between calls during lookup throttling."); |
108 | | |
109 | | DEFINE_test_flag(bool, force_master_lookup_all_tablets, false, |
110 | | "If set, force the client to go to the master for all tablet lookup " |
111 | | "instead of reading from cache."); |
112 | | |
113 | | DEFINE_test_flag(double, simulate_lookup_timeout_probability, 0, |
114 | | "If set, mark an RPC as failed and force retry on the first attempt."); |
115 | | DEFINE_test_flag(double, simulate_lookup_partition_list_mismatch_probability, 0, |
116 | | "Probability for simulating the partition list mismatch error on tablet lookup."); |
117 | | |
118 | | METRIC_DEFINE_coarse_histogram( |
119 | | server, dns_resolve_latency_during_init_proxy, |
120 | | "yb.client.MetaCache.InitProxy DNS Resolve", |
121 | | yb::MetricUnit::kMicroseconds, |
122 | | "Microseconds spent resolving DNS requests during MetaCache::InitProxy"); |
123 | | |
124 | | namespace yb { |
125 | | |
126 | | using consensus::RaftPeerPB; |
127 | | using master::GetTableLocationsRequestPB; |
128 | | using master::GetTableLocationsResponsePB; |
129 | | using master::TabletLocationsPB; |
130 | | using master::TabletLocationsPB_ReplicaPB; |
131 | | using master::TSInfoPB; |
132 | | using rpc::Messenger; |
133 | | using rpc::Rpc; |
134 | | using tablet::RaftGroupStatePB; |
135 | | using tserver::LocalTabletServer; |
136 | | using tserver::TabletServerServiceProxy; |
137 | | using tserver::TabletServerForwardServiceProxy; |
138 | | |
139 | | namespace client { |
140 | | |
141 | | namespace internal { |
142 | | |
143 | | using ProcessedTablesMap = |
144 | | std::unordered_map<TableId, std::unordered_map<PartitionKey, RemoteTabletPtr>>; |
145 | | |
146 | | namespace { |
147 | | |
148 | | // We join tablet partitions into groups, so that the location info for all tablets in a group
149 | | // is requested with a single RPC call to the master.
150 | | // kPartitionGroupSize defines the size of such a group.
151 | | #ifdef NDEBUG |
152 | | const size_t kPartitionGroupSize = 64; |
153 | | #else |
154 | | const size_t kPartitionGroupSize = 4; |
155 | | #endif |
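
// A minimal sketch (not from this file) of the grouping rule described in the comment above:
// rounding a partition's index down to a multiple of kPartitionGroupSize maps every partition
// in a group to the same group start key, so their lookups can share one master RPC. The
// helper name and signature are hypothetical.
inline const std::string& GroupStartKeySketch(
    const std::vector<std::string>& sorted_partition_starts, size_t idx) {
  // All indexes in [group_begin, group_begin + kPartitionGroupSize) resolve to the same key.
  const size_t group_begin = idx / kPartitionGroupSize * kPartitionGroupSize;
  return sorted_partition_starts[group_begin];
}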
156 | | |
157 | | std::atomic<int64_t> lookup_serial_{1}; |
158 | | |
159 | | } // namespace |
160 | | |
161 | 0 | int64_t TEST_GetLookupSerial() { |
162 | 0 | return lookup_serial_.load(std::memory_order_acquire); |
163 | 0 | } |
164 | | |
165 | | //////////////////////////////////////////////////////////// |
166 | | |
167 | | RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb) |
168 | 16.9k | : uuid_(pb.permanent_uuid()) { |
169 | 16.9k | Update(pb); |
170 | 16.9k | } |
171 | | |
172 | | RemoteTabletServer::RemoteTabletServer(const string& uuid, |
173 | | const shared_ptr<TabletServerServiceProxy>& proxy, |
174 | | const LocalTabletServer* local_tserver) |
175 | | : uuid_(uuid), |
176 | | proxy_(proxy), |
177 | 5.48k | local_tserver_(local_tserver) { |
178 | 0 | LOG_IF(DFATAL, proxy && !IsLocal()) << "Local tserver has non-local proxy"; |
179 | 5.48k | } |
180 | | |
181 | 235 | RemoteTabletServer::~RemoteTabletServer() = default; |
182 | | |
183 | 7.84M | Status RemoteTabletServer::InitProxy(YBClient* client) { |
184 | 7.84M | { |
185 | 7.84M | SharedLock<rw_spinlock> lock(mutex_); |
186 | | |
187 | 7.84M | if (proxy_) { |
188 | | // Already have a proxy created. |
189 | 7.84M | return Status::OK(); |
190 | 7.84M | } |
191 | 596 | } |
192 | | |
193 | 596 | std::lock_guard<rw_spinlock> lock(mutex_); |
194 | | |
195 | 596 | if (proxy_) { |
196 | | // Already have a proxy created. |
197 | 5 | return Status::OK(); |
198 | 5 | } |
199 | | |
200 | 7.76k | if (!dns_resolve_histogram_) { |
201 | 7.76k | auto metric_entity = client->metric_entity(); |
202 | 7.76k | if (metric_entity) { |
203 | 7.69k | dns_resolve_histogram_ = METRIC_dns_resolve_latency_during_init_proxy.Instantiate( |
204 | 7.69k | metric_entity); |
205 | 7.69k | } |
206 | 7.76k | } |
207 | | |
208 | | // TODO: if the TS advertises multiple host/ports, pick the right one |
209 | | // based on some kind of policy. For now just use the first always. |
210 | 591 | auto hostport = HostPortFromPB(DesiredHostPort( |
211 | 591 | public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_, |
212 | 591 | client->data_->cloud_info_pb_)); |
213 | 591 | CHECK(!hostport.host().empty()); |
214 | 591 | ScopedDnsTracker dns_tracker(dns_resolve_histogram_.get()); |
215 | 591 | proxy_.reset(new TabletServerServiceProxy(client->data_->proxy_cache_.get(), hostport)); |
216 | 591 | proxy_endpoint_ = hostport; |
217 | | |
218 | 591 | return Status::OK(); |
219 | 591 | } |
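
// A minimal standalone sketch (not from this file) of the double-checked locking idiom that
// InitProxy above uses: a shared lock covers the common already-initialized path, and only the
// slow path takes an exclusive lock and re-checks, since another thread may have initialized
// the value in between. LazyValueSketch and its int payload are illustrative stand-ins;
// assumes <mutex> for std::unique_lock in addition to the <shared_mutex> already included.
class LazyValueSketch {
 public:
  std::shared_ptr<int> Get() {
    {
      std::shared_lock<std::shared_mutex> lock(mutex_);
      if (value_) {
        return value_;  // Fast path: already initialized, cheap shared lock only.
      }
    }
    std::unique_lock<std::shared_mutex> lock(mutex_);
    if (!value_) {  // Re-check under the exclusive lock.
      value_ = std::make_shared<int>(42);
    }
    return value_;
  }

 private:
  std::shared_mutex mutex_;
  std::shared_ptr<int> value_;
};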
220 | | |
221 | 219k | void RemoteTabletServer::Update(const master::TSInfoPB& pb) { |
222 | 219k | CHECK_EQ(pb.permanent_uuid(), uuid_); |
223 | | |
224 | 219k | std::lock_guard<rw_spinlock> lock(mutex_); |
225 | 219k | private_rpc_hostports_ = pb.private_rpc_addresses(); |
226 | 219k | public_rpc_hostports_ = pb.broadcast_addresses(); |
227 | 219k | cloud_info_pb_ = pb.cloud_info(); |
228 | 219k | capabilities_.assign(pb.capabilities().begin(), pb.capabilities().end()); |
229 | 219k | std::sort(capabilities_.begin(), capabilities_.end()); |
230 | 219k | } |
231 | | |
232 | 5.72M | bool RemoteTabletServer::IsLocal() const { |
233 | 5.72M | return local_tserver_ != nullptr; |
234 | 5.72M | } |
235 | | |
236 | 804k | const std::string& RemoteTabletServer::permanent_uuid() const { |
237 | 804k | return uuid_; |
238 | 804k | } |
239 | | |
240 | 86.4k | const CloudInfoPB& RemoteTabletServer::cloud_info() const { |
241 | 86.4k | return cloud_info_pb_; |
242 | 86.4k | } |
243 | | |
244 | | const google::protobuf::RepeatedPtrField<HostPortPB>& |
245 | 159 | RemoteTabletServer::public_rpc_hostports() const { |
246 | 159 | return public_rpc_hostports_; |
247 | 159 | } |
248 | | |
249 | | const google::protobuf::RepeatedPtrField<HostPortPB>& |
250 | 159 | RemoteTabletServer::private_rpc_hostports() const { |
251 | 159 | return private_rpc_hostports_; |
252 | 159 | } |
253 | | |
254 | 7.84M | shared_ptr<TabletServerServiceProxy> RemoteTabletServer::proxy() const { |
255 | 7.84M | SharedLock<rw_spinlock> lock(mutex_); |
256 | 7.84M | return proxy_; |
257 | 7.84M | } |
258 | | |
259 | 0 | ::yb::HostPort RemoteTabletServer::ProxyEndpoint() const { |
260 | 0 | std::shared_lock<rw_spinlock> lock(mutex_); |
261 | 0 | return proxy_endpoint_; |
262 | 0 | } |
263 | | |
264 | 181k | string RemoteTabletServer::ToString() const { |
265 | 181k | string ret = "{ uuid: " + uuid_; |
266 | 181k | SharedLock<rw_spinlock> lock(mutex_); |
267 | 181k | if (!private_rpc_hostports_.empty()) { |
268 | 181k | ret += Format(" private: $0", private_rpc_hostports_); |
269 | 181k | } |
270 | 181k | if (!public_rpc_hostports_.empty()) { |
271 | 0 | ret += Format(" public: $0", public_rpc_hostports_); |
272 | 0 | } |
273 | 181k | ret += Format(" cloud_info: $0", cloud_info_pb_); |
274 | 181k | return ret; |
275 | 181k | } |
276 | | |
277 | 21.0k | bool RemoteTabletServer::HasHostFrom(const std::unordered_set<std::string>& hosts) const { |
278 | 21.0k | SharedLock<rw_spinlock> lock(mutex_); |
279 | 21.0k | for (const auto& hp : private_rpc_hostports_) { |
280 | 21.0k | if (hosts.count(hp.host())) { |
281 | 0 | return true; |
282 | 0 | } |
283 | 21.0k | } |
284 | 21.0k | for (const auto& hp : public_rpc_hostports_) { |
285 | 0 | if (hosts.count(hp.host())) { |
286 | 0 | return true; |
287 | 0 | } |
288 | 0 | } |
289 | 21.0k | return false; |
290 | 21.0k | } |
291 | | |
292 | 6.21M | bool RemoteTabletServer::HasCapability(CapabilityId capability) const { |
293 | 6.21M | SharedLock<rw_spinlock> lock(mutex_); |
294 | 6.21M | return std::binary_search(capabilities_.begin(), capabilities_.end(), capability); |
295 | 6.21M | } |
296 | | |
297 | 0 | std::string ReplicasCount::ToString() { |
298 | 0 | return Format( |
299 | 0 | " live replicas $0, read replicas $1, expected live replicas $2, expected read replicas $3", |
300 | 0 | num_alive_live_replicas, num_alive_read_replicas, |
301 | 0 | expected_live_replicas, expected_read_replicas); |
302 | 0 | } |
303 | | |
304 | | //////////////////////////////////////////////////////////// |
305 | | |
306 | | RemoteTablet::RemoteTablet(std::string tablet_id, |
307 | | Partition partition, |
308 | | boost::optional<PartitionListVersion> partition_list_version, |
309 | | uint64 split_depth, |
310 | | const TabletId& split_parent_tablet_id) |
311 | | : tablet_id_(std::move(tablet_id)), |
312 | | log_prefix_(Format("T $0: ", tablet_id_)), |
313 | | partition_(std::move(partition)), |
314 | | partition_list_version_(partition_list_version), |
315 | | split_depth_(split_depth), |
316 | | split_parent_tablet_id_(split_parent_tablet_id), |
317 | 70.6k | stale_(false) { |
318 | 70.6k | } |
319 | | |
320 | 160 | RemoteTablet::~RemoteTablet() { |
321 | 160 | if (PREDICT_FALSE(FLAGS_TEST_verify_all_replicas_alive)) { |
322 | | // Let's verify that none of the replicas are marked as failed. The test should always wait |
323 | | // enough time so that the lookup cache can be refreshed after force_lookup_cache_refresh_secs. |
324 | 0 | for (const auto& replica : replicas_) { |
325 | 0 | if (replica.Failed()) { |
326 | 0 | LOG_WITH_PREFIX(FATAL) << "Remote tablet server " << replica.ts->ToString() |
327 | 0 | << " with role " << PeerRole_Name(replica.role) |
328 | 0 | << " is marked as failed"; |
329 | 0 | } |
330 | 0 | } |
331 | 0 | } |
332 | 160 | } |
333 | | |
334 | | void RemoteTablet::Refresh( |
335 | | const TabletServerMap& tservers, |
336 | 353k | const google::protobuf::RepeatedPtrField<TabletLocationsPB_ReplicaPB>& replicas) { |
337 | | // Adopt the data from the successful response. |
338 | 353k | std::lock_guard<rw_spinlock> lock(mutex_); |
339 | 353k | std::vector<std::string> old_uuids; |
340 | 353k | old_uuids.reserve(replicas_.size()); |
341 | 730k | for (const auto& replica : replicas_) { |
342 | 730k | old_uuids.push_back(replica.ts->permanent_uuid()); |
343 | 730k | } |
344 | 353k | std::sort(old_uuids.begin(), old_uuids.end()); |
345 | 353k | replicas_.clear(); |
346 | 353k | bool has_new_replica = false; |
347 | 938k | for (const TabletLocationsPB_ReplicaPB& r : replicas) { |
348 | 938k | auto it = tservers.find(r.ts_info().permanent_uuid()); |
349 | 938k | CHECK(it != tservers.end()); |
350 | 938k | replicas_.emplace_back(it->second.get(), r.role()); |
351 | 938k | has_new_replica = |
352 | 938k | has_new_replica || |
353 | 800k | !std::binary_search(old_uuids.begin(), old_uuids.end(), r.ts_info().permanent_uuid()); |
354 | 938k | } |
355 | 353k | if (has_new_replica) { |
356 | 71.3k | lookups_without_new_replicas_ = 0; |
357 | 282k | } else { |
358 | 282k | ++lookups_without_new_replicas_; |
359 | 282k | } |
360 | 353k | stale_ = false; |
361 | 353k | refresh_time_.store(MonoTime::Now(), std::memory_order_release); |
362 | 353k | } |
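
// A minimal sketch (not from this file) of the new-replica detection performed by Refresh
// above: the previous IDs are collected and sorted once, then each incoming ID is probed with
// std::binary_search. Names are illustrative; assumes <algorithm> is available.
inline bool HasNewEntrySketch(
    std::vector<std::string> old_ids, const std::vector<std::string>& new_ids) {
  std::sort(old_ids.begin(), old_ids.end());  // Sort a copy once: O(n log n).
  for (const auto& id : new_ids) {
    if (!std::binary_search(old_ids.begin(), old_ids.end(), id)) {
      return true;  // This ID was not present before the refresh.
    }
  }
  return false;
}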
363 | | |
364 | 32 | void RemoteTablet::MarkStale() { |
365 | 32 | std::lock_guard<rw_spinlock> lock(mutex_); |
366 | 32 | stale_ = true; |
367 | 32 | } |
368 | | |
369 | 11.0M | bool RemoteTablet::stale() const { |
370 | 11.0M | SharedLock<rw_spinlock> lock(mutex_); |
371 | 11.0M | return stale_; |
372 | 11.0M | } |
373 | | |
374 | 19 | void RemoteTablet::MarkAsSplit() { |
375 | 19 | std::lock_guard<rw_spinlock> lock(mutex_); |
376 | 19 | is_split_ = true; |
377 | 19 | } |
378 | | |
379 | 6.00M | bool RemoteTablet::is_split() const { |
380 | 6.00M | SharedLock<rw_spinlock> lock(mutex_); |
381 | 6.00M | return is_split_; |
382 | 6.00M | } |
383 | | |
384 | 4.92k | bool RemoteTablet::MarkReplicaFailed(RemoteTabletServer *ts, const Status& status) { |
385 | 4.92k | std::lock_guard<rw_spinlock> lock(mutex_); |
386 | 11 | VLOG_WITH_PREFIX(2) << "Current remote replicas in meta cache: " |
387 | 11 | << ReplicasAsStringUnlocked() << ". Replica " << ts->ToString() |
388 | 11 | << " has failed: " << status.ToString(); |
389 | 10.1k | for (RemoteReplica& rep : replicas_) { |
390 | 10.1k | if (rep.ts == ts) { |
391 | 4.91k | rep.MarkFailed(); |
392 | 4.91k | return true; |
393 | 4.91k | } |
394 | 10.1k | } |
395 | 15 | return false; |
396 | 4.92k | } |
397 | | |
398 | 1.69k | int RemoteTablet::GetNumFailedReplicas() const { |
399 | 1.69k | int failed = 0; |
400 | 1.69k | SharedLock<rw_spinlock> lock(mutex_); |
401 | 10.1k | for (const RemoteReplica& rep : replicas_) { |
402 | 10.1k | if (rep.Failed()) { |
403 | 19 | failed++; |
404 | 19 | } |
405 | 10.1k | } |
406 | 1.69k | return failed; |
407 | 1.69k | } |
408 | | |
409 | 1.79k | bool RemoteTablet::IsReplicasCountConsistent() const { |
410 | 1.79k | return replicas_count_.load(std::memory_order_acquire).IsReplicasCountConsistent(); |
411 | 1.79k | } |
412 | | |
413 | 0 | string RemoteTablet::ReplicasCountToString() const { |
414 | 0 | return replicas_count_.load(std::memory_order_acquire).ToString(); |
415 | 0 | } |
416 | | |
417 | 353k | void RemoteTablet::SetExpectedReplicas(int expected_live_replicas, int expected_read_replicas) { |
418 | 353k | ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire); |
419 | 353k | for (;;) { |
420 | 353k | ReplicasCount new_replicas_count = old_replicas_count; |
421 | 353k | new_replicas_count.SetExpectedReplicas(expected_live_replicas, expected_read_replicas); |
422 | 353k | if (replicas_count_.compare_exchange_weak( |
423 | 353k | old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) { |
424 | 353k | break; |
425 | 353k | } |
426 | 353k | } |
427 | 353k | } |
428 | | |
429 | 230k | void RemoteTablet::SetAliveReplicas(int alive_live_replicas, int alive_read_replicas) { |
430 | 230k | ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire); |
431 | 230k | for (;;) { |
432 | 230k | ReplicasCount new_replicas_count = old_replicas_count; |
433 | 230k | new_replicas_count.SetAliveReplicas(alive_live_replicas, alive_read_replicas); |
434 | 230k | if (replicas_count_.compare_exchange_weak( |
435 | 230k | old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) { |
436 | 230k | break; |
437 | 230k | } |
438 | 230k | } |
439 | 230k | } |
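
// A minimal sketch (not from this file) of the compare-exchange retry loop that
// SetExpectedReplicas and SetAliveReplicas above rely on, shown here computing a monotonic
// maximum: a snapshot is loaded, the desired value is derived from it, and
// compare_exchange_weak either installs it or refreshes the snapshot for another pass. The
// function name is illustrative, not a MetaCache API.
inline void StoreMaxSketch(std::atomic<int64_t>* value, int64_t candidate) {
  int64_t observed = value->load(std::memory_order_acquire);
  while (observed < candidate &&
         !value->compare_exchange_weak(observed, candidate, std::memory_order_acq_rel)) {
    // On failure, `observed` now holds the current value; the loop re-tests it.
  }
}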
440 | | |
441 | 19.4M | RemoteTabletServer* RemoteTablet::LeaderTServer() const { |
442 | 19.4M | SharedLock<rw_spinlock> lock(mutex_); |
443 | 37.0M | for (const RemoteReplica& replica : replicas_) { |
444 | 37.0M | if (!replica.Failed() && replica.role == PeerRole::LEADER) { |
445 | 19.3M | return replica.ts; |
446 | 19.3M | } |
447 | 37.0M | } |
448 | 41.0k | return nullptr; |
449 | 19.4M | } |
450 | | |
451 | 12.1M | bool RemoteTablet::HasLeader() const { |
452 | 12.1M | return LeaderTServer() != nullptr; |
453 | 12.1M | } |
454 | | |
455 | | void RemoteTablet::GetRemoteTabletServers( |
456 | 230k | std::vector<RemoteTabletServer*>* servers, IncludeFailedReplicas include_failed_replicas) { |
457 | 230k | DCHECK(servers->empty()); |
458 | 230k | struct ReplicaUpdate { |
459 | 230k | RemoteReplica* replica; |
460 | 230k | tablet::RaftGroupStatePB new_state; |
461 | 230k | bool clear_failed; |
462 | 230k | }; |
463 | 230k | std::vector<ReplicaUpdate> replica_updates; |
464 | 230k | { |
465 | 230k | SharedLock<rw_spinlock> lock(mutex_); |
466 | 230k | int num_alive_live_replicas = 0; |
467 | 230k | int num_alive_read_replicas = 0; |
468 | 697k | for (RemoteReplica& replica : replicas_) { |
469 | 697k | if (replica.Failed()) { |
470 | 30.0k | if (include_failed_replicas) { |
471 | 1 | servers->push_back(replica.ts); |
472 | 1 | continue; |
473 | 1 | } |
474 | 30.0k | ReplicaUpdate replica_update = {&replica, RaftGroupStatePB::UNKNOWN, false}; |
475 | 18 | VLOG_WITH_PREFIX(4) |
476 | 18 | << "Replica " << replica.ts->ToString() |
477 | 18 | << " failed, state: " << RaftGroupStatePB_Name(replica.state) |
478 | 18 | << ", is local: " << replica.ts->IsLocal() |
479 | 18 | << ", time since failure: " << (MonoTime::Now() - replica.last_failed_time); |
480 | 30.0k | switch (replica.state) { |
481 | 29.9k | case RaftGroupStatePB::UNKNOWN: FALLTHROUGH_INTENDED; |
482 | 29.9k | case RaftGroupStatePB::NOT_STARTED: FALLTHROUGH_INTENDED; |
483 | 29.9k | case RaftGroupStatePB::BOOTSTRAPPING: FALLTHROUGH_INTENDED; |
484 | 29.9k | case RaftGroupStatePB::RUNNING: |
485 | | // These are non-terminal states from which a replica may recover. Check and update a failed
486 | | // local replica's current state; for a remote replica, just wait for some time before retrying.
487 | 29.9k | if (replica.ts->IsLocal()) { |
488 | 954 | tserver::GetTabletStatusRequestPB req; |
489 | 954 | tserver::GetTabletStatusResponsePB resp; |
490 | 954 | req.set_tablet_id(tablet_id_); |
491 | 954 | const Status status = |
492 | 954 | CHECK_NOTNULL(replica.ts->local_tserver())->GetTabletStatus(&req, &resp); |
493 | 954 | if (!status.ok() || resp.has_error()) { |
494 | 523 | LOG_WITH_PREFIX(ERROR) |
495 | 523 | << "Received error from GetTabletStatus: " |
496 | 523 | << (!status.ok() ? status : StatusFromPB(resp.error().status())); |
497 | 523 | continue; |
498 | 523 | } |
499 | | |
500 | 431 | DCHECK_EQ(resp.tablet_status().tablet_id(), tablet_id_); |
501 | 0 | VLOG_WITH_PREFIX(3) << "GetTabletStatus returned status: " |
502 | 0 | << tablet::RaftGroupStatePB_Name(resp.tablet_status().state()) |
503 | 0 | << " for replica " << replica.ts->ToString(); |
504 | 431 | replica_update.new_state = resp.tablet_status().state(); |
505 | 431 | if (replica_update.new_state != tablet::RaftGroupStatePB::RUNNING) { |
506 | 40 | if (replica_update.new_state != replica.state) { |
507 | | // Cannot update the replica here directly because we hold only a shared lock on the mutex.
508 | 40 | replica_updates.push_back(replica_update); // Update only state |
509 | 40 | } |
510 | 40 | continue; |
511 | 40 | } |
512 | 391 | if (!replica.ts->local_tserver()->LeaderAndReady( |
513 | 391 | tablet_id_, /* allow_stale */ true)) { |
514 | | // Must continue here, because otherwise the failed state would be cleared.
515 | 391 | continue; |
516 | 391 | } |
517 | 29.0k | } else if ((MonoTime::Now() - replica.last_failed_time) < |
518 | 28.9k | FLAGS_retry_failed_replica_ms * 1ms) { |
519 | 28.9k | continue; |
520 | 28.9k | } |
521 | 51 | break; |
522 | 0 | case RaftGroupStatePB::FAILED: FALLTHROUGH_INTENDED; |
523 | 15 | case RaftGroupStatePB::QUIESCING: FALLTHROUGH_INTENDED; |
524 | 50 | case RaftGroupStatePB::SHUTDOWN: |
525 | | // These are terminal states, so we won't retry. |
526 | 50 | continue; |
527 | 5 | } |
528 | | |
529 | 5 | VLOG_WITH_PREFIX(3) << "Changing state of replica " << replica.ts->ToString() |
530 | 0 | << " from failed to not failed"; |
531 | 5 | replica_update.clear_failed = true; |
532 | | // Cannot update the replica here directly because we hold only a shared lock on the mutex.
533 | 5 | replica_updates.push_back(replica_update); |
534 | 667k | } else { |
535 | 667k | if (replica.role == PeerRole::READ_REPLICA) { |
536 | 5.32k | num_alive_read_replicas++; |
537 | 662k | } else if (replica.role == PeerRole::FOLLOWER || replica.role == PeerRole::LEADER) { |
538 | 658k | num_alive_live_replicas++; |
539 | 658k | } |
540 | 667k | } |
541 | 667k | servers->push_back(replica.ts); |
542 | 667k | } |
543 | 230k | SetAliveReplicas(num_alive_live_replicas, num_alive_read_replicas); |
544 | 230k | } |
545 | 230k | if (!replica_updates.empty()) { |
546 | 45 | std::lock_guard<rw_spinlock> lock(mutex_); |
547 | 45 | for (const auto& update : replica_updates) { |
548 | 45 | if (update.new_state != RaftGroupStatePB::UNKNOWN) { |
549 | 40 | update.replica->state = update.new_state; |
550 | 40 | } |
551 | 45 | if (update.clear_failed) { |
552 | 5 | update.replica->ClearFailed(); |
553 | 5 | } |
554 | 45 | } |
555 | 45 | } |
556 | 230k | } |
557 | | |
558 | 939 | bool RemoteTablet::MarkTServerAsLeader(const RemoteTabletServer* server) { |
559 | 939 | bool found = false; |
560 | 939 | std::lock_guard<rw_spinlock> lock(mutex_); |
561 | 2.94k | for (RemoteReplica& replica : replicas_) { |
562 | 2.94k | if (replica.ts == server) { |
563 | 939 | replica.role = PeerRole::LEADER; |
564 | 939 | found = true; |
565 | 2.00k | } else if (replica.role == PeerRole::LEADER) { |
566 | 183 | replica.role = PeerRole::FOLLOWER; |
567 | 183 | } |
568 | 2.94k | } |
569 | 0 | VLOG_WITH_PREFIX(3) << "Latest replicas: " << ReplicasAsStringUnlocked(); |
570 | 0 | VLOG_IF_WITH_PREFIX(3, !found) << "Specified server not found: " << server->ToString() |
571 | 0 | << ". Replicas: " << ReplicasAsStringUnlocked(); |
572 | 939 | return found; |
573 | 939 | } |
574 | | |
575 | 1.56k | void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) { |
576 | 1.56k | bool found = false; |
577 | 1.56k | std::lock_guard<rw_spinlock> lock(mutex_); |
578 | 4.70k | for (RemoteReplica& replica : replicas_) { |
579 | 4.70k | if (replica.ts == server) { |
580 | 1.56k | replica.role = PeerRole::FOLLOWER; |
581 | 1.56k | found = true; |
582 | 1.56k | } |
583 | 4.70k | } |
584 | 0 | VLOG_WITH_PREFIX(3) << "Latest replicas: " << ReplicasAsStringUnlocked(); |
585 | 0 | DCHECK(found) << "Tablet " << tablet_id_ << ": Specified server not found: " |
586 | 0 | << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked(); |
587 | 1.56k | } |
588 | | |
589 | 4 | std::string RemoteTablet::ReplicasAsString() const { |
590 | 4 | SharedLock<rw_spinlock> lock(mutex_); |
591 | 4 | return ReplicasAsStringUnlocked(); |
592 | 4 | } |
593 | | |
594 | 4 | std::string RemoteTablet::ReplicasAsStringUnlocked() const { |
595 | 4 | DCHECK(mutex_.is_locked()); |
596 | 4 | string replicas_str; |
597 | 13 | for (const RemoteReplica& rep : replicas_) { |
598 | 13 | if (!replicas_str.empty()) replicas_str += ", "; |
599 | 13 | replicas_str += rep.ToString(); |
600 | 13 | } |
601 | 4 | return replicas_str; |
602 | 4 | } |
603 | | |
604 | 8.20k | std::string RemoteTablet::ToString() const { |
605 | 8.20k | return YB_CLASS_TO_STRING(tablet_id, partition, partition_list_version, split_depth); |
606 | 8.20k | } |
607 | | |
608 | 276 | PartitionListVersion RemoteTablet::GetLastKnownPartitionListVersion() const { |
609 | 276 | SharedLock<rw_spinlock> lock(mutex_); |
610 | 276 | return last_known_partition_list_version_; |
611 | 276 | } |
612 | | |
613 | | void RemoteTablet::MakeLastKnownPartitionListVersionAtLeast( |
614 | 340k | PartitionListVersion partition_list_version) { |
615 | 340k | std::lock_guard<rw_spinlock> lock(mutex_); |
616 | 340k | last_known_partition_list_version_ = |
617 | 340k | std::max(last_known_partition_list_version_, partition_list_version); |
618 | 340k | } |
619 | | |
620 | 96.1k | void LookupCallbackVisitor::operator()(const LookupTabletCallback& tablet_callback) const { |
621 | 96.1k | if (error_status_) { |
622 | 2.26k | tablet_callback(*error_status_); |
623 | 2.26k | return; |
624 | 2.26k | } |
625 | 93.8k | auto remote_tablet = boost::get<RemoteTabletPtr>(param_); |
626 | 93.8k | if (remote_tablet == nullptr) { |
627 | 0 | static const Status error_status = STATUS( |
628 | 0 | TryAgain, "Tablet for requested partition is not yet running", |
629 | 0 | ClientError(ClientErrorCode::kTabletNotYetRunning)); |
630 | 0 | tablet_callback(error_status); |
631 | 0 | return; |
632 | 0 | } |
633 | 93.8k | tablet_callback(remote_tablet); |
634 | 93.8k | } |
635 | | |
636 | | void LookupCallbackVisitor::operator()( |
637 | 0 | const LookupTabletRangeCallback& tablet_range_callback) const { |
638 | 0 | if (error_status_) { |
639 | 0 | tablet_range_callback(*error_status_); |
640 | 0 | return; |
641 | 0 | } |
642 | 0 | auto result = boost::get<std::vector<RemoteTabletPtr>>(param_); |
643 | 0 | tablet_range_callback(result); |
644 | 0 | } |
645 | | |
646 | | //////////////////////////////////////////////////////////// |
647 | | |
648 | | MetaCache::MetaCache(YBClient* client) |
649 | | : client_(client), |
650 | | master_lookup_sem_(FLAGS_max_concurrent_master_lookups), |
651 | 22.9k | log_prefix_(Format("MetaCache($0): ", static_cast<void*>(this))) { |
652 | 22.9k | } |
653 | | |
654 | 2.11k | MetaCache::~MetaCache() { |
655 | 2.11k | } |
656 | | |
657 | | void MetaCache::SetLocalTabletServer(const string& permanent_uuid, |
658 | | const shared_ptr<TabletServerServiceProxy>& proxy, |
659 | 5.48k | const LocalTabletServer* local_tserver) { |
660 | 5.48k | const auto entry = ts_cache_.emplace(permanent_uuid, |
661 | 5.48k | std::make_unique<RemoteTabletServer>(permanent_uuid, |
662 | 5.48k | proxy, |
663 | 5.48k | local_tserver)); |
664 | 5.48k | CHECK(entry.second); |
665 | 5.48k | local_tserver_ = entry.first->second.get(); |
666 | 5.48k | } |
667 | | |
668 | 219k | void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) { |
669 | 219k | const std::string& permanent_uuid = pb.permanent_uuid(); |
670 | 219k | auto it = ts_cache_.find(permanent_uuid); |
671 | 219k | if (it != ts_cache_.end()) { |
672 | 202k | it->second->Update(pb); |
673 | 202k | return; |
674 | 202k | } |
675 | | |
676 | 16.9k | VLOG_WITH_PREFIX(1) << "Client caching new TabletServer " << permanent_uuid; |
677 | 16.9k | CHECK(ts_cache_.emplace(permanent_uuid, std::make_unique<RemoteTabletServer>(pb)).second); |
678 | 16.9k | } |
679 | | |
680 | | // A (table, partition_key) --> tablet lookup. May be in-flight to a master, or |
681 | | // may be handled locally. |
682 | | // |
683 | | // Keeps a reference on the owning metacache while alive. |
684 | | class LookupRpc : public internal::ClientMasterRpcBase, public RequestCleanup { |
685 | | public: |
686 | | LookupRpc(const scoped_refptr<MetaCache>& meta_cache, |
687 | | const std::shared_ptr<const YBTable>& table, |
688 | | int64_t request_no, |
689 | | CoarseTimePoint deadline); |
690 | | |
691 | | virtual ~LookupRpc(); |
692 | | |
693 | | void SendRpc() override; |
694 | | |
695 | 160k | MetaCache* meta_cache() { return meta_cache_.get(); } |
696 | 0 | YBClient* client() const { return meta_cache_->client_; } |
697 | | |
698 | | virtual void NotifyFailure(const Status& status) = 0; |
699 | | |
700 | | template <class Response> |
701 | | void DoProcessResponse(const Status& status, const Response& resp); |
702 | | |
703 | | // Subclasses can override VerifyResponse to implement additional response checks. Called
704 | | // from Finished if no errors are passed in the response.
705 | 9.03k | virtual CHECKED_STATUS VerifyResponse() { return Status::OK(); } |
706 | | |
707 | 56.8k | int64_t request_no() const { |
708 | 56.8k | return request_no_; |
709 | 56.8k | } |
710 | | |
711 | 155k | const std::shared_ptr<const YBTable>& table() const { |
712 | 155k | return table_; |
713 | 155k | } |
714 | | |
715 | | // When we receive a response for a lookup-by-key or full-table RPC, add the callbacks to be
716 | | // fired to the to_notify set corresponding to the RPC type, and remove lookups from the
717 | | // `tables` map as they are added to the to_notify set.
718 | | virtual void AddCallbacksToBeNotified( |
719 | | const ProcessedTablesMap& processed_tables, |
720 | | std::unordered_map<TableId, TableData>* tables, |
721 | | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) = 0; |
722 | | |
723 | | // When looking up by key or by full table, update the processed table with the returned location.
724 | | virtual void UpdateProcessedTable(const TabletLocationsPB& loc, |
725 | | RemoteTabletPtr remote, |
726 | | ProcessedTablesMap::mapped_type* processed_table) = 0; |
727 | | |
728 | | private: |
729 | | virtual CHECKED_STATUS ProcessTabletLocations( |
730 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
731 | | boost::optional<PartitionListVersion> table_partition_list_version) = 0; |
732 | | |
733 | | const int64_t request_no_; |
734 | | |
735 | | // Pointer back to the tablet cache. Populated with location information |
736 | | // if the lookup finishes successfully. |
737 | | // |
738 | | // When the RPC is destroyed, a master lookup permit is returned to the |
739 | | // cache if one was acquired in the first place. |
740 | | scoped_refptr<MetaCache> meta_cache_; |
741 | | |
742 | | // Table to look up. Optional in the case of lookup by ID, but if specified it is used to check
743 | | // whether the table's partitions are stale.
744 | | const std::shared_ptr<const YBTable> table_; |
745 | | |
746 | | // Whether this lookup has acquired a master lookup permit. |
747 | | bool has_permit_ = false; |
748 | | }; |
749 | | |
750 | | LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache, |
751 | | const std::shared_ptr<const YBTable>& table, |
752 | | int64_t request_no, |
753 | | CoarseTimePoint deadline) |
754 | | : ClientMasterRpcBase(meta_cache->client_, deadline), |
755 | | request_no_(request_no), |
756 | | meta_cache_(meta_cache), |
757 | 56.6k | table_(table) { |
758 | 56.6k | DCHECK(deadline != CoarseTimePoint()); |
759 | 56.6k | } |
760 | | |
761 | 56.6k | LookupRpc::~LookupRpc() { |
762 | 56.6k | if (has_permit_) { |
763 | 56.6k | meta_cache_->ReleaseMasterLookupPermit(); |
764 | 56.6k | } |
765 | 56.6k | } |
766 | | |
767 | 54.7k | void LookupRpc::SendRpc() { |
768 | 54.7k | if (!has_permit_) { |
769 | 54.7k | has_permit_ = meta_cache_->AcquireMasterLookupPermit(); |
770 | 54.7k | } |
771 | 54.7k | if (!has_permit_) { |
772 | | // Couldn't get a permit, try again in a little while. |
773 | 0 | ScheduleRetry(STATUS(TryAgain, "Client has too many outstanding requests to the master")); |
774 | 0 | return; |
775 | 0 | } |
776 | | |
777 | | // See YBClient::Data::SyncLeaderMasterRpc(). |
778 | 54.7k | auto now = CoarseMonoClock::Now(); |
779 | 54.7k | if (retrier().deadline() < now) { |
780 | 0 | Finished(STATUS_FORMAT(TimedOut, "Timed out after deadline expired, passed: $0", |
781 | 0 | MonoDelta(now - retrier().start()))); |
782 | 0 | return; |
783 | 0 | } |
784 | 54.7k | mutable_retrier()->PrepareController(); |
785 | | |
786 | 54.7k | ClientMasterRpcBase::SendRpc(); |
787 | 54.7k | } |
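
// A minimal sketch (not from this file) of the permit scheme SendRpc above depends on, reduced
// to a counting semaphore: AcquireMasterLookupPermit/ReleaseMasterLookupPermit are modeled by
// TryAcquire/Release, and the capacity plays the role of FLAGS_max_concurrent_master_lookups.
// Class and method names are hypothetical.
class LookupPermitsSketch {
 public:
  explicit LookupPermitsSketch(int capacity) : available_(capacity) {}

  // Returns false when all permits are taken; SendRpc reacts to that case by
  // scheduling a retry with a TryAgain status.
  bool TryAcquire() {
    int current = available_.load(std::memory_order_acquire);
    while (current > 0) {
      if (available_.compare_exchange_weak(current, current - 1, std::memory_order_acq_rel)) {
        return true;
      }
    }
    return false;
  }

  void Release() { available_.fetch_add(1, std::memory_order_acq_rel); }

 private:
  std::atomic<int> available_;
};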
788 | | |
789 | | namespace { |
790 | | |
791 | 9.03k | CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTabletLocationsResponsePB& resp) { |
792 | 8.70k | return resp.errors_size() > 0 ? StatusFromPB(resp.errors(0).status()) : Status::OK(); |
793 | 9.03k | } |
794 | | |
795 | 47.5k | CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTableLocationsResponsePB& resp) { |
796 | | // There are no per-tablet lookup errors inside GetTableLocationsResponsePB. |
797 | 47.5k | return Status::OK(); |
798 | 47.5k | } |
799 | | |
800 | | template <class Response> |
801 | 56.5k | boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) { |
802 | 56.5k | return resp.has_partition_list_version() |
803 | 48.7k | ? boost::make_optional<PartitionListVersion>(resp.partition_list_version()) |
804 | 7.79k | : boost::none; |
805 | 56.5k | }
(Per-instantiation coverage for GetPartitionListVersion: the master::GetTableLocationsResponsePB instantiation ran 47.5k times, taking the boost::none branch once; the master::GetTabletLocationsResponsePB instantiation ran 9.04k times, taking the boost::none branch 7.79k times.)
806 | | |
807 | | } // namespace |
808 | | |
809 | | template <class Response> |
810 | 56.6k | void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) { |
811 | 56.6k | auto new_status = status; |
812 | 56.6k | if (new_status.ok()) { |
813 | 56.5k | new_status = VerifyResponse(); |
814 | 56.5k | } |
815 | | |
816 | | // Prefer response failures over no tablets found. |
817 | 56.6k | if (new_status.ok()) { |
818 | 56.5k | new_status = GetFirstErrorForTabletById(resp); |
819 | 56.5k | } |
820 | | |
821 | 56.6k | if (new_status.ok() && resp.tablet_locations_size() == 0) { |
822 | 0 | new_status = STATUS(NotFound, "No such tablet found"); |
823 | 0 | } |
824 | | |
825 | 56.6k | if (new_status.ok()) { |
826 | 56.2k | if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) { |
827 | 0 | const auto s = STATUS(TimedOut, "Simulate timeout for test."); |
828 | 0 | NotifyFailure(s); |
829 | 0 | return; |
830 | 0 | } |
831 | 56.2k | new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp)); |
832 | 56.2k | } |
833 | 56.6k | if (!new_status.ok()) { |
834 | 265 | YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 1) << new_status; |
835 | 421 | new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString())); |
836 | 421 | NotifyFailure(new_status); |
837 | 421 | } |
838 | 56.6k | }
(Per-instantiation coverage for LookupRpc::DoProcessResponse: the master::GetTableLocationsResponsePB instantiation ran 47.5k times, hitting the failure path 85 times; the master::GetTabletLocationsResponsePB instantiation ran 9.04k times, hitting the failure path 336 times.)
839 | | |
840 | | namespace { |
841 | | |
842 | | Status CheckTabletLocations( |
843 | 56.2k | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations) { |
844 | 56.2k | const std::string* prev_partition_end = nullptr; |
845 | 74.3k | for (const TabletLocationsPB& loc : locations) { |
846 | 74.3k | if (prev_partition_end && *prev_partition_end > loc.partition().partition_key_start()) { |
847 | 0 | LOG(DFATAL) << "There should be no overlaps in tablet partitions and they should be sorted " |
848 | 0 | << "by partition_key_start. Prev partition end: " |
849 | 0 | << Slice(*prev_partition_end).ToDebugHexString() << ", current partition start: " |
850 | 0 | << Slice(loc.partition().partition_key_start()).ToDebugHexString() |
851 | 0 | << ". Tablet locations: " << [&locations] { |
852 | 0 | std::string result; |
853 | 0 | for (auto& loc : locations) { |
854 | 0 | result += "\n " + AsString(loc); |
855 | 0 | } |
856 | 0 | return result; |
857 | 0 | }(); |
858 | 0 | return STATUS(IllegalState, "Wrong order or overlaps in partitions"); |
859 | 0 | } |
860 | 74.3k | prev_partition_end = &loc.partition().partition_key_end(); |
861 | 74.3k | } |
862 | 56.2k | return Status::OK(); |
863 | 56.2k | } |
864 | | |
865 | | class TabletIdLookup : public ToStringable { |
866 | | public: |
867 | 9.04k | explicit TabletIdLookup(const TabletId& tablet_id) : tablet_id_(tablet_id) {} |
868 | | |
869 | 0 | std::string ToString() const override { |
870 | 0 | return Format("Tablet: $0", tablet_id_); |
871 | 0 | } |
872 | | |
873 | | private: |
874 | | const TabletId& tablet_id_; |
875 | | }; |
876 | | |
877 | | class TablePartitionLookup : public ToStringable { |
878 | | public: |
879 | | explicit TablePartitionLookup( |
880 | | const TableId& table_id, const VersionedPartitionGroupStartKey& partition_group_start) |
881 | 47.5k | : table_id_(table_id), partition_group_start_(partition_group_start) {} |
882 | | |
883 | 0 | std::string ToString() const override { |
884 | 0 | return Format("Table: $0, partition: $1, partition list version: $2", |
885 | 0 | table_id_, Slice(*partition_group_start_.key).ToDebugHexString(), |
886 | 0 | partition_group_start_.partition_list_version); |
887 | 0 | } |
888 | | |
889 | | private: |
890 | | const TableId& table_id_; |
891 | | const VersionedPartitionGroupStartKey partition_group_start_; |
892 | | }; |
893 | | |
894 | | class FullTableLookup : public ToStringable { |
895 | | public: |
896 | | explicit FullTableLookup(const TableId& table_id) |
897 | 0 | : table_id_(table_id) {} |
898 | | |
899 | 0 | std::string ToString() const override { |
900 | 0 | return Format("FullTableLookup: $0", table_id_); |
901 | 0 | } |
902 | | private: |
903 | | const TableId& table_id_; |
904 | | }; |
905 | | |
906 | | } // namespace |
907 | | |
908 | | Status MetaCache::ProcessTabletLocations( |
909 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
910 | 56.2k | boost::optional<PartitionListVersion> table_partition_list_version, LookupRpc* lookup_rpc) { |
911 | 0 | if (VLOG_IS_ON(2)) { |
912 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << "lookup_rpc: " << AsString(lookup_rpc); |
913 | 0 | for (const auto& loc : locations) { |
914 | 0 | for (const auto& table_id : loc.table_ids()) { |
915 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << loc.tablet_id() << ", " << table_id; |
916 | 0 | } |
917 | 0 | } |
918 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(locations); |
919 | 0 | } |
920 | | |
921 | 56.2k | RETURN_NOT_OK(CheckTabletLocations(locations)); |
922 | | |
923 | 56.2k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>> to_notify; |
924 | 56.2k | { |
925 | 56.2k | std::lock_guard<decltype(mutex_)> lock(mutex_); |
926 | 56.2k | ProcessedTablesMap processed_tables; |
927 | | |
928 | 74.3k | for (const TabletLocationsPB& loc : locations) { |
929 | 74.3k | const std::string& tablet_id = loc.tablet_id(); |
930 | | // Find the current entry in the tablet-by-id cache; it is updated below.
931 | 74.3k | RemoteTabletPtr remote = FindPtrOrNull(tablets_by_id_, tablet_id); |
932 | | |
933 | | // First, update the tserver cache, needed for the Refresh calls below. |
934 | 219k | for (const TabletLocationsPB_ReplicaPB& r : loc.replicas()) { |
935 | 219k | UpdateTabletServerUnlocked(r.ts_info()); |
936 | 219k | } |
937 | | |
938 | 74.3k | VersionedTablePartitionListPtr colocated_table_partition_list; |
939 | 74.3k | if (loc.table_ids_size() > 1 && lookup_rpc && lookup_rpc->table()) { |
940 | | // When table_ids_size() == 1 we only receive info for the single table from the master,
941 | | // and we already have TableData initialized for it (this is done before sending an RPC to
942 | | // the master). When table_ids_size() > 1, we got a response to a lookup RPC for a
943 | | // co-located table, so we can re-use TableData::partition_list from the table that was
944 | | // requested by the MetaCache::LookupTabletByKey caller for the other tables co-located with
945 | | // it (all co-located tables share the same set of tablets, and hence the same table
946 | | // partition list, and the master has now returned the list of those tables).
947 | 499 | const auto lookup_table_it = tables_.find(lookup_rpc->table()->id()); |
948 | 499 | if (lookup_table_it != tables_.end()) { |
949 | 499 | colocated_table_partition_list = lookup_table_it->second.partition_list; |
950 | 0 | } else { |
951 | | // In production we don't want to crash the server in that case, since this is not a
952 | | // correctness issue and only causes some performance degradation on first lookups for
953 | | // co-located tables.
954 | | // But we do want to crash in debug builds, so we can catch this more reliably if it happens.
955 | 0 | LOG_WITH_PREFIX(DFATAL) << Format( |
956 | 0 | "Internal error: got response for lookup RPC for co-located table, but MetaCache " |
957 | 0 | "table data wasn't initialized with partition list for this table. RPC: $0", |
958 | 0 | AsString(lookup_rpc)); |
959 | 0 | } |
960 | 499 | } |
961 | | |
962 | 353k | for (const std::string& table_id : loc.table_ids()) { |
963 | 353k | auto& processed_table = processed_tables[table_id]; |
964 | 353k | std::map<PartitionKey, RemoteTabletPtr>* tablets_by_key = nullptr; |
965 | | |
966 | 353k | auto table_it = tables_.find(table_id); |
967 | 353k | if (table_it == tables_.end() && loc.table_ids_size() > 1 && |
968 | 270k | colocated_table_partition_list) { |
969 | 265k | table_it = InitTableDataUnlocked(table_id, colocated_table_partition_list); |
970 | 265k | } |
971 | 353k | if (table_it != tables_.end()) { |
972 | 340k | auto& table_data = table_it->second; |
973 | | |
974 | 1 | const auto msg_formatter = [&] { |
975 | 1 | return Format( |
976 | 1 | "Received table $0 partitions version: $1, MetaCache's table partitions version: " |
977 | 1 | "$2", |
978 | 1 | table_id, table_partition_list_version, table_data.partition_list->version); |
979 | 1 | }; |
980 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << msg_formatter(); |
981 | 340k | if (table_partition_list_version.has_value()) { |
982 | 340k | if (table_partition_list_version.get() != table_data.partition_list->version) { |
983 | 1 | return STATUS( |
984 | 1 | TryAgain, msg_formatter(), |
985 | 1 | ClientError(ClientErrorCode::kTablePartitionListIsStale)); |
986 | 1 | } |
987 | | // We need to guarantee that the table_data.tablets_by_partition cache corresponds to
988 | | // table_data.partition_list (see the comments for TableData::partitions).
989 | | // So we don't update the tablets_by_partition cache if we don't know the table partitions
990 | | // version for both the response and the TableData.
991 | | // This can only happen for LookupTabletById requests that don't specify a table,
992 | | // because they don't care about partitions changing.
993 | 340k | tablets_by_key = &table_data.tablets_by_partition; |
994 | 340k | } |
995 | 340k | } |
996 | | |
997 | 353k | if (remote) { |
998 | | // Partition should not have changed. |
999 | 282k | DCHECK_EQ(loc.partition().partition_key_start(), |
1000 | 282k | remote->partition().partition_key_start()); |
1001 | 282k | DCHECK_EQ(loc.partition().partition_key_end(), |
1002 | 282k | remote->partition().partition_key_end()); |
1003 | | |
1004 | | // For colocated tables, RemoteTablet already exists because it was processed |
1005 | | // in a previous iteration of the for loop (for loc.table_ids()). |
1006 | | // We need to add this tablet to the current table's tablets_by_key map. |
1007 | 282k | if (tablets_by_key) { |
1008 | 277k | (*tablets_by_key)[remote->partition().partition_key_start()] = remote; |
1009 | 277k | } |
1010 | | |
1011 | 0 | VLOG_WITH_PREFIX(5) << "Refreshing tablet " << tablet_id << ": " |
1012 | 0 | << loc.ShortDebugString(); |
1013 | 70.6k | } else { |
1014 | 0 | VLOG_WITH_PREFIX(5) << "Caching tablet " << tablet_id << ": " << loc.ShortDebugString(); |
1015 | | |
1016 | 70.6k | Partition partition; |
1017 | 70.6k | Partition::FromPB(loc.partition(), &partition); |
1018 | 70.6k | remote = new RemoteTablet( |
1019 | 70.6k | tablet_id, partition, table_partition_list_version, loc.split_depth(), |
1020 | 70.6k | loc.split_parent_tablet_id()); |
1021 | | |
1022 | 70.6k | CHECK(tablets_by_id_.emplace(tablet_id, remote).second); |
1023 | 70.6k | if (tablets_by_key) { |
1024 | 62.7k | auto emplace_result = tablets_by_key->emplace(partition.partition_key_start(), remote); |
1025 | 62.7k | if (!emplace_result.second) { |
1026 | 12 | const auto& old_tablet = emplace_result.first->second; |
1027 | 12 | if (old_tablet->split_depth() < remote->split_depth()) { |
1028 | | // Only replace with tablet of higher split_depth. |
1029 | 12 | emplace_result.first->second = remote; |
1030 | 0 | } else { |
1031 | | // If split_depth is the same - it should be the same tablet. |
1032 | 0 | if (old_tablet->split_depth() == loc.split_depth() |
1033 | 0 | && old_tablet->tablet_id() != tablet_id) { |
1034 | 0 | const auto error_msg = Format( |
1035 | 0 | "Can't replace tablet $0 with $1 at partition_key_start $2, split_depth $3", |
1036 | 0 | old_tablet->tablet_id(), tablet_id, loc.partition().partition_key_start(), |
1037 | 0 | old_tablet->split_depth()); |
1038 | 0 | LOG_WITH_PREFIX(DFATAL) << error_msg; |
1039 | | // Just skip updating this tablet for release build. |
1040 | 0 | } |
1041 | 0 | } |
1042 | 12 | } |
1043 | 62.7k | } |
1044 | 70.6k | MaybeUpdateClientRequests(*remote); |
1045 | 70.6k | } |
1046 | 353k | remote->Refresh(ts_cache_, loc.replicas()); |
1047 | 353k | remote->SetExpectedReplicas(loc.expected_live_replicas(), loc.expected_read_replicas()); |
1048 | 353k | if (table_partition_list_version.has_value()) { |
1049 | 340k | remote->MakeLastKnownPartitionListVersionAtLeast(*table_partition_list_version); |
1050 | 340k | } |
1051 | 353k | if (lookup_rpc) { |
1052 | 352k | lookup_rpc->UpdateProcessedTable(loc, remote, &processed_table); |
1053 | 352k | } |
1054 | 353k | } |
1055 | | |
1056 | 74.3k | auto it = tablet_lookups_by_id_.find(tablet_id); |
1057 | 74.3k | if (it != tablet_lookups_by_id_.end()) { |
1058 | 33.0k | while (auto* lookup = it->second.lookups.Pop()) { |
1059 | 23.5k | to_notify.emplace_back(std::move(lookup->callback), |
1060 | 23.5k | LookupCallbackVisitor(remote)); |
1061 | 23.5k | delete lookup; |
1062 | 23.5k | } |
1063 | 9.58k | } |
1064 | 74.3k | } |
1065 | 56.2k | if (lookup_rpc) { |
1066 | 56.2k | lookup_rpc->AddCallbacksToBeNotified(processed_tables, &tables_, &to_notify); |
1067 | 56.2k | lookup_rpc->CleanupRequest(); |
1068 | 56.2k | } |
1069 | 56.2k | } |
1070 | | |
1071 | 93.8k | for (const auto& callback_and_param : to_notify) { |
1072 | 93.8k | boost::apply_visitor(callback_and_param.second, callback_and_param.first); |
1073 | 93.8k | } |
1074 | | |
1075 | 56.2k | return Status::OK(); |
1076 | 56.2k | } |
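
// A minimal sketch (not from this file) of the collect-then-notify pattern that
// ProcessTabletLocations above follows: callbacks are gathered into a local container while
// the cache mutex is held and invoked only after it is released, so a callback that re-enters
// the MetaCache cannot deadlock. std::mutex and std::function are used only to keep the
// sketch self-contained (assumes <mutex> and <functional>).
inline void DrainAndNotifySketch(
    std::mutex* mutex, std::vector<std::function<void()>>* waiters) {
  std::vector<std::function<void()>> to_notify;
  {
    std::lock_guard<std::mutex> lock(*mutex);
    to_notify.swap(*waiters);  // Steal the waiter list under the lock.
  }
  for (const auto& callback : to_notify) {
    callback();  // Run callbacks with no locks held.
  }
}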
1077 | | |
1078 | 70.6k | void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) { |
1079 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << "Tablet: " << tablet.tablet_id() |
1080 | 0 | << " split parent: " << tablet.split_parent_tablet_id(); |
1081 | 70.6k | if (tablet.split_parent_tablet_id().empty()) { |
1082 | 0 | VLOG_WITH_PREFIX(2) << "Tablet " << tablet.tablet_id() << " is not a result of split"; |
1083 | 70.5k | return; |
1084 | 70.5k | } |
1085 | | // TODO: MetaCache is a friend of Client and tablet_requests_mutex_ with tablet_requests_ are |
1086 | | // public members of YBClient::Data. Consider refactoring that. |
1087 | 34 | std::lock_guard<simple_spinlock> request_lock(client_->data_->tablet_requests_mutex_); |
1088 | 34 | auto& tablet_requests = client_->data_->tablet_requests_; |
1089 | 34 | const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id()); |
1090 | 34 | if (requests_it == tablet_requests.end()) { |
1091 | 0 | VLOG_WITH_PREFIX(2) << "Can't find request_id_seq for tablet " |
1092 | 0 | << tablet.split_parent_tablet_id(); |
1093 | | // This can happen if the client wasn't active (for example, the node was partitioned away)
1094 | | // during the sequence of splits that resulted in the creation of `tablet`, so we have no
1095 | | // info about its split parent.
1096 | | // In this case we set request_id_seq to a special value and will reset it upon getting a
1097 | | // "request id is less than min" error. We will then use the min request ID plus 2^24 (there
1098 | | // won't be 2^24 client requests in progress from the same client to the same tablet, so this
1099 | | // is safe).
1100 | 10 | tablet_requests.emplace( |
1101 | 10 | tablet.tablet_id(), |
1102 | 10 | YBClient::Data::TabletRequests { |
1103 | 10 | .request_id_seq = kInitializeFromMinRunning |
1104 | 10 | }); |
1105 | 10 | return; |
1106 | 10 | } |
1107 | 24 | VLOG_WITH_PREFIX(2) << "Setting request_id_seq for tablet " << tablet.tablet_id() |
1108 | 0 | << " from tablet " << tablet.split_parent_tablet_id() << " to " |
1109 | 0 | << requests_it->second.request_id_seq; |
1110 | 24 | tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq; |
1111 | 24 | } |
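
// A minimal sketch (not from this file) of the inheritance rule MaybeUpdateClientRequests
// above implements: a post-split child tablet inherits its parent's request_id_seq when the
// parent is known, and otherwise starts from a sentinel that is corrected later via the
// "request id is less than min" error path. kUnknownParentSeqSketch stands in for the real
// kInitializeFromMinRunning constant; all names here are hypothetical.
constexpr int64_t kUnknownParentSeqSketch = -1;

inline void InheritRequestIdSeqSketch(
    std::unordered_map<std::string, int64_t>* seq_by_tablet,
    const std::string& child_id, const std::string& parent_id) {
  const auto parent_it = seq_by_tablet->find(parent_id);
  (*seq_by_tablet)[child_id] = parent_it == seq_by_tablet->end()
                                   ? kUnknownParentSeqSketch
                                   : parent_it->second;
}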
1112 | | |
1113 | | std::unordered_map<TableId, TableData>::iterator MetaCache::InitTableDataUnlocked( |
1114 | 309k | const TableId& table_id, const VersionedTablePartitionListPtr& partitions) { |
1115 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << Format( |
1116 | 0 | "MetaCache initializing TableData ($0 tables) for table $1: $2, " |
1117 | 0 | "partition_list_version: $3", |
1118 | 0 | tables_.size(), table_id, tables_.count(table_id), partitions->version); |
1119 | 309k | return tables_.emplace( |
1120 | 309k | std::piecewise_construct, std::forward_as_tuple(table_id), |
1121 | 309k | std::forward_as_tuple(partitions)).first; |
1122 | 309k | } |
1123 | | |
1124 | 16.0k | void MetaCache::InvalidateTableCache(const YBTable& table) { |
1125 | 16.0k | const auto& table_id = table.id(); |
1126 | 16.0k | const auto table_partition_list = table.GetVersionedPartitions(); |
1127 | 0 | VLOG_WITH_PREFIX_AND_FUNC(1) << Format( |
1128 | 0 | "table: $0, table.partition_list.version: $1", table_id, table_partition_list->version); |
1129 | | |
1130 | 16.0k | std::vector<LookupCallback> to_notify; |
1131 | | |
1132 | 16.0k | auto invalidate_needed = [this, &table_id, &table_partition_list](const auto& it) { |
1133 | 16.0k | const auto table_data_partition_list_version = it->second.partition_list->version; |
1134 | 0 | VLOG_WITH_PREFIX(1) << Format( |
1135 | 0 | "tables_[$0].partition_list.version: $1", table_id, table_data_partition_list_version); |
1136 | | // We only need to invalidate the table cache if its partition list version is older than
1137 | | // the table's version.
1138 | 16.0k | return table_data_partition_list_version < table_partition_list->version; |
1139 | 16.0k | }; |
1140 | | |
1141 | 16.0k | { |
1142 | 16.0k | SharedLock<decltype(mutex_)> lock(mutex_); |
1143 | 16.0k | auto it = tables_.find(table_id); |
1144 | 16.0k | if (it != tables_.end()) { |
1145 | 16.0k | if (!invalidate_needed(it)) { |
1146 | 16.0k | return; |
1147 | 16.0k | } |
1148 | 12 | } |
1149 | 12 | } |
1150 | | |
1151 | 12 | { |
1152 | 12 | UniqueLock<decltype(mutex_)> lock(mutex_); |
1153 | 12 | auto it = tables_.find(table_id); |
1154 | 12 | if (it != tables_.end()) { |
1155 | 12 | if (!invalidate_needed(it)) { |
1156 | 0 | return; |
1157 | 0 | } |
1158 | 0 | } else { |
1159 | 0 | it = InitTableDataUnlocked(table_id, table_partition_list); |
1160 | | // Nothing to invalidate; we have just initialized it for the first time.
1161 | 0 | return; |
1162 | 0 | } |
1163 | | |
1164 | 12 | auto& table_data = it->second; |
1165 | | |
1166 | | // Some partitions could be mapped to tablets that have since been split, so we need to
1167 | | // re-fetch info about the tablets serving these partitions.
1168 | 32 | for (auto& tablet : table_data.tablets_by_partition) { |
1169 | 32 | tablet.second->MarkStale(); |
1170 | 32 | } |
1171 | | |
1172 | 0 | for (auto& tablet : table_data.all_tablets) { |
1173 | 0 | tablet->MarkStale(); |
1174 | 0 | } |
1175 | | // TODO(tsplit): Optimize to retry only necessary lookups inside ProcessTabletLocations, |
1176 | | // detect which need to be retried by GetTableLocationsResponsePB.partition_list_version. |
1177 | 14 | for (auto& group_lookups : table_data.tablet_lookups_by_group) { |
1178 | 14 | while (auto* lookup = group_lookups.second.lookups.Pop()) { |
1179 | 0 | to_notify.push_back(std::move(lookup->callback)); |
1180 | 0 | delete lookup; |
1181 | 0 | } |
1182 | 14 | } |
1183 | 12 | table_data.tablet_lookups_by_group.clear(); |
1184 | | |
1185 | | // Only update partitions here after invalidating TableData cache to avoid inconsistencies. |
1186 | | // See https://github.com/yugabyte/yugabyte-db/issues/6890. |
1187 | 12 | table_data.partition_list = table_partition_list; |
1188 | 12 | } |
1189 | 0 | for (const auto& callback : to_notify) { |
1190 | 0 | const auto s = STATUS_EC_FORMAT( |
1191 | 0 | TryAgain, ClientError(ClientErrorCode::kMetaCacheInvalidated), |
1192 | 0 | "MetaCache for table $0 has been invalidated.", table_id); |
1193 | 0 | boost::apply_visitor(LookupCallbackVisitor(s), callback); |
1194 | 0 | } |
1195 | 12 | } |
1196 | | |
1197 | | class MetaCache::CallbackNotifier { |
1198 | | public: |
1199 | 421 | explicit CallbackNotifier(const Status& status) : status_(status) {} |
1200 | | |
1201 | 2.26k | void Add(LookupCallback&& callback) { |
1202 | 2.26k | callbacks_.push_back(std::move(callback)); |
1203 | 2.26k | } |
1204 | | |
1205 | 2 | void SetStatus(const Status& status) { |
1206 | 2 | status_ = status; |
1207 | 2 | } |
1208 | | |
1209 | 421 | ~CallbackNotifier() { |
1210 | 2.26k | for (const auto& callback : callbacks_) { |
1211 | 2.26k | boost::apply_visitor(LookupCallbackVisitor(status_), callback); |
1212 | 2.26k | } |
1213 | 421 | } |
1214 | | private: |
1215 | | std::vector<LookupCallback> callbacks_; |
1216 | | Status status_; |
1217 | | }; |
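 | | // Editor's note: CallbackNotifier is an RAII collector - callbacks accumulate
 | | // via Add() and all fire from the destructor with whatever status was set last.
 | | // A hypothetical usage sketch:
 | | //
 | | //   {
 | | //     CallbackNotifier notifier(lookup_status);
 | | //     notifier.Add(std::move(lookup->callback));  // queued, not invoked yet
 | | //     if (partition_list_is_stale) {
 | | //       notifier.SetStatus(stale_status);         // overrides for all queued callbacks
 | | //     }
 | | //   }  // <- destructor invokes every queued callback with the final status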
1218 | | |
1219 | | CoarseTimePoint MetaCache::LookupFailed( |
1220 | | const Status& status, int64_t request_no, const ToStringable& lookup_id, |
1221 | | LookupDataGroup* lookup_data_group, |
1222 | 421 | CallbackNotifier* notifier) { |
1223 | 421 | std::vector<LookupData*> retry; |
1224 | 421 | auto now = CoarseMonoClock::Now(); |
1225 | 421 | CoarseTimePoint max_deadline; |
1226 | | |
1227 | 3.17k | while (auto* lookup = lookup_data_group->lookups.Pop()) { |
1228 | 2.75k | if (!status.IsTimedOut() || lookup->deadline <= now) { |
1229 | 2.26k | notifier->Add(std::move(lookup->callback)); |
1230 | 2.26k | delete lookup; |
1231 | 488 | } else { |
1232 | 488 | max_deadline = std::max(max_deadline, lookup->deadline); |
1233 | 488 | retry.push_back(lookup); |
1234 | 488 | } |
1235 | 2.75k | } |
1236 | | |
1237 | 421 | if (retry.empty()) { |
1238 | 416 | lookup_data_group->Finished(request_no, lookup_id); |
1239 | 5 | } else { |
1240 | 488 | for (auto* lookup : retry) { |
1241 | 488 | lookup_data_group->lookups.Push(lookup); |
1242 | 488 | } |
1243 | 5 | } |
1244 | | |
1245 | 421 | return max_deadline; |
1246 | 421 | } |
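 | | // Editor's note: LookupFailed splits the pending lookups in two. Lookups that
 | | // failed with a non-timeout error, or whose deadline has already passed, are
 | | // handed to the notifier; timed-out-but-unexpired lookups are pushed back onto
 | | // the group, and the largest surviving deadline is returned so the caller can
 | | // start a single retry RPC covering all of them. A default-constructed
 | | // CoarseTimePoint therefore means "nothing left to retry".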
1247 | | |
1248 | | class LookupByIdRpc : public LookupRpc { |
1249 | | public: |
1250 | | LookupByIdRpc(const scoped_refptr<MetaCache>& meta_cache, |
1251 | | const TabletId& tablet_id, |
1252 | | const std::shared_ptr<const YBTable>& table, |
1253 | | master::IncludeInactive include_inactive, |
1254 | | int64_t request_no, |
1255 | | CoarseTimePoint deadline, |
1256 | | int64_t lookups_without_new_replicas) |
1257 | | : LookupRpc(meta_cache, table, request_no, deadline), |
1258 | | tablet_id_(tablet_id), |
1259 | 9.08k | include_inactive_(include_inactive) { |
1260 | 9.08k | if (lookups_without_new_replicas != 0) { |
1261 | 810 | send_delay_ = std::min( |
1262 | 810 | lookups_without_new_replicas * FLAGS_meta_cache_lookup_throttling_step_ms, |
1263 | 810 | FLAGS_meta_cache_lookup_throttling_max_delay_ms) * 1ms; |
1264 | 810 | } |
1265 | 9.08k | } |
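 | | // Editor's note: the send delay grows linearly with the number of consecutive
 | | // lookups that discovered no new replicas, capped by a flag. With assumed
 | | // example flag values step_ms = 50 and max_delay_ms = 1000:
 | | //
 | | //   lookups_without_new_replicas = 3   ->  min(3 * 50, 1000) ms  =  150 ms
 | | //   lookups_without_new_replicas = 40  ->  min(40 * 50, 1000) ms = 1000 ms (capped)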
1266 | | |
1267 | 674 | std::string ToString() const override { |
1268 | 674 | return Format("LookupByIdRpc(tablet: $0, num_attempts: $1)", tablet_id_, num_attempts()); |
1269 | 674 | } |
1270 | | |
1271 | 9.91k | void SendRpc() override { |
1272 | 9.91k | if (send_delay_) { |
1273 | 810 | auto delay = send_delay_; |
1274 | 810 | send_delay_ = MonoDelta(); |
1275 | 810 | auto status = mutable_retrier()->DelayedRetry(this, Status::OK(), delay); |
1276 | 810 | if (!status.ok()) { |
1277 | 0 | Finished(status); |
1278 | 0 | } |
1279 | 810 | return; |
1280 | 810 | } |
1281 | | |
1282 | 9.10k | LookupRpc::SendRpc(); |
1283 | 9.10k | } |
1284 | | |
1285 | 9.10k | void CallRemoteMethod() override { |
1286 | | // Fill out the request. |
1287 | 9.10k | req_.clear_tablet_ids(); |
1288 | 9.10k | req_.add_tablet_ids(tablet_id_); |
1289 | 9.10k | if (table()) { |
1290 | 1.31k | req_.set_table_id(table()->id()); |
1291 | 1.31k | } |
1292 | 9.10k | req_.set_include_inactive(include_inactive_); |
1293 | | |
1294 | 9.10k | master_client_proxy()->GetTabletLocationsAsync( |
1295 | 9.10k | req_, &resp_, mutable_retrier()->mutable_controller(), |
1296 | 9.10k | std::bind(&LookupByIdRpc::Finished, this, Status::OK())); |
1297 | 9.10k | } |
1298 | | |
1299 | | void AddCallbacksToBeNotified( |
1300 | | const ProcessedTablesMap& processed_tables, |
1301 | | std::unordered_map<TableId, TableData>* tables, |
1302 | 8.70k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1303 | | // Nothing to do when looking up by id. |
1304 | 8.70k | return; |
1305 | 8.70k | } |
1306 | | |
1307 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1308 | | RemoteTabletPtr remote, |
1309 | 14.4k | ProcessedTablesMap::mapped_type* processed_table) override { |
1310 | 14.4k | return; |
1311 | 14.4k | } |
1312 | | |
1313 | | private: |
1314 | 9.04k | void ProcessResponse(const Status& status) override { |
1315 | 9.04k | DoProcessResponse(status, resp_); |
1316 | 9.04k | } |
1317 | | |
1318 | 9.11k | Status ResponseStatus() override { |
1319 | 9.11k | return StatusFromResp(resp_); |
1320 | 9.11k | } |
1321 | | |
1322 | 8.70k | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1323 | 8.70k | auto& lookups = meta_cache()->tablet_lookups_by_id_; |
1324 | 8.70k | auto it = lookups.find(tablet_id_); |
1325 | 8.70k | TabletIdLookup tablet_id_lookup(tablet_id_); |
1326 | 8.70k | if (it != lookups.end()) { |
1327 | 8.70k | it->second.Finished(request_no(), tablet_id_lookup); |
1328 | 0 | } else { |
1329 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown tablet: " |
1330 | 0 | << tablet_id_lookup.ToString(); |
1331 | 0 | } |
1332 | 8.70k | } |
1333 | | |
1334 | 336 | void NotifyFailure(const Status& status) override { |
1335 | 336 | meta_cache()->LookupByIdFailed( |
1336 | 336 | tablet_id_, table(), include_inactive_, |
1337 | 336 | GetPartitionListVersion(resp_), request_no(), status); |
1338 | 336 | } |
1339 | | |
1340 | | Status ProcessTabletLocations( |
1341 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1342 | 8.70k | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1343 | 8.70k | return meta_cache()->ProcessTabletLocations( |
1344 | 8.70k | locations, table_partition_list_version, this); |
1345 | 8.70k | } |
1346 | | |
1347 | | // Tablet to look up.
1348 | | const TabletId tablet_id_; |
1349 | | |
1350 | | // Whether or not to look up inactive (hidden) tablets.
1351 | | master::IncludeInactive include_inactive_; |
1352 | | |
1353 | | // Request body. |
1354 | | master::GetTabletLocationsRequestPB req_; |
1355 | | |
1356 | | // Response body. |
1357 | | master::GetTabletLocationsResponsePB resp_; |
1358 | | |
1359 | | MonoDelta send_delay_; |
1360 | | }; |
1361 | | |
1362 | | class LookupFullTableRpc : public LookupRpc { |
1363 | | public: |
1364 | | LookupFullTableRpc(const scoped_refptr<MetaCache>& meta_cache, |
1365 | | const std::shared_ptr<const YBTable>& table, |
1366 | | int64_t request_no, |
1367 | | CoarseTimePoint deadline) |
1368 | 0 | : LookupRpc(meta_cache, table, request_no, deadline) { |
1369 | 0 | } |
1370 | | |
1371 | 0 | std::string ToString() const override { |
1372 | 0 | return Format("LookupFullTableRpc($0, $1)", table()->name(), num_attempts()); |
1373 | 0 | } |
1374 | | |
1375 | 0 | void CallRemoteMethod() override { |
1376 | | // Fill out the request. |
1377 | 0 | req_.mutable_table()->set_table_id(table()->id()); |
1378 | | // The end partition key is left unset intentionally so that we'll prefetch |
1379 | | // some additional tablets. |
1380 | 0 | master_client_proxy()->GetTableLocationsAsync( |
1381 | 0 | req_, &resp_, mutable_retrier()->mutable_controller(), |
1382 | 0 | std::bind(&LookupFullTableRpc::Finished, this, Status::OK())); |
1383 | 0 | } |
1384 | | |
1385 | | void AddCallbacksToBeNotified( |
1386 | | const ProcessedTablesMap& processed_tables, |
1387 | | std::unordered_map<TableId, TableData>* tables, |
1388 | 0 | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1389 | 0 | for (const auto& processed_table : processed_tables) { |
1390 | | // Handle tablet range |
1391 | 0 | const auto it = tables->find(processed_table.first); |
1392 | 0 | if (it == tables->end()) { |
1393 | 0 | continue; |
1394 | 0 | } |
1395 | 0 | auto& table_data = it->second; |
1396 | 0 | auto& full_table_lookups = table_data.full_table_lookups; |
1397 | 0 | while (auto* lookup = full_table_lookups.lookups.Pop()) { |
1398 | 0 | std::vector<internal::RemoteTabletPtr> remote_tablets; |
1399 | 0 | for (const auto& entry : processed_table.second) { |
1400 | 0 | remote_tablets.push_back(entry.second); |
1401 | 0 | } |
1402 | 0 | table_data.all_tablets = remote_tablets; |
1403 | 0 | to_notify->emplace_back(std::move(lookup->callback), |
1404 | 0 | LookupCallbackVisitor(std::move(remote_tablets))); |
1405 | 0 | delete lookup; |
1406 | 0 | } |
1407 | 0 | } |
1408 | 0 | } |
1409 | | |
1410 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1411 | | RemoteTabletPtr remote, |
1412 | 0 | ProcessedTablesMap::mapped_type* processed_table) override { |
1413 | 0 | processed_table->emplace(loc.partition().partition_key_start(), remote); |
1414 | 0 | } |
1415 | | |
1416 | | private: |
1417 | 0 | void ProcessResponse(const Status& status) override { |
1418 | 0 | DoProcessResponse(status, resp_); |
1419 | 0 | } |
1420 | | |
1421 | 0 | Status ResponseStatus() override { |
1422 | 0 | return StatusFromResp(resp_); |
1423 | 0 | } |
1424 | | |
1425 | 0 | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1426 | 0 | const auto it = meta_cache()->tables_.find(table()->id()); |
1427 | 0 | if (it == meta_cache()->tables_.end()) { |
1428 | 0 | return; |
1429 | 0 | } |
1430 | 0 | auto& table_data = it->second; |
1431 | 0 | auto full_table_lookup = FullTableLookup(table()->id()); |
1432 | 0 | table_data.full_table_lookups.Finished(request_no(), full_table_lookup, false); |
1433 | 0 | } |
1434 | | |
1435 | 0 | void NotifyFailure(const Status& status) override { |
1436 | 0 | meta_cache()->LookupFullTableFailed(table(), request_no(), status); |
1437 | 0 | } |
1438 | | |
1439 | | Status ProcessTabletLocations( |
1440 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1441 | 0 | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1442 | 0 | return meta_cache()->ProcessTabletLocations( |
1443 | 0 | locations, table_partition_list_version, this); |
1444 | 0 | } |
1445 | | |
1446 | | // Request body. |
1447 | | GetTableLocationsRequestPB req_; |
1448 | | |
1449 | | // Response body. |
1450 | | GetTableLocationsResponsePB resp_; |
1451 | | }; |
1452 | | |
1453 | | class LookupByKeyRpc : public LookupRpc { |
1454 | | public: |
1455 | | LookupByKeyRpc(const scoped_refptr<MetaCache>& meta_cache, |
1456 | | const std::shared_ptr<const YBTable>& table, |
1457 | | const VersionedPartitionGroupStartKey& partition_group_start, |
1458 | | int64_t request_no, |
1459 | | CoarseTimePoint deadline) |
1460 | | : LookupRpc(meta_cache, table, request_no, deadline), |
1461 | 47.5k | partition_group_start_(partition_group_start) { |
1462 | 47.5k | } |
1463 | | |
1464 | 253 | std::string ToString() const override { |
1465 | 253 | return Format( |
1466 | 253 | "GetTableLocations { table_name: $0, table_id: $1, partition_start_key: $2, " |
1467 | 253 | "partition_list_version: $3, " |
1468 | 253 | "request_no: $4, num_attempts: $5 }", |
1469 | 253 | table()->name(), |
1470 | 253 | table()->id(), |
1471 | 253 | table()->partition_schema().PartitionKeyDebugString( |
1472 | 253 | *partition_group_start_.key, internal::GetSchema(table()->schema())), |
1473 | 253 | partition_group_start_.partition_list_version, |
1474 | 253 | request_no(), |
1475 | 253 | num_attempts()); |
1476 | 253 | } |
1477 | | |
1478 | 0 | const string& table_id() const { return table()->id(); } |
1479 | | |
1480 | 47.5k | void CallRemoteMethod() override { |
1481 | | // Fill out the request. |
1482 | 47.5k | req_.mutable_table()->set_table_id(table()->id()); |
1483 | 47.5k | req_.set_partition_key_start(*partition_group_start_.key); |
1484 | 47.5k | req_.set_max_returned_locations(kPartitionGroupSize); |
1485 | | |
1486 | | // The end partition key is left unset intentionally so that we'll prefetch |
1487 | | // some additional tablets. |
1488 | 47.5k | master_client_proxy()->GetTableLocationsAsync( |
1489 | 47.5k | req_, &resp_, mutable_retrier()->mutable_controller(), |
1490 | 47.5k | std::bind(&LookupByKeyRpc::Finished, this, Status::OK())); |
1491 | 47.5k | } |
1492 | | |
1493 | | void AddCallbacksToBeNotified( |
1494 | | const ProcessedTablesMap& processed_tables, |
1495 | | std::unordered_map<TableId, TableData>* tables, |
1496 | 47.5k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1497 | 320k | for (const auto& processed_table : processed_tables) { |
1498 | 320k | const auto table_it = tables->find(processed_table.first); |
1499 | 320k | if (table_it == tables->end()) { |
1500 | 0 | continue; |
1501 | 0 | } |
1502 | 320k | auto& table_data = table_it->second; |
1503 | | // This should be guaranteed by ProcessTabletLocations before we get here: |
1504 | 0 | DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version) |
1505 | 0 | << "table_data.partition_list->version: " << table_data.partition_list->version |
1506 | 0 | << " partition_group_start_.partition_list_version: " |
1507 | 0 | << partition_group_start_.partition_list_version; |
1508 | 320k | const auto lookup_by_group_iter = |
1509 | 320k | table_data.tablet_lookups_by_group.find(*partition_group_start_.key); |
1510 | 320k | if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) { |
1511 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) |
1512 | 0 | << "Checking tablet_lookups_by_group for partition_group_start: " |
1513 | 0 | << Slice(*partition_group_start_.key).ToDebugHexString(); |
1514 | 47.5k | auto& lookups_group = lookup_by_group_iter->second; |
1515 | 117k | while (auto* lookup = lookups_group.lookups.Pop()) { |
1516 | 70.3k | auto remote_it = processed_table.second.find(*lookup->partition_start); |
1517 | 70.3k | auto lookup_visitor = LookupCallbackVisitor( |
1518 | 70.3k | remote_it != processed_table.second.end() ? remote_it->second : nullptr); |
1519 | 70.3k | to_notify->emplace_back(std::move(lookup->callback), lookup_visitor); |
1520 | 70.3k | delete lookup; |
1521 | 70.3k | } |
1522 | 47.5k | } |
1523 | 320k | } |
1524 | 47.5k | } |
1525 | | |
1526 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1527 | | RemoteTabletPtr remote, |
1528 | 338k | ProcessedTablesMap::mapped_type* processed_table) override { |
1529 | 338k | processed_table->emplace(loc.partition().partition_key_start(), remote); |
1530 | 338k | } |
1531 | | |
1532 | | private: |
1533 | 47.5k | void ProcessResponse(const Status& status) override { |
1534 | 47.5k | DoProcessResponse(status, resp_); |
1535 | 47.5k | } |
1536 | | |
1537 | 47.6k | Status ResponseStatus() override { |
1538 | 47.6k | return StatusFromResp(resp_); |
1539 | 47.6k | } |
1540 | | |
1541 | 47.5k | Status VerifyResponse() override { |
1542 | | // Note: if the response has no partition list version, it means the response came from a
1543 | | // master that doesn't support tablet splitting, and it is OK to treat it as version 0
1544 | | // (so a 0 return value from resp_.partition_list_version() is fine).
1545 | 47.5k | const auto req_partition_list_version = partition_group_start_.partition_list_version; |
1546 | 47.5k | const auto resp_partition_list_version = resp_.partition_list_version(); |
1547 | | |
1548 | 0 | const auto versions_formatter = [&] { |
1549 | 0 | return Format( |
1550 | 0 | "RPC: $0, response partition_list_version: $1", this->ToString(), |
1551 | 0 | resp_partition_list_version); |
1552 | 0 | }; |
1553 | | |
1554 | 47.5k | if (resp_partition_list_version < req_partition_list_version) { |
1555 | | // This indicates an issue on the master side: the table partition list version should
1556 | | // never decrease on the master.
1557 | 0 | const auto msg = Format("Ignoring response for obsolete RPC call. $0", versions_formatter()); |
1558 | 0 | LOG_WITH_PREFIX(DFATAL) << msg; |
1559 | 0 | return STATUS(IllegalState, msg); |
1560 | 0 | } |
1561 | 47.5k | if (resp_partition_list_version > req_partition_list_version) { |
1562 | | // This request used a partition_group_start calculated from an obsolete table partition
1563 | | // list.
1564 | 0 | const auto msg = Format( |
1565 | 0 | "Cached table partition list version is obsolete, refresh required. $0", |
1566 | 0 | versions_formatter()); |
1567 | 0 | VLOG_WITH_PREFIX_AND_FUNC(3) << msg; |
1568 | 0 | return STATUS(TryAgain, msg, ClientError(ClientErrorCode::kTablePartitionListIsStale)); |
1569 | 0 | } |
1570 | | // resp_partition_list_version == req_partition_list_version |
1571 | 47.5k | return Status::OK(); |
1572 | 47.5k | } |
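 | | // Editor's note: VerifyResponse reduces to a three-way comparison of partition
 | | // list versions:
 | | //
 | | //   resp <  req  ->  IllegalState (version regressed on master; DFATAL, drop response)
 | | //   resp >  req  ->  TryAgain(kTablePartitionListIsStale); client must refresh and retry
 | | //   resp == req  ->  OK; the returned locations can be applied to the cache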
1573 | | |
1574 | 47.5k | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1575 | 47.5k | const auto it = meta_cache()->tables_.find(table()->id()); |
1576 | 47.5k | if (it == meta_cache()->tables_.end()) { |
1577 | 0 | return; |
1578 | 0 | } |
1579 | 47.5k | auto& table_data = it->second; |
1580 | | // This should be guaranteed by ProcessTabletLocations before we get here: |
1581 | 0 | DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version) |
1582 | 0 | << "table_data.partition_list->version: " << table_data.partition_list->version |
1583 | 0 | << " partition_group_start_.partition_list_version: " |
1584 | 0 | << partition_group_start_.partition_list_version; |
1585 | 47.5k | const auto lookup_by_group_iter = table_data.tablet_lookups_by_group.find( |
1586 | 47.5k | *partition_group_start_.key); |
1587 | 47.5k | TablePartitionLookup tablet_partition_lookup(table()->id(), partition_group_start_); |
1588 | 47.5k | if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) { |
1589 | 47.5k | lookup_by_group_iter->second.Finished(request_no(), tablet_partition_lookup, false); |
1590 | 0 | } else { |
1591 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown partition group: " |
1592 | 0 | << tablet_partition_lookup.ToString(); |
1593 | 0 | } |
1594 | 47.5k | } |
1595 | | |
1596 | 85 | void NotifyFailure(const Status& status) override { |
1597 | 85 | meta_cache()->LookupByKeyFailed( |
1598 | 85 | table(), partition_group_start_, resp_.partition_list_version(), request_no(), status); |
1599 | 85 | } |
1600 | | |
1601 | | Status ProcessTabletLocations( |
1602 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1603 | 47.5k | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1604 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(2) << "partition_group_start: " << partition_group_start_.ToString(); |
1605 | | // This condition is guaranteed by VerifyResponse function: |
1606 | 47.5k | CHECK(resp_.partition_list_version() == partition_group_start_.partition_list_version); |
1607 | | |
1608 | 47.5k | return meta_cache()->ProcessTabletLocations(locations, table_partition_list_version, this); |
1609 | 47.5k | } |
1610 | | |
1611 | | // Encoded partition group start key to look up.
1612 | | VersionedPartitionGroupStartKey partition_group_start_; |
1613 | | |
1614 | | // Request body. |
1615 | | GetTableLocationsRequestPB req_; |
1616 | | |
1617 | | // Response body. |
1618 | | GetTableLocationsResponsePB resp_; |
1619 | | }; |
1620 | | |
1621 | | void MetaCache::LookupByKeyFailed( |
1622 | | const std::shared_ptr<const YBTable>& table, |
1623 | | const VersionedPartitionGroupStartKey& partition_group_start, |
1624 | | PartitionListVersion response_partition_list_version, |
1625 | 85 | int64_t request_no, const Status& status) { |
1626 | 85 | const auto req_partition_list_version = partition_group_start.partition_list_version; |
1627 | 0 | VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " and partition group start " |
1628 | 0 | << Slice(*partition_group_start.key).ToDebugHexString() |
1629 | 0 | << ", request partition list version: " << req_partition_list_version |
1630 | 0 | << ", response partition list version: " << response_partition_list_version |
1631 | 0 | << " failed with: " << status; |
1632 | | |
1633 | 85 | CallbackNotifier notifier(status); |
1634 | 85 | CoarseTimePoint max_deadline; |
1635 | 85 | { |
1636 | 85 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1637 | 85 | auto it = tables_.find(table->id()); |
1638 | 85 | if (it == tables_.end()) { |
1639 | 0 | return; |
1640 | 0 | } |
1641 | | |
1642 | 85 | auto& table_data = it->second; |
1643 | 85 | const auto table_data_partition_list_version = table_data.partition_list->version; |
1644 | 0 | const auto versions_formatter = [&] { |
1645 | 0 | return Format( |
1646 | 0 | "MetaCache's table $0 partition list version: $1, stored in RPC call: $2, received: $3", |
1647 | 0 | table->id(), table_data_partition_list_version, |
1648 | 0 | req_partition_list_version, response_partition_list_version); |
1649 | 0 | }; |
1650 | | |
1651 | 85 | if (table_data_partition_list_version < req_partition_list_version) { |
1652 | | // The MetaCache partition list version is older than the one stored in the LookupByKeyRpc
1653 | | // for which we've received an answer.
1654 | | // This shouldn't happen: the MetaCache partition list version for a table can't decrease,
1655 | | // and we captured it inside the LookupByKeyRpc when creating the RPC.
1656 | 0 | LOG_WITH_PREFIX(FATAL) << Format( |
1657 | 0 | "Cached table partition list version is older than stored in RPC call. $0", |
1658 | 0 | versions_formatter()); |
1659 | 85 | } else if (table_data_partition_list_version > req_partition_list_version) { |
1660 | | // The MetaCache table partition list version has been updated since we sent this RPC.
1661 | | // We already failed and cleaned up all registered lookups for the table when the partition
1662 | | // list was updated, so we should ignore this response as well.
1663 | 0 | VLOG_WITH_PREFIX_AND_FUNC(3) << Format( |
1664 | 0 | "Cached table partition list version is newer than stored in RPC call, ignoring " |
1665 | 0 | "failure response. $0", versions_formatter()); |
1666 | 0 | return; |
1667 | 0 | } |
1668 | | |
1669 | | // Since table_data_partition_list_version == req_partition_list_version, |
1670 | | // we will find the tablet lookups registered for this exact RPC call here:
1671 | 85 | auto key_lookup_iterator = table_data.tablet_lookups_by_group.find(*partition_group_start.key); |
1672 | 85 | if (key_lookup_iterator != table_data.tablet_lookups_by_group.end()) { |
1673 | 85 | auto& lookup_data_group = key_lookup_iterator->second; |
1674 | 85 | max_deadline = LookupFailed(status, request_no, |
1675 | 85 | TablePartitionLookup(table->id(), partition_group_start), |
1676 | 85 | &lookup_data_group, ¬ifier); |
1677 | 85 | } |
1678 | 85 | } |
1679 | | |
1680 | 85 | if (max_deadline != CoarseTimePoint()) { |
1681 | 5 | auto rpc = std::make_shared<LookupByKeyRpc>( |
1682 | 5 | this, table, partition_group_start, request_no, max_deadline); |
1683 | 5 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1684 | 5 | } |
1685 | 85 | } |
1686 | | |
1687 | | void MetaCache::LookupFullTableFailed(const std::shared_ptr<const YBTable>& table, |
1688 | 0 | int64_t request_no, const Status& status) { |
1689 | 0 | VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " failed with: " << status; |
1690 | | 
1691 | 0 | CallbackNotifier notifier(status); |
1692 | 0 | CoarseTimePoint max_deadline; |
1693 | 0 | { |
1694 | 0 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1695 | 0 | auto it = tables_.find(table->id()); |
1696 | 0 | if (it == tables_.end()) { |
1697 | 0 | return; |
1698 | 0 | } |
1699 | | |
1700 | 0 | max_deadline = LookupFailed(status, request_no, FullTableLookup(table->id()), |
1701 | 0 | &it->second.full_table_lookups, ¬ifier); |
1702 | 0 | } |
1703 | | 
1704 | 0 | if (max_deadline != CoarseTimePoint()) { |
1705 | 0 | auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, max_deadline); |
1706 | 0 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1707 | 0 | } |
1708 | 0 | } |
1709 | | |
1710 | | void MetaCache::LookupByIdFailed( |
1711 | | const TabletId& tablet_id, |
1712 | | const std::shared_ptr<const YBTable>& table, |
1713 | | master::IncludeInactive include_inactive, |
1714 | | const boost::optional<PartitionListVersion>& response_partition_list_version, |
1715 | | int64_t request_no, |
1716 | 336 | const Status& status) { |
1717 | 0 | VLOG_WITH_PREFIX(1) << "Lookup for tablet " << tablet_id << ", failed with: " << status; |
1718 | | |
1719 | 336 | CallbackNotifier notifier(status); |
1720 | 336 | CoarseTimePoint max_deadline; |
1721 | 336 | { |
1722 | 336 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1723 | 336 | if (status.IsNotFound() && response_partition_list_version.has_value()) { |
1724 | 276 | auto tablet = LookupTabletByIdFastPathUnlocked(tablet_id); |
1725 | 276 | if (tablet) { |
1726 | 276 | const auto tablet_last_known_table_partition_list_version = |
1727 | 276 | tablet->GetLastKnownPartitionListVersion(); |
1728 | 276 | if (tablet_last_known_table_partition_list_version < |
1729 | 2 | response_partition_list_version.value()) { |
1730 | 2 | const auto msg_formatter = [&] { |
1731 | 2 | return Format( |
1732 | 2 | "Received table $0 ($1) partitions version: $2, last known by MetaCache for " |
1733 | 2 | "tablet $3: $4", |
1734 | 2 | table->id(), table->name(), response_partition_list_version, |
1735 | 2 | tablet_id, tablet_last_known_table_partition_list_version); |
1736 | 2 | }; |
1737 | 0 | VLOG_WITH_PREFIX_AND_FUNC(3) << msg_formatter(); |
1738 | 2 | notifier.SetStatus(STATUS( |
1739 | 2 | TryAgain, msg_formatter(), ClientError(ClientErrorCode::kTablePartitionListIsStale))); |
1740 | 2 | } |
1741 | 276 | } |
1742 | 276 | } |
1743 | | |
1744 | 336 | auto table_lookup_iterator = tablet_lookups_by_id_.find(tablet_id); |
1745 | 336 | if (table_lookup_iterator != tablet_lookups_by_id_.end()) { |
1746 | 336 | auto& lookup_data_group = table_lookup_iterator->second; |
1747 | 336 | max_deadline = LookupFailed(status, request_no, TabletIdLookup(tablet_id), |
1748 | 336 | &lookup_data_group, ¬ifier); |
1749 | 336 | } |
1750 | 336 | } |
1751 | | |
1752 | 336 | if (max_deadline != CoarseTimePoint()) { |
1753 | 0 | auto rpc = std::make_shared<LookupByIdRpc>( |
1754 | 0 | this, tablet_id, table, include_inactive, request_no, max_deadline, 0); |
1755 | 0 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1756 | 0 | } |
1757 | 336 | } |
1758 | | |
1759 | | RemoteTabletPtr MetaCache::LookupTabletByKeyFastPathUnlocked( |
1760 | 11.1M | const TableId& table_id, const VersionedPartitionStartKey& versioned_partition_start_key) { |
1761 | 11.1M | auto it = tables_.find(table_id); |
1762 | 11.1M | if (PREDICT_FALSE(it == tables_.end())) { |
1763 | | // No cache available for this table. |
1764 | 88.5k | return nullptr; |
1765 | 88.5k | } |
1766 | | |
1767 | 11.0M | const auto& table_data = it->second; |
1768 | | |
1769 | 11.0M | if (PREDICT_FALSE( |
1770 | 11.0M | table_data.partition_list->version != |
1771 | 0 | versioned_partition_start_key.partition_list_version)) { |
1772 | | // TableData::partition_list version in cache does not match partition_list_version used to |
1773 | | // calculate partition_key_start, can't use cache. |
1774 | 0 | return nullptr; |
1775 | 0 | } |
1776 | | |
1777 | 11.0M | const auto& partition_start_key = *versioned_partition_start_key.key; |
1778 | | |
1779 | 11.0M | DCHECK_EQ( |
1780 | 11.0M | partition_start_key, |
1781 | 11.0M | *client::FindPartitionStart(table_data.partition_list, partition_start_key)); |
1782 | 11.0M | auto tablet_it = table_data.tablets_by_partition.find(partition_start_key); |
1783 | 11.0M | if (PREDICT_FALSE(tablet_it == it->second.tablets_by_partition.end())) { |
1784 | | // No tablets with a start partition key lower than 'partition_start_key'.
1785 | 23.7k | return nullptr; |
1786 | 23.7k | } |
1787 | | |
1788 | 11.0M | const auto& result = tablet_it->second; |
1789 | | |
1790 | | // Stale entries must be re-fetched. |
1791 | 11.0M | if (result->stale()) { |
1792 | 476 | return nullptr; |
1793 | 476 | } |
1794 | | |
1795 | 11.0M | if (result->partition().partition_key_end().compare(partition_start_key) > 0 || |
1796 | 11.0M | result->partition().partition_key_end().empty()) { |
1797 | | // partition_start_key < partition end, OR the tablet's partition has no end key (unbounded).
1798 | 11.0M | return result; |
1799 | 11.0M | } |
1800 | | |
1801 | 9.02k | return nullptr; |
1802 | 9.02k | } |
1803 | | |
1804 | | boost::optional<std::vector<RemoteTabletPtr>> MetaCache::FastLookupAllTabletsUnlocked( |
1805 | 0 | const std::shared_ptr<const YBTable>& table) { |
1806 | 0 | auto tablets = std::vector<RemoteTabletPtr>(); |
1807 | 0 | auto it = tables_.find(table->id()); |
1808 | 0 | if (PREDICT_FALSE(it == tables_.end())) { |
1809 | | // No cache available for this table. |
1810 | 0 | return boost::none; |
1811 | 0 | } |
1812 | | |
1813 | 0 | for (const auto& tablet : it->second.all_tablets) { |
1814 | 0 | if (tablet->stale()) { |
1815 | 0 | return boost::none; |
1816 | 0 | } |
1817 | 0 | tablets.push_back(tablet); |
1818 | 0 | } |
1819 | | 
1820 | 0 | if (tablets.empty()) { |
1821 | 0 | return boost::none; |
1822 | 0 | } |
1823 | 0 | return tablets; |
1824 | 0 | } |
1825 | | |
1826 | | // We disable thread safety analysis in this function due to manual conditional locking. |
1827 | | RemoteTabletPtr MetaCache::FastLookupTabletByKeyUnlocked( |
1828 | 11.1M | const TableId& table_id, const VersionedPartitionStartKey& partition_start) { |
1829 | | // Fast path: lookup in the cache. |
1830 | 11.1M | auto result = LookupTabletByKeyFastPathUnlocked(table_id, partition_start); |
1831 | 11.1M | if (result && result->HasLeader()) { |
1832 | 18.4E | VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << result->tablet_id(); |
1833 | 11.0M | return result; |
1834 | 11.0M | } |
1835 | | |
1836 | 112k | return nullptr; |
1837 | 112k | } |
1838 | | |
1839 | | template <class Mutex> |
1840 | 99.4k | bool IsUniqueLock(const std::lock_guard<Mutex>*) { |
1841 | 99.4k | return true; |
1842 | 99.4k | } |
1843 | | |
1844 | | template <class Mutex> |
1845 | 55.1k | bool IsUniqueLock(const SharedLock<Mutex>*) { |
1846 | 55.1k | return false; |
1847 | 55.1k | } |
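 | | // Editor's note: IsUniqueLock is plain overload resolution - the static type of
 | | // the guard pointer selects the overload at compile time, so the shared lookup
 | | // template can ask "may I mutate under this lock?". A standalone sketch of the
 | | // same idiom with hypothetical names:
 | | //
 | | //   #include <mutex>
 | | //   #include <shared_mutex>
 | | //
 | | //   template <class M> bool MayMutate(const std::lock_guard<M>*)  { return true; }
 | | //   template <class M> bool MayMutate(const std::shared_lock<M>*) { return false; }
 | | //
 | | //   // Under std::lock_guard, MayMutate(&lock) yields true (exclusive, safe to
 | | //   // insert into the maps); under std::shared_lock it yields false (read-only).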
1848 | | |
1849 | | // partition_group_start should not be nullptr; it points to a PartitionGroupStartKeyPtr that
1850 | | // will be initialized only if it is currently null. This is an optimization to avoid
1851 | | // recalculating partition_group_start in a subsequent call of this function (see MetaCache::LookupTabletByKey).
1852 | | template <class Lock> |
1853 | | bool MetaCache::DoLookupTabletByKey( |
1854 | | const std::shared_ptr<const YBTable>& table, const VersionedTablePartitionListPtr& partitions, |
1855 | | const PartitionKeyPtr& partition_start, CoarseTimePoint deadline, |
1856 | 11.1M | LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) { |
1857 | 11.1M | DCHECK_ONLY_NOTNULL(partition_group_start); |
1858 | 11.1M | RemoteTabletPtr tablet; |
1859 | 11.1M | auto scope_exit = ScopeExit([callback, &tablet] { |
1860 | 11.1M | if (tablet) { |
1861 | 11.0M | (*callback)(tablet); |
1862 | 11.0M | } |
1863 | 11.1M | });
|
1864 | 11.1M | int64_t request_no; |
1865 | 11.1M | { |
1866 | 11.1M | Lock lock(mutex_); |
1867 | 11.1M | tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version}); |
1868 | 11.1M | if (tablet) { |
1869 | 11.0M | return true; |
1870 | 11.0M | } |
1871 | | |
1872 | 102k | auto table_it = tables_.find(table->id()); |
1873 | 102k | TableData* table_data; |
1874 | 102k | if (table_it == tables_.end()) { |
1875 | 51 | VLOG_WITH_PREFIX_AND_FUNC(4) << Format( |
1876 | 51 | "missed table_id $0", table->id()); |
1877 | 88.5k | if (!IsUniqueLock(&lock)) { |
1878 | 44.0k | return false; |
1879 | 44.0k | } |
1880 | 44.4k | table_it = InitTableDataUnlocked(table->id(), partitions); |
1881 | 44.4k | } |
1882 | 58.7k | table_data = &table_it->second; |
1883 | | |
1884 | 58.7k | if (table_data->partition_list->version != partitions->version || |
1885 | 74.6k | (PREDICT_FALSE(RandomActWithProbability( |
1886 | 74.6k | FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) && |
1887 | 0 | table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE)) { |
1888 | 0 | (*callback)(STATUS( |
1889 | 0 | TryAgain, |
1890 | 0 | Format( |
1891 | 0 | "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, " |
1892 | 0 | "refresh required", |
1893 | 0 | table->ToString(), table_data->partition_list->version, partitions->version), |
1894 | 0 | ClientError(ClientErrorCode::kTablePartitionListIsStale))); |
1895 | 0 | return true; |
1896 | 0 | } |
1897 | | |
1898 | 72.0k | if (!*partition_group_start) { |
1899 | 72.0k | *partition_group_start = client::FindPartitionStart( |
1900 | 72.0k | partitions, *partition_start, kPartitionGroupSize); |
1901 | 72.0k | } |
1902 | | |
1903 | 58.7k | auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group; |
1904 | 58.7k | LookupDataGroup* lookups_group; |
1905 | 58.7k | { |
1906 | 58.7k | auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start); |
1907 | 58.7k | if (lookups_group_it == tablet_lookups_by_group.end()) { |
1908 | 49.5k | if (!IsUniqueLock(&lock)) { |
1909 | 2.64k | return false; |
1910 | 2.64k | } |
1911 | 46.9k | lookups_group = &tablet_lookups_by_group[**partition_group_start]; |
1912 | 9.18k | } else { |
1913 | 9.18k | lookups_group = &lookups_group_it->second; |
1914 | 9.18k | } |
1915 | 58.7k | } |
1916 | 56.1k | lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start)); |
1917 | 56.1k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
1918 | 56.1k | int64_t expected = 0; |
1919 | 56.1k | if (!lookups_group->running_request_number.compare_exchange_strong( |
1920 | 24.3k | expected, request_no, std::memory_order_acq_rel)) { |
1921 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(5) |
1922 | 18.4E | << "Lookup is already running for table: " << table->ToString() |
1923 | 18.4E | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() |
1924 | 18.4E | << ", partition_list_version: " << partitions->version |
1925 | 18.4E | << ", request_no: " << expected; |
1926 | 24.3k | return true; |
1927 | 24.3k | } |
1928 | 31.8k | } |
1929 | | |
1930 | 31.8k | auto rpc = std::make_shared<LookupByKeyRpc>( |
1931 | 31.8k | this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version}, |
1932 | 31.8k | request_no, deadline); |
1933 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) |
1934 | 18.4E | << "Started lookup for table: " << table->ToString() |
1935 | 18.4E | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() |
1936 | 18.4E | << ", rpc: " << AsString(rpc); |
1937 | 31.8k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1938 | 31.8k | return true; |
1939 | 31.8k | }
|
1940 | | |
1941 | | template <class Lock> |
1942 | | bool MetaCache::DoLookupAllTablets(const std::shared_ptr<const YBTable>& table, |
1943 | | CoarseTimePoint deadline, |
1944 | 0 | LookupTabletRangeCallback* callback) { |
1945 | 0 | VLOG_WITH_PREFIX(3) << "DoLookupAllTablets() for table: " << table->ToString(); |
1946 | 0 | int64_t request_no; |
1947 | 0 | { |
1948 | 0 | Lock lock(mutex_); |
1949 | 0 | if (PREDICT_TRUE(!FLAGS_TEST_force_master_lookup_all_tablets)) { |
1950 | 0 | auto tablets = FastLookupAllTabletsUnlocked(table); |
1951 | 0 | if (tablets.has_value()) { |
1952 | 0 | VLOG_WITH_PREFIX(4) << "tablets has value"; |
1953 | 0 | (*callback)(*tablets); |
1954 | 0 | return true; |
1955 | 0 | } |
1956 | 0 | } |
1957 | | |
1958 | 0 | if (!IsUniqueLock(&lock)) { |
1959 | 0 | return false; |
1960 | 0 | } |
1961 | 0 | auto table_it = tables_.find(table->id()); |
1962 | 0 | if (table_it == tables_.end()) { |
1963 | 0 | table_it = InitTableDataUnlocked(table->id(), table->GetVersionedPartitions()); |
1964 | 0 | } |
1965 | 0 | auto& table_data = table_it->second; |
1966 | | 
1967 | 0 | auto& full_table_lookups = table_data.full_table_lookups; |
1968 | 0 | full_table_lookups.lookups.Push(new LookupData(*callback, deadline, nullptr)); |
1969 | 0 | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
1970 | 0 | int64_t expected = 0; |
1971 | 0 | if (!full_table_lookups.running_request_number.compare_exchange_strong( |
1972 | 0 | expected, request_no, std::memory_order_acq_rel)) { |
1973 | 0 | VLOG_WITH_PREFIX_AND_FUNC(5) |
1974 | 0 | << "Lookup is already running for table: " << table->ToString(); |
1975 | 0 | return true; |
1976 | 0 | } |
1977 | 0 | } |
1978 | | |
1979 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) |
1980 | 0 | << "Start lookup for table: " << table->ToString(); |
1981 | | 
1982 | 0 | auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, deadline); |
1983 | 0 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1984 | 0 | return true; |
1985 | 0 | } Unexecuted instantiation: MetaCache::DoLookupAllTablets<SharedLock<std::shared_timed_mutex>> Unexecuted instantiation: MetaCache::DoLookupAllTablets<std::lock_guard<std::shared_timed_mutex>>
1986 | | |
1987 | | // We disable thread safety analysis in this function due to manual conditional locking. |
1988 | | void MetaCache::LookupTabletByKey(const std::shared_ptr<YBTable>& table, |
1989 | | const PartitionKey& partition_key, |
1990 | | CoarseTimePoint deadline, |
1991 | 11.0M | LookupTabletCallback callback) { |
1992 | 11.0M | if (table->ArePartitionsStale()) { |
1993 | 16.0k | table->RefreshPartitions(client_, [this, table, partition_key, deadline, |
1994 | 16.0k | callback = std::move(callback)](const Status& status) { |
1995 | 16.0k | if (!status.ok()) { |
1996 | 0 | callback(status); |
1997 | 0 | return; |
1998 | 0 | } |
1999 | 16.0k | InvalidateTableCache(*table); |
2000 | 16.0k | LookupTabletByKey(table, partition_key, deadline, std::move(callback)); |
2001 | 16.0k | }); |
2002 | 16.0k | return; |
2003 | 16.0k | } |
2004 | | |
2005 | 11.0M | const auto table_partition_list = table->GetVersionedPartitions(); |
2006 | 11.0M | const auto partition_start = client::FindPartitionStart(table_partition_list, partition_key); |
2007 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(5) << "Table: " << table->ToString() |
2008 | 18.4E | << ", table_partition_list: " << table_partition_list->ToString() |
2009 | 18.4E | << ", partition_key: " << Slice(partition_key).ToDebugHexString() |
2010 | 18.4E | << ", partition_start: " << Slice(*partition_start).ToDebugHexString(); |
2011 | | |
2012 | 11.0M | PartitionGroupStartKeyPtr partition_group_start; |
2013 | 11.0M | if (DoLookupTabletByKey<SharedLock<std::shared_timed_mutex>>( |
2014 | 11.0M | table, table_partition_list, partition_start, deadline, &callback, |
2015 | 11.0M | &partition_group_start)) { |
2016 | 11.0M | return; |
2017 | 11.0M | } |
2018 | | |
2019 | 13.7k | bool result = DoLookupTabletByKey<std::lock_guard<std::shared_timed_mutex>>( |
2020 | 13.7k | table, table_partition_list, partition_start, deadline, &callback, &partition_group_start); |
2021 | 18.4E | LOG_IF(DFATAL, !result) |
2022 | 18.4E | << "Lookup was not started for table " << table->ToString() |
2023 | 18.4E | << ", partition_key: " << Slice(partition_key).ToDebugHexString(); |
2024 | 13.7k | } |
2025 | | |
2026 | | void MetaCache::LookupAllTablets(const std::shared_ptr<const YBTable>& table, |
2027 | | CoarseTimePoint deadline, |
2028 | 0 | LookupTabletRangeCallback callback) { |
2029 | | // We first check the cache under a read-only lock, and only if we can't find anything
2030 | | // there do we retry the lookup in write mode.
2031 | 0 | if (DoLookupAllTablets<SharedLock<std::shared_timed_mutex>>(table, deadline, &callback)) { |
2032 | 0 | return; |
2033 | 0 | } |
2034 | | |
2035 | 0 | bool result = DoLookupAllTablets<std::lock_guard<std::shared_timed_mutex>>( |
2036 | 0 | table, deadline, &callback); |
2037 | 0 | LOG_IF(DFATAL, !result) |
2038 | 0 | << "Full table lookup was not started for table " << table->ToString(); |
2039 | 0 | } |
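 | | // Editor's note: LookupTabletByKey and LookupAllTablets share the same two-phase
 | | // pattern: attempt under a shared (read) lock; if the attempt returns false
 | | // because it needed to mutate, retry once under an exclusive lock, where it must
 | | // succeed. A generic sketch of the idiom (hypothetical helper; assumes fn only
 | | // returns false when it needed the exclusive lock):
 | | //
 | | //   template <class Fn>
 | | //   void ReadThenWrite(std::shared_timed_mutex* mutex, const Fn& fn) {
 | | //     {
 | | //       SharedLock<std::shared_timed_mutex> lock(*mutex);
 | | //       if (fn(lock)) return;       // fast path: no exclusive lock taken
 | | //     }
 | | //     std::lock_guard<std::shared_timed_mutex> lock(*mutex);
 | | //     CHECK(fn(lock));              // slow path: may insert into the maps
 | | //   }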
2040 | | |
2041 | 1.18M | RemoteTabletPtr MetaCache::LookupTabletByIdFastPathUnlocked(const TabletId& tablet_id) { |
2042 | 1.18M | auto it = tablets_by_id_.find(tablet_id); |
2043 | 1.18M | if (it != tablets_by_id_.end()) { |
2044 | 1.16M | return it->second; |
2045 | 1.16M | } |
2046 | 18.0k | return nullptr; |
2047 | 18.0k | } |
2048 | | |
2049 | | template <class Lock> |
2050 | | bool MetaCache::DoLookupTabletById( |
2051 | | const TabletId& tablet_id, |
2052 | | const std::shared_ptr<const YBTable>& table, |
2053 | | master::IncludeInactive include_inactive, |
2054 | | CoarseTimePoint deadline, |
2055 | | UseCache use_cache, |
2056 | 1.17M | LookupTabletCallback* callback) { |
2057 | 1.17M | RemoteTabletPtr tablet; |
2058 | 1.18M | auto scope_exit = ScopeExit([callback, &tablet] { |
2059 | 1.18M | if (tablet) { |
2060 | 1.14M | (*callback)(tablet); |
2061 | 1.14M | } |
2062 | 1.18M | });
|
2063 | 1.17M | int64_t request_no; |
2064 | 1.17M | int64_t lookups_without_new_replicas = 0; |
2065 | 1.17M | { |
2066 | 1.17M | Lock lock(mutex_); |
2067 | | |
2068 | | // Fast path: lookup in the cache. |
2069 | 1.17M | tablet = LookupTabletByIdFastPathUnlocked(tablet_id); |
2070 | 1.17M | if (tablet) { |
2071 | 126 | VLOG_WITH_PREFIX(5) << "Fast lookup: candidate tablet " << AsString(tablet); |
2072 | 1.16M | if (use_cache && tablet->HasLeader()) { |
2073 | | // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with |
2074 | | // tablet_id is found on all replicas. |
2075 | 74 | VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id(); |
2076 | 1.14M | return true; |
2077 | 1.14M | } |
2078 | 15.8k | lookups_without_new_replicas = tablet->lookups_without_new_replicas(); |
2079 | 15.8k | tablet = nullptr; |
2080 | 15.8k | } |
2081 | | |
2082 | 30.4k | LookupDataGroup* lookup; |
2083 | 30.4k | { |
2084 | 30.4k | auto lookup_it = tablet_lookups_by_id_.find(tablet_id); |
2085 | 30.4k | if (lookup_it == tablet_lookups_by_id_.end()) { |
2086 | 16.3k | if (!IsUniqueLock(&lock)) { |
2087 | 8.22k | return false; |
2088 | 8.22k | } |
2089 | 8.14k | lookup = &tablet_lookups_by_id_[tablet_id]; |
2090 | 14.1k | } else { |
2091 | 14.1k | lookup = &lookup_it->second; |
2092 | 14.1k | } |
2093 | 30.4k | } |
2094 | 22.2k | lookup->lookups.Push(new LookupData(*callback, deadline, nullptr)); |
2095 | 22.2k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
2096 | 22.2k | int64_t expected = 0; |
2097 | 22.2k | if (!lookup->running_request_number.compare_exchange_strong( |
2098 | 15.1k | expected, request_no, std::memory_order_acq_rel)) { |
2099 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id; |
2100 | 15.1k | return true; |
2101 | 15.1k | } |
2102 | 7.09k | } |
2103 | | |
2104 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no; |
2105 | | |
2106 | 7.09k | auto rpc = std::make_shared<LookupByIdRpc>( |
2107 | 7.09k | this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas); |
2108 | 7.09k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
2109 | 7.09k | return true; |
  2110 | 7.09k | }
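
The ScopeExit guard at the top of DoLookupTabletById fires the callback on every return path once `tablet` has been resolved, which is why none of the early returns above invoke it explicitly. A minimal sketch of that idiom, using only the standard library (ScopeGuard and LookupWithCallback below are illustrative stand-ins, not yb's ScopeExit):

    #include <functional>
    #include <iostream>
    #include <utility>

    // Minimal scope guard: runs the stored functor when it goes out of scope.
    template <typename F>
    class ScopeGuard {
     public:
      explicit ScopeGuard(F f) : f_(std::move(f)) {}
      ~ScopeGuard() { f_(); }
      ScopeGuard(const ScopeGuard&) = delete;
      ScopeGuard& operator=(const ScopeGuard&) = delete;
     private:
      F f_;
    };

    bool LookupWithCallback(bool cache_hit, const std::function<void(int)>& callback) {
      int tablet = 0;  // 0 means "not resolved"; stands in for RemoteTabletPtr.
      ScopeGuard guard([&callback, &tablet] {
        if (tablet != 0) {
          callback(tablet);  // Fires on any return path once tablet is set.
        }
      });
      if (cache_hit) {
        tablet = 42;   // Fast path: resolved from cache.
        return true;   // The guard invokes the callback here.
      }
      return false;    // Not resolved; the guard runs but skips the callback.
    }

    int main() {
      LookupWithCallback(true, [](int id) { std::cout << "resolved tablet " << id << "\n"; });
    }
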
|
2111 | | |
2112 | | void MetaCache::LookupTabletById(const TabletId& tablet_id, |
2113 | | const std::shared_ptr<const YBTable>& table, |
2114 | | master::IncludeInactive include_inactive, |
2115 | | CoarseTimePoint deadline, |
2116 | | LookupTabletCallback callback, |
2117 | 1.17M | UseCache use_cache) { |
2118 | 1.02k | VLOG_WITH_PREFIX_AND_FUNC(5) << "(" << tablet_id << ", " << use_cache << ")"; |
2119 | | |
2120 | 1.17M | if (DoLookupTabletById<SharedLock<decltype(mutex_)>>( |
2121 | 1.16M | tablet_id, table, include_inactive, deadline, use_cache, &callback)) { |
2122 | 1.16M | return; |
2123 | 1.16M | } |
2124 | | |
2125 | 7.73k | auto result = DoLookupTabletById<std::lock_guard<decltype(mutex_)>>( |
2126 | 7.73k | tablet_id, table, include_inactive, deadline, use_cache, &callback); |
2127 | 18.4E | LOG_IF(DFATAL, !result) << "Lookup was not started for tablet " << tablet_id; |
2128 | 7.73k | } |
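
LookupTabletById runs DoLookupTabletById twice: first under a shared (reader) lock, and again under an exclusive lock only when the first pass reports, via the IsUniqueLock check, that it would have to mutate tablet_lookups_by_id_. A hedged sketch of this optimistic-read pattern, with a plain map standing in for the lookup table:

    #include <map>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <type_traits>

    std::shared_timed_mutex mutex_;
    std::map<std::string, int> cache_;

    // Returns false when the operation needs a write lock (entry missing and
    // Lock grants only shared access), mirroring the IsUniqueLock(&lock) check.
    template <typename Lock>
    bool DoLookup(const std::string& key, int* out) {
      Lock lock(mutex_);
      auto it = cache_.find(key);
      if (it != cache_.end()) {
        *out = it->second;
        return true;
      }
      if constexpr (std::is_same_v<Lock, std::lock_guard<std::shared_timed_mutex>>) {
        *out = cache_[key] = 0;  // Exclusive lock: safe to create the entry.
        return true;
      } else {
        return false;  // Shared lock cannot insert; caller retries exclusively.
      }
    }

    int Lookup(const std::string& key) {
      int value = 0;
      if (DoLookup<std::shared_lock<std::shared_timed_mutex>>(key, &value)) {
        return value;
      }
      // Slow path: retry under an exclusive lock; this attempt must succeed.
      bool ok = DoLookup<std::lock_guard<std::shared_timed_mutex>>(key, &value);
      (void)ok;  // Mirrors the LOG_IF(DFATAL, !result) invariant above.
      return value;
    }

    int main() { return Lookup("tablet-1"); }
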
2129 | | |
2130 | | void MetaCache::MarkTSFailed(RemoteTabletServer* ts, |
2131 | 0 | const Status& status) { |
2132 | 0 | LOG_WITH_PREFIX(INFO) << "Marking tablet server " << ts->ToString() << " as failed."; |
2133 | 0 | SharedLock<decltype(mutex_)> lock(mutex_); |
  2134 | |
2135 | 0 | Status ts_status = status.CloneAndPrepend("TS failed"); |
2136 | | |
2137 | | // TODO: replace with a ts->tablet multimap for faster lookup? |
2138 | 0 | for (const auto& tablet : tablets_by_id_) { |
2139 | | // We just loop on all tablets; if a tablet does not have a replica on this |
2140 | | // TS, MarkReplicaFailed() returns false and we ignore the return value. |
2141 | 0 | tablet.second->MarkReplicaFailed(ts, ts_status); |
2142 | 0 | } |
2143 | 0 | } |
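
The TODO in MarkTSFailed points at replacing the full scan of tablets_by_id_ with a reverse index. Purely as an illustration of that idea (tablets_by_ts_ and MarkTsFailedIndexed are hypothetical names, not part of MetaCache):

    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    // Hypothetical reverse index: tserver UUID -> ids of tablets that host a
    // replica there. It would be maintained alongside tablets_by_id_.
    std::unordered_map<std::string, std::unordered_set<std::string>> tablets_by_ts_;

    size_t MarkTsFailedIndexed(const std::string& ts_uuid) {
      size_t marked = 0;
      auto it = tablets_by_ts_.find(ts_uuid);
      if (it == tablets_by_ts_.end()) {
        return marked;  // No cached tablet references this server.
      }
      for (const auto& tablet_id : it->second) {
        // Only tablets known to host a replica on ts_uuid are visited, instead
        // of scanning every entry of tablets_by_id_ as the loop above does.
        // The real code would call tablet->MarkReplicaFailed(ts, status) here.
        (void)tablet_id;
        ++marked;
      }
      return marked;
    }

    int main() {
      tablets_by_ts_["ts-1"] = {"tablet-a", "tablet-b"};
      std::cout << MarkTsFailedIndexed("ts-1") << " tablets touched\n";
    }
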
2144 | | |
2145 | 56.6k | bool MetaCache::AcquireMasterLookupPermit() { |
2146 | 56.6k | return master_lookup_sem_.TryAcquire(); |
2147 | 56.6k | } |
2148 | | |
2149 | 56.5k | void MetaCache::ReleaseMasterLookupPermit() { |
2150 | 56.5k | master_lookup_sem_.Release(); |
2151 | 56.5k | } |
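
This Acquire/Release pair wraps a counting semaphore that caps how many lookups may hit the master concurrently; TryAcquire fails fast instead of queueing behind the master. A minimal equivalent using C++20's std::counting_semaphore (the actual master_lookup_sem_ is yb's own semaphore type, and the limit of 16 below is an arbitrary placeholder):

    #include <semaphore>

    // At most 16 concurrent master lookups; excess callers take a fallback path.
    std::counting_semaphore<16> master_lookup_sem_(16);

    bool AcquireMasterLookupPermit() {
      // Non-blocking: returns false rather than waiting for a permit.
      return master_lookup_sem_.try_acquire();
    }

    void ReleaseMasterLookupPermit() {
      master_lookup_sem_.release();
    }

    int main() {
      if (AcquireMasterLookupPermit()) {
        // ... perform the master lookup ...
        ReleaseMasterLookupPermit();
      }
    }
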
2152 | | |
2153 | | std::future<Result<internal::RemoteTabletPtr>> MetaCache::LookupTabletByKeyFuture( |
2154 | | const std::shared_ptr<YBTable>& table, |
2155 | | const PartitionKey& partition_key, |
2156 | 0 | CoarseTimePoint deadline) { |
2157 | 0 | return MakeFuture<Result<internal::RemoteTabletPtr>>([&](auto callback) { |
2158 | 0 | this->LookupTabletByKey(table, partition_key, deadline, std::move(callback)); |
2159 | 0 | }); |
2160 | 0 | } |
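
LookupTabletByKeyFuture adapts the callback-based LookupTabletByKey into a std::future via MakeFuture. A self-contained sketch of such an adapter, assuming the launched operation invokes its callback exactly once (MakeFutureSketch is illustrative, not yb's MakeFuture):

    #include <future>
    #include <iostream>
    #include <memory>
    #include <thread>
    #include <utility>

    // Wraps a callback-style async API into a std::future<T>. The launcher is
    // handed a callback that fulfills the shared promise when invoked.
    template <typename T, typename Launcher>
    std::future<T> MakeFutureSketch(Launcher&& launcher) {
      auto promise = std::make_shared<std::promise<T>>();
      auto future = promise->get_future();
      // The callback keeps the promise alive until the async work completes.
      launcher([promise](T value) { promise->set_value(std::move(value)); });
      return future;
    }

    int main() {
      auto f = MakeFutureSketch<int>([](auto callback) {
        // Simulated async lookup completing on another thread.
        std::thread([cb = std::move(callback)] { cb(7); }).detach();
      });
      std::cout << f.get() << "\n";  // Blocks until the callback fires.
    }
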
2161 | | |
2162 | 153 | LookupDataGroup::~LookupDataGroup() { |
2163 | 153 | std::vector<LookupData*> leftovers; |
2164 | 153 | while (auto* d = lookups.Pop()) { |
2165 | 0 | leftovers.push_back(d); |
2166 | 0 | } |
2167 | 153 | if (!leftovers.empty()) { |
2168 | 0 | LOG(DFATAL) << Format( |
  2169 | 0 |         "Destroying LookupDataGroup($0), running_request_number: $1 with non-empty lookups: $2",
2170 | 0 | static_cast<void*>(this), running_request_number, leftovers); |
2171 | 0 | } |
2172 | 153 | } |
2173 | | |
2174 | | void LookupDataGroup::Finished( |
2175 | 56.6k | int64_t request_no, const ToStringable& id, bool allow_absence) { |
2176 | 56.6k | int64_t expected = request_no; |
2177 | 56.6k | if (running_request_number.compare_exchange_strong(expected, 0, std::memory_order_acq_rel)) { |
2178 | 56.6k | max_completed_request_number = std::max(max_completed_request_number, request_no); |
2179 | 0 | VLOG_WITH_FUNC(2) << "Finished lookup for " << id.ToString() << ", no: " << request_no; |
2180 | 56.6k | return; |
2181 | 56.6k | } |
2182 | | |
2183 | 0 | if ((expected == 0 && max_completed_request_number <= request_no) && !allow_absence) { |
2184 | 0 | LOG(DFATAL) << "Lookup was not running for " << id.ToString() << ", expected: " << request_no; |
2185 | 0 | return; |
2186 | 0 | } |
2187 | | |
2188 | 0 | LOG(INFO) |
2189 | 0 | << "Finished lookup for " << id.ToString() << ": " << request_no << ", while " |
  2190 | 0 |       << expected << " was running; this can happen during a tablet split";
2191 | 0 | } |
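
Finished is the counterpart of the compare_exchange_strong in DoLookupTabletById: a lookup starts only by installing its request number into running_request_number, and only the holder of that number can reset it to zero, so at most one lookup RPC per tablet id is in flight. A condensed sketch of the handshake using std::atomic directly:

    #include <atomic>
    #include <cstdint>
    #include <iostream>

    std::atomic<int64_t> lookup_serial_{1};
    std::atomic<int64_t> running_request_number{0};

    // Returns a nonzero request number if this caller should start the RPC,
    // or 0 if a lookup for the same tablet is already in flight.
    int64_t TryStartLookup() {
      int64_t request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel);
      int64_t expected = 0;
      if (!running_request_number.compare_exchange_strong(
              expected, request_no, std::memory_order_acq_rel)) {
        return 0;  // Another request owns the slot; just queue the callback.
      }
      return request_no;
    }

    // Only the owner (matching request_no) may mark the lookup finished.
    bool FinishLookup(int64_t request_no) {
      int64_t expected = request_no;
      return running_request_number.compare_exchange_strong(
          expected, 0, std::memory_order_acq_rel);
    }

    int main() {
      int64_t first = TryStartLookup();
      std::cout << "first starts RPC: " << (first != 0) << "\n";           // 1
      std::cout << "second starts RPC: " << (TryStartLookup() != 0) << "\n";  // 0
      std::cout << "first finished: " << FinishLookup(first) << "\n";      // 1
    }
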
2192 | | |
2193 | | TableData::TableData(const VersionedTablePartitionListPtr& partition_list_) |
2194 | 309k | : partition_list(partition_list_) { |
2195 | 309k | DCHECK_ONLY_NOTNULL(partition_list); |
2196 | 309k | } |
2197 | | |
2198 | 0 | std::string VersionedPartitionStartKey::ToString() const { |
2199 | 0 | return YB_STRUCT_TO_STRING(key, partition_list_version); |
2200 | 0 | } |
2201 | | |
2202 | 13 | std::string RemoteReplica::ToString() const { |
2203 | 13 | return Format("$0 ($1, $2)", |
2204 | 13 | ts->permanent_uuid(), |
2205 | 13 | PeerRole_Name(role), |
2206 | 13 | Failed() ? "FAILED" : "OK"); |
2207 | 13 | } |
2208 | | |
2209 | | } // namespace internal |
2210 | | } // namespace client |
2211 | | } // namespace yb |