/Users/deen/code/yugabyte-db/src/yb/client/meta_cache.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/client/meta_cache.h" |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <atomic> |
38 | | #include <list> |
39 | | #include <memory> |
40 | | #include <shared_mutex> |
41 | | #include <string> |
42 | | #include <unordered_map> |
43 | | #include <unordered_set> |
44 | | #include <vector> |
45 | | |
46 | | #include <boost/optional/optional_io.hpp> |
47 | | #include <glog/logging.h> |
48 | | |
49 | | #include "yb/client/client.h" |
50 | | #include "yb/client/client_error.h" |
51 | | #include "yb/client/client_master_rpc.h" |
52 | | #include "yb/client/client-internal.h" |
53 | | #include "yb/client/schema.h" |
54 | | #include "yb/client/table.h" |
55 | | #include "yb/client/yb_table_name.h" |
56 | | |
57 | | #include "yb/common/wire_protocol.h" |
58 | | |
59 | | #include "yb/gutil/map-util.h" |
60 | | #include "yb/gutil/ref_counted.h" |
61 | | #include "yb/gutil/strings/substitute.h" |
62 | | |
63 | | #include "yb/master/master_client.proxy.h" |
64 | | |
65 | | #include "yb/rpc/rpc_fwd.h" |
66 | | |
67 | | #include "yb/tserver/local_tablet_server.h" |
68 | | #include "yb/tserver/tserver_service.proxy.h" |
69 | | |
70 | | #include "yb/util/async_util.h" |
71 | | #include "yb/util/atomic.h" |
72 | | #include "yb/util/flag_tags.h" |
73 | | #include "yb/util/locks.h" |
74 | | #include "yb/util/logging.h" |
75 | | #include "yb/util/metrics.h" |
76 | | #include "yb/util/monotime.h" |
77 | | #include "yb/util/net/dns_resolver.h" |
78 | | #include "yb/util/net/net_util.h" |
79 | | #include "yb/util/net/sockaddr.h" |
80 | | #include "yb/util/random_util.h" |
81 | | #include "yb/util/result.h" |
82 | | #include "yb/util/scope_exit.h" |
83 | | #include "yb/util/shared_lock.h" |
84 | | #include "yb/util/status_format.h" |
85 | | #include "yb/util/unique_lock.h" |
86 | | |
87 | | using std::map; |
88 | | using std::shared_ptr; |
89 | | using strings::Substitute; |
90 | | using namespace std::literals; |
91 | | using namespace std::placeholders; |
92 | | |
93 | | DEFINE_int32(max_concurrent_master_lookups, 500, |
94 | | "Maximum number of concurrent tablet location lookups from YB client to master"); |
95 | | |
96 | | DEFINE_test_flag(bool, verify_all_replicas_alive, false, |
97 | | "If set, when a RemoteTablet object is destroyed, we will verify that all its " |
98 | | "replicas are not marked as failed"); |
99 | | |
100 | | DEFINE_int32(retry_failed_replica_ms, 60 * 1000, |
101 | | "Time in milliseconds to wait for before retrying a failed replica"); |
102 | | |
103 | | DEFINE_int64(meta_cache_lookup_throttling_step_ms, 5, |
104 | | "Step to increment delay between calls during lookup throttling."); |
105 | | |
106 | | DEFINE_int64(meta_cache_lookup_throttling_max_delay_ms, 1000, |
107 | | "Max delay between calls during lookup throttling."); |
108 | | |
109 | | DEFINE_test_flag(bool, force_master_lookup_all_tablets, false, |
110 | | "If set, force the client to go to the master for all tablet lookup " |
111 | | "instead of reading from cache."); |
112 | | |
113 | | DEFINE_test_flag(double, simulate_lookup_timeout_probability, 0, |
114 | | "If set, mark an RPC as failed and force retry on the first attempt."); |
115 | | DEFINE_test_flag(double, simulate_lookup_partition_list_mismatch_probability, 0, |
116 | | "Probability for simulating the partition list mismatch error on tablet lookup."); |
117 | | |
118 | | METRIC_DEFINE_coarse_histogram( |
119 | | server, dns_resolve_latency_during_init_proxy, |
120 | | "yb.client.MetaCache.InitProxy DNS Resolve", |
121 | | yb::MetricUnit::kMicroseconds, |
122 | | "Microseconds spent resolving DNS requests during MetaCache::InitProxy"); |
123 | | |
124 | | DECLARE_string(placement_cloud); |
125 | | DECLARE_string(placement_region); |
126 | | |
127 | | namespace yb { |
128 | | |
129 | | using consensus::RaftPeerPB; |
130 | | using master::GetTableLocationsRequestPB; |
131 | | using master::GetTableLocationsResponsePB; |
132 | | using master::TabletLocationsPB; |
133 | | using master::TabletLocationsPB_ReplicaPB; |
134 | | using master::TSInfoPB; |
135 | | using rpc::Messenger; |
136 | | using rpc::Rpc; |
137 | | using tablet::RaftGroupStatePB; |
138 | | using tserver::LocalTabletServer; |
139 | | using tserver::TabletServerServiceProxy; |
140 | | using tserver::TabletServerForwardServiceProxy; |
141 | | |
142 | | namespace client { |
143 | | |
144 | | namespace internal { |
145 | | |
146 | | using ProcessedTablesMap = |
147 | | std::unordered_map<TableId, std::unordered_map<PartitionKey, RemoteTabletPtr>>; |
148 | | |
149 | | namespace { |
150 | | |
151 | | // We join tablet partitions to groups, so tablet state info in one group requested with single |
152 | | // RPC call to master. |
153 | | // kPartitionGroupSize defines size of this group. |
154 | | #ifdef NDEBUG |
155 | | const size_t kPartitionGroupSize = 64; |
156 | | #else |
157 | | const size_t kPartitionGroupSize = 4; |
158 | | #endif |
159 | | |
160 | | std::atomic<int64_t> lookup_serial_{1}; |
161 | | |
162 | | } // namespace |
163 | | |
164 | 0 | int64_t TEST_GetLookupSerial() { |
165 | 0 | return lookup_serial_.load(std::memory_order_acquire); |
166 | 0 | } |
167 | | |
168 | | //////////////////////////////////////////////////////////// |
169 | | |
170 | | RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb) |
171 | 22.6k | : uuid_(pb.permanent_uuid()) { |
172 | 22.6k | Update(pb); |
173 | 22.6k | } |
174 | | |
175 | | RemoteTabletServer::RemoteTabletServer(const string& uuid, |
176 | | const shared_ptr<TabletServerServiceProxy>& proxy, |
177 | | const LocalTabletServer* local_tserver) |
178 | | : uuid_(uuid), |
179 | | proxy_(proxy), |
180 | 7.97k | local_tserver_(local_tserver) { |
181 | 7.97k | LOG_IF(DFATAL, proxy && !IsLocal()) << "Local tserver has non-local proxy"0 ; |
182 | 7.97k | } |
183 | | |
184 | 163 | RemoteTabletServer::~RemoteTabletServer() = default; |
185 | | |
186 | 15.1M | Status RemoteTabletServer::InitProxy(YBClient* client) { |
187 | 15.1M | { |
188 | 15.1M | SharedLock<rw_spinlock> lock(mutex_); |
189 | | |
190 | 15.1M | if (proxy_15.1M ) { |
191 | | // Already have a proxy created. |
192 | 15.1M | return Status::OK(); |
193 | 15.1M | } |
194 | 15.1M | } |
195 | | |
196 | 18.4E | std::lock_guard<rw_spinlock> lock(mutex_); |
197 | | |
198 | 18.4E | if (proxy_) { |
199 | | // Already have a proxy created. |
200 | 6 | return Status::OK(); |
201 | 6 | } |
202 | | |
203 | 18.4E | if (!dns_resolve_histogram_) { |
204 | 10.4k | auto metric_entity = client->metric_entity(); |
205 | 10.4k | if (metric_entity) { |
206 | 10.4k | dns_resolve_histogram_ = METRIC_dns_resolve_latency_during_init_proxy.Instantiate( |
207 | 10.4k | metric_entity); |
208 | 10.4k | } |
209 | 10.4k | } |
210 | | |
211 | | // TODO: if the TS advertises multiple host/ports, pick the right one |
212 | | // based on some kind of policy. For now just use the first always. |
213 | 18.4E | auto hostport = HostPortFromPB(yb::DesiredHostPort( |
214 | 18.4E | public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_, |
215 | 18.4E | client->data_->cloud_info_pb_)); |
216 | 18.4E | CHECK(!hostport.host().empty()); |
217 | 18.4E | ScopedDnsTracker dns_tracker(dns_resolve_histogram_.get()); |
218 | 18.4E | proxy_.reset(new TabletServerServiceProxy(client->data_->proxy_cache_.get(), hostport)); |
219 | 18.4E | proxy_endpoint_ = hostport; |
220 | | |
221 | 18.4E | return Status::OK(); |
222 | 18.4E | } |
223 | | |
224 | 289k | void RemoteTabletServer::Update(const master::TSInfoPB& pb) { |
225 | 289k | CHECK_EQ(pb.permanent_uuid(), uuid_); |
226 | | |
227 | 289k | std::lock_guard<rw_spinlock> lock(mutex_); |
228 | 289k | private_rpc_hostports_ = pb.private_rpc_addresses(); |
229 | 289k | public_rpc_hostports_ = pb.broadcast_addresses(); |
230 | 289k | cloud_info_pb_ = pb.cloud_info(); |
231 | 289k | capabilities_.assign(pb.capabilities().begin(), pb.capabilities().end()); |
232 | 289k | std::sort(capabilities_.begin(), capabilities_.end()); |
233 | 289k | } |
234 | | |
235 | 11.7M | bool RemoteTabletServer::IsLocal() const { |
236 | 11.7M | return local_tserver_ != nullptr; |
237 | 11.7M | } |
238 | | |
239 | 1.82M | const std::string& RemoteTabletServer::permanent_uuid() const { |
240 | 1.82M | return uuid_; |
241 | 1.82M | } |
242 | | |
243 | 15.1M | shared_ptr<TabletServerServiceProxy> RemoteTabletServer::proxy() const { |
244 | 15.1M | SharedLock<rw_spinlock> lock(mutex_); |
245 | 15.1M | return proxy_; |
246 | 15.1M | } |
247 | | |
248 | 0 | ::yb::HostPort RemoteTabletServer::ProxyEndpoint() const { |
249 | 0 | std::shared_lock<rw_spinlock> lock(mutex_); |
250 | 0 | return proxy_endpoint_; |
251 | 0 | } |
252 | | |
253 | 252k | string RemoteTabletServer::ToString() const { |
254 | 252k | string ret = "{ uuid: " + uuid_; |
255 | 252k | SharedLock<rw_spinlock> lock(mutex_); |
256 | 252k | if (!private_rpc_hostports_.empty()) { |
257 | 252k | ret += Format(" private: $0", private_rpc_hostports_); |
258 | 252k | } |
259 | 252k | if (!public_rpc_hostports_.empty()) { |
260 | 0 | ret += Format(" public: $0", public_rpc_hostports_); |
261 | 0 | } |
262 | 252k | ret += Format(" cloud_info: $0", cloud_info_pb_); |
263 | 252k | return ret; |
264 | 252k | } |
265 | | |
266 | 39.4k | bool RemoteTabletServer::HasHostFrom(const std::unordered_set<std::string>& hosts) const { |
267 | 39.4k | SharedLock<rw_spinlock> lock(mutex_); |
268 | 39.4k | for (const auto& hp : private_rpc_hostports_) { |
269 | 39.4k | if (hosts.count(hp.host())) { |
270 | 0 | return true; |
271 | 0 | } |
272 | 39.4k | } |
273 | 39.4k | for (const auto& hp : public_rpc_hostports_) { |
274 | 0 | if (hosts.count(hp.host())) { |
275 | 0 | return true; |
276 | 0 | } |
277 | 0 | } |
278 | 39.4k | return false; |
279 | 39.4k | } |
280 | | |
281 | 12.3M | bool RemoteTabletServer::HasCapability(CapabilityId capability) const { |
282 | 12.3M | SharedLock<rw_spinlock> lock(mutex_); |
283 | 12.3M | return std::binary_search(capabilities_.begin(), capabilities_.end(), capability); |
284 | 12.3M | } |
285 | | |
286 | 93 | bool RemoteTabletServer::IsLocalRegion() const { |
287 | 93 | SharedLock<rw_spinlock> lock(mutex_); |
288 | 93 | return cloud_info_pb_.placement_cloud() == FLAGS_placement_cloud && |
289 | 93 | cloud_info_pb_.placement_region() == FLAGS_placement_region; |
290 | 93 | } |
291 | | |
292 | 39.4k | LocalityLevel RemoteTabletServer::LocalityLevelWith(const CloudInfoPB& cloud_info) const { |
293 | 39.4k | SharedLock<rw_spinlock> lock(mutex_); |
294 | 39.4k | if (!cloud_info_pb_.has_placement_region() || !cloud_info.has_placement_region()39.4k || |
295 | 39.4k | cloud_info_pb_.placement_region() != cloud_info.placement_region()39.4k ) { |
296 | 665 | return LocalityLevel::kNone; |
297 | 665 | } |
298 | 38.7k | if (38.7k !cloud_info_pb_.has_placement_zone()38.7k || !cloud_info.has_placement_zone() || |
299 | 38.7k | cloud_info_pb_.placement_zone() != cloud_info.placement_zone()) { |
300 | 9.61k | return LocalityLevel::kRegion; |
301 | 9.61k | } |
302 | 29.1k | return LocalityLevel::kZone; |
303 | 38.7k | } |
304 | | |
305 | 219 | HostPortPB RemoteTabletServer::DesiredHostPort(const CloudInfoPB& cloud_info) const { |
306 | 219 | SharedLock<rw_spinlock> lock(mutex_); |
307 | 219 | return yb::DesiredHostPort( |
308 | 219 | public_rpc_hostports_, private_rpc_hostports_, cloud_info_pb_, cloud_info); |
309 | 219 | } |
310 | | |
311 | 6.50k | std::string RemoteTabletServer::TEST_PlacementZone() const { |
312 | 6.50k | SharedLock<rw_spinlock> lock(mutex_); |
313 | 6.50k | return cloud_info_pb_.placement_zone(); |
314 | 6.50k | } |
315 | | |
316 | 0 | std::string ReplicasCount::ToString() { |
317 | 0 | return Format( |
318 | 0 | " live replicas $0, read replicas $1, expected live replicas $2, expected read replicas $3", |
319 | 0 | num_alive_live_replicas, num_alive_read_replicas, |
320 | 0 | expected_live_replicas, expected_read_replicas); |
321 | 0 | } |
322 | | |
323 | | //////////////////////////////////////////////////////////// |
324 | | |
325 | | RemoteTablet::RemoteTablet(std::string tablet_id, |
326 | | Partition partition, |
327 | | boost::optional<PartitionListVersion> partition_list_version, |
328 | | uint64 split_depth, |
329 | | const TabletId& split_parent_tablet_id) |
330 | | : tablet_id_(std::move(tablet_id)), |
331 | | log_prefix_(Format("T $0: ", tablet_id_)), |
332 | | partition_(std::move(partition)), |
333 | | partition_list_version_(partition_list_version), |
334 | | split_depth_(split_depth), |
335 | | split_parent_tablet_id_(split_parent_tablet_id), |
336 | 90.3k | stale_(false) { |
337 | 90.3k | } |
338 | | |
339 | 71 | RemoteTablet::~RemoteTablet() { |
340 | 71 | if (PREDICT_FALSE(FLAGS_TEST_verify_all_replicas_alive)) { |
341 | | // Let's verify that none of the replicas are marked as failed. The test should always wait |
342 | | // enough time so that the lookup cache can be refreshed after force_lookup_cache_refresh_secs. |
343 | 0 | for (const auto& replica : replicas_) { |
344 | 0 | if (replica.Failed()) { |
345 | 0 | LOG_WITH_PREFIX(FATAL) << "Remote tablet server " << replica.ts->ToString() |
346 | 0 | << " with role " << PeerRole_Name(replica.role) |
347 | 0 | << " is marked as failed"; |
348 | 0 | } |
349 | 0 | } |
350 | 0 | } |
351 | 71 | } |
352 | | |
353 | | void RemoteTablet::Refresh( |
354 | | const TabletServerMap& tservers, |
355 | 759k | const google::protobuf::RepeatedPtrField<TabletLocationsPB_ReplicaPB>& replicas) { |
356 | | // Adopt the data from the successful response. |
357 | 759k | std::lock_guard<rw_spinlock> lock(mutex_); |
358 | 759k | std::vector<std::string> old_uuids; |
359 | 759k | old_uuids.reserve(replicas_.size()); |
360 | 1.68M | for (const auto& replica : replicas_) { |
361 | 1.68M | old_uuids.push_back(replica.ts->permanent_uuid()); |
362 | 1.68M | } |
363 | 759k | std::sort(old_uuids.begin(), old_uuids.end()); |
364 | 759k | replicas_.clear(); |
365 | 759k | bool has_new_replica = false; |
366 | 1.95M | for (const TabletLocationsPB_ReplicaPB& r : replicas) { |
367 | 1.95M | auto it = tservers.find(r.ts_info().permanent_uuid()); |
368 | 1.95M | CHECK(it != tservers.end()); |
369 | 1.95M | replicas_.emplace_back(it->second.get(), r.role()); |
370 | 1.95M | has_new_replica = |
371 | 1.95M | has_new_replica || |
372 | 1.95M | !std::binary_search(old_uuids.begin(), old_uuids.end(), r.ts_info().permanent_uuid())1.77M ; |
373 | 1.95M | } |
374 | 759k | if (has_new_replica) { |
375 | 92.3k | lookups_without_new_replicas_ = 0; |
376 | 667k | } else { |
377 | 667k | ++lookups_without_new_replicas_; |
378 | 667k | } |
379 | 759k | stale_ = false; |
380 | 759k | refresh_time_.store(MonoTime::Now(), std::memory_order_release); |
381 | 759k | } |
382 | | |
383 | 68 | void RemoteTablet::MarkStale() { |
384 | 68 | std::lock_guard<rw_spinlock> lock(mutex_); |
385 | 68 | stale_ = true; |
386 | 68 | } |
387 | | |
388 | 23.2M | bool RemoteTablet::stale() const { |
389 | 23.2M | SharedLock<rw_spinlock> lock(mutex_); |
390 | 23.2M | return stale_; |
391 | 23.2M | } |
392 | | |
393 | 35 | void RemoteTablet::MarkAsSplit() { |
394 | 35 | std::lock_guard<rw_spinlock> lock(mutex_); |
395 | 35 | is_split_ = true; |
396 | 35 | } |
397 | | |
398 | 12.0M | bool RemoteTablet::is_split() const { |
399 | 12.0M | SharedLock<rw_spinlock> lock(mutex_); |
400 | 12.0M | return is_split_; |
401 | 12.0M | } |
402 | | |
403 | 4.22k | bool RemoteTablet::MarkReplicaFailed(RemoteTabletServer *ts, const Status& status) { |
404 | 4.22k | std::lock_guard<rw_spinlock> lock(mutex_); |
405 | 18.4E | VLOG_WITH_PREFIX(2) << "Current remote replicas in meta cache: " |
406 | 18.4E | << ReplicasAsStringUnlocked() << ". Replica " << ts->ToString() |
407 | 18.4E | << " has failed: " << status.ToString(); |
408 | 8.02k | for (RemoteReplica& rep : replicas_) { |
409 | 8.02k | if (rep.ts == ts) { |
410 | 4.22k | rep.MarkFailed(); |
411 | 4.22k | return true; |
412 | 4.22k | } |
413 | 8.02k | } |
414 | 18.4E | return false; |
415 | 4.22k | } |
416 | | |
417 | 3.36k | int RemoteTablet::GetNumFailedReplicas() const { |
418 | 3.36k | int failed = 0; |
419 | 3.36k | SharedLock<rw_spinlock> lock(mutex_); |
420 | 20.1k | for (const RemoteReplica& rep : replicas_) { |
421 | 20.1k | if (rep.Failed()) { |
422 | 29 | failed++; |
423 | 29 | } |
424 | 20.1k | } |
425 | 3.36k | return failed; |
426 | 3.36k | } |
427 | | |
428 | 3.59k | bool RemoteTablet::IsReplicasCountConsistent() const { |
429 | 3.59k | return replicas_count_.load(std::memory_order_acquire).IsReplicasCountConsistent(); |
430 | 3.59k | } |
431 | | |
432 | 0 | string RemoteTablet::ReplicasCountToString() const { |
433 | 0 | return replicas_count_.load(std::memory_order_acquire).ToString(); |
434 | 0 | } |
435 | | |
436 | 757k | void RemoteTablet::SetExpectedReplicas(int expected_live_replicas, int expected_read_replicas) { |
437 | 757k | ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire); |
438 | 757k | for (;;) { |
439 | 757k | ReplicasCount new_replicas_count = old_replicas_count; |
440 | 757k | new_replicas_count.SetExpectedReplicas(expected_live_replicas, expected_read_replicas); |
441 | 757k | if (replicas_count_.compare_exchange_weak( |
442 | 757k | old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) { |
443 | 757k | break; |
444 | 757k | } |
445 | 757k | } |
446 | 757k | } |
447 | | |
448 | 368k | void RemoteTablet::SetAliveReplicas(int alive_live_replicas, int alive_read_replicas) { |
449 | 368k | ReplicasCount old_replicas_count = replicas_count_.load(std::memory_order_acquire); |
450 | 368k | for (;;) { |
451 | 368k | ReplicasCount new_replicas_count = old_replicas_count; |
452 | 368k | new_replicas_count.SetAliveReplicas(alive_live_replicas, alive_read_replicas); |
453 | 368k | if (replicas_count_.compare_exchange_weak( |
454 | 368k | old_replicas_count, new_replicas_count, std::memory_order_acq_rel)) { |
455 | 368k | break; |
456 | 368k | } |
457 | 368k | } |
458 | 368k | } |
459 | | |
460 | 39.2M | RemoteTabletServer* RemoteTablet::LeaderTServer() const { |
461 | 39.2M | SharedLock<rw_spinlock> lock(mutex_); |
462 | 77.9M | for (const RemoteReplica& replica : replicas_) { |
463 | 77.9M | if (!replica.Failed() && replica.role == PeerRole::LEADER77.7M ) { |
464 | 39.1M | return replica.ts; |
465 | 39.1M | } |
466 | 77.9M | } |
467 | 22.9k | return nullptr; |
468 | 39.2M | } |
469 | | |
470 | 25.1M | bool RemoteTablet::HasLeader() const { |
471 | 25.1M | return LeaderTServer() != nullptr; |
472 | 25.1M | } |
473 | | |
474 | | void RemoteTablet::GetRemoteTabletServers( |
475 | 368k | std::vector<RemoteTabletServer*>* servers, IncludeFailedReplicas include_failed_replicas) { |
476 | 368k | DCHECK(servers->empty()); |
477 | 368k | struct ReplicaUpdate { |
478 | 368k | RemoteReplica* replica; |
479 | 368k | tablet::RaftGroupStatePB new_state; |
480 | 368k | bool clear_failed; |
481 | 368k | }; |
482 | 368k | std::vector<ReplicaUpdate> replica_updates; |
483 | 368k | { |
484 | 368k | SharedLock<rw_spinlock> lock(mutex_); |
485 | 368k | int num_alive_live_replicas = 0; |
486 | 368k | int num_alive_read_replicas = 0; |
487 | 1.11M | for (RemoteReplica& replica : replicas_) { |
488 | 1.11M | if (replica.Failed()) { |
489 | 22.0k | if (include_failed_replicas) { |
490 | 22 | servers->push_back(replica.ts); |
491 | 22 | continue; |
492 | 22 | } |
493 | 22.0k | ReplicaUpdate replica_update = {&replica, RaftGroupStatePB::UNKNOWN, false}; |
494 | 22.0k | VLOG_WITH_PREFIX13 (4) |
495 | 13 | << "Replica " << replica.ts->ToString() |
496 | 13 | << " failed, state: " << RaftGroupStatePB_Name(replica.state) |
497 | 13 | << ", is local: " << replica.ts->IsLocal() |
498 | 13 | << ", time since failure: " << (MonoTime::Now() - replica.last_failed_time); |
499 | 22.0k | switch (replica.state) { |
500 | 21.9k | case RaftGroupStatePB::UNKNOWN: FALLTHROUGH_INTENDED; |
501 | 21.9k | case RaftGroupStatePB::NOT_STARTED: FALLTHROUGH_INTENDED; |
502 | 21.9k | case RaftGroupStatePB::BOOTSTRAPPING: FALLTHROUGH_INTENDED; |
503 | 21.9k | case RaftGroupStatePB::RUNNING: |
504 | | // These are non-terminal states that may retry. Check and update failed local replica's |
505 | | // current state. For remote replica, just wait for some time before retrying. |
506 | 21.9k | if (replica.ts->IsLocal()) { |
507 | 1.08k | tserver::GetTabletStatusRequestPB req; |
508 | 1.08k | tserver::GetTabletStatusResponsePB resp; |
509 | 1.08k | req.set_tablet_id(tablet_id_); |
510 | 1.08k | const Status status = |
511 | 1.08k | CHECK_NOTNULL(replica.ts->local_tserver())->GetTabletStatus(&req, &resp); |
512 | 1.08k | if (!status.ok() || resp.has_error()527 ) { |
513 | 557 | LOG_WITH_PREFIX(ERROR) |
514 | 557 | << "Received error from GetTabletStatus: " |
515 | 557 | << (!status.ok() ? status : StatusFromPB(resp.error().status())0 ); |
516 | 557 | continue; |
517 | 557 | } |
518 | | |
519 | 527 | DCHECK_EQ(resp.tablet_status().tablet_id(), tablet_id_); |
520 | 527 | VLOG_WITH_PREFIX0 (3) << "GetTabletStatus returned status: " |
521 | 0 | << tablet::RaftGroupStatePB_Name(resp.tablet_status().state()) |
522 | 0 | << " for replica " << replica.ts->ToString(); |
523 | 527 | replica_update.new_state = resp.tablet_status().state(); |
524 | 527 | if (replica_update.new_state != tablet::RaftGroupStatePB::RUNNING) { |
525 | 60 | if (replica_update.new_state != replica.state) { |
526 | | // Cannot update replica here directly because holding only shared lock on mutex. |
527 | 58 | replica_updates.push_back(replica_update); // Update only state |
528 | 58 | } |
529 | 60 | continue; |
530 | 60 | } |
531 | 467 | if (!replica.ts->local_tserver()->LeaderAndReady( |
532 | 467 | tablet_id_, /* allow_stale */ true)) { |
533 | | // Should continue here because otherwise failed state will be cleared. |
534 | 451 | continue; |
535 | 451 | } |
536 | 20.8k | } else if ((MonoTime::Now() - replica.last_failed_time) < |
537 | 20.8k | FLAGS_retry_failed_replica_ms * 1ms) { |
538 | 20.8k | continue; |
539 | 20.8k | } |
540 | 30 | break; |
541 | 30 | case RaftGroupStatePB::FAILED: 0 FALLTHROUGH_INTENDED0 ; |
542 | 16 | case RaftGroupStatePB::QUIESCING: FALLTHROUGH_INTENDED; |
543 | 65 | case RaftGroupStatePB::SHUTDOWN: |
544 | | // These are terminal states, so we won't retry. |
545 | 65 | continue; |
546 | 22.0k | } |
547 | | |
548 | 20 | VLOG_WITH_PREFIX0 (3) << "Changing state of replica " << replica.ts->ToString() |
549 | 0 | << " from failed to not failed"; |
550 | 20 | replica_update.clear_failed = true; |
551 | | // Cannot update replica here directly because holding only shared lock on mutex. |
552 | 20 | replica_updates.push_back(replica_update); |
553 | 1.09M | } else { |
554 | 1.09M | if (replica.role == PeerRole::READ_REPLICA) { |
555 | 12.4k | num_alive_read_replicas++; |
556 | 1.08M | } else if (replica.role == PeerRole::FOLLOWER || replica.role == PeerRole::LEADER338k ) { |
557 | 1.07M | num_alive_live_replicas++; |
558 | 1.07M | } |
559 | 1.09M | } |
560 | 1.09M | servers->push_back(replica.ts); |
561 | 1.09M | } |
562 | 368k | SetAliveReplicas(num_alive_live_replicas, num_alive_read_replicas); |
563 | 368k | } |
564 | 368k | if (!replica_updates.empty()) { |
565 | 78 | std::lock_guard<rw_spinlock> lock(mutex_); |
566 | 78 | for (const auto& update : replica_updates) { |
567 | 78 | if (update.new_state != RaftGroupStatePB::UNKNOWN) { |
568 | 74 | update.replica->state = update.new_state; |
569 | 74 | } |
570 | 78 | if (update.clear_failed) { |
571 | 20 | update.replica->ClearFailed(); |
572 | 20 | } |
573 | 78 | } |
574 | 78 | } |
575 | 368k | } |
576 | | |
577 | 1.64k | bool RemoteTablet::MarkTServerAsLeader(const RemoteTabletServer* server) { |
578 | 1.64k | bool found = false; |
579 | 1.64k | std::lock_guard<rw_spinlock> lock(mutex_); |
580 | 5.16k | for (RemoteReplica& replica : replicas_) { |
581 | 5.16k | if (replica.ts == server) { |
582 | 1.64k | replica.role = PeerRole::LEADER; |
583 | 1.64k | found = true; |
584 | 3.52k | } else if (replica.role == PeerRole::LEADER) { |
585 | 175 | replica.role = PeerRole::FOLLOWER; |
586 | 175 | } |
587 | 5.16k | } |
588 | 1.64k | VLOG_WITH_PREFIX0 (3) << "Latest replicas: " << ReplicasAsStringUnlocked()0 ; |
589 | 1.64k | VLOG_IF_WITH_PREFIX0 (3, !found) << "Specified server not found: " << server->ToString() |
590 | 0 | << ". Replicas: " << ReplicasAsStringUnlocked(); |
591 | 1.64k | return found; |
592 | 1.64k | } |
593 | | |
594 | 3.36k | void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) { |
595 | 3.36k | bool found = false; |
596 | 3.36k | std::lock_guard<rw_spinlock> lock(mutex_); |
597 | 10.2k | for (RemoteReplica& replica : replicas_) { |
598 | 10.2k | if (replica.ts == server) { |
599 | 3.36k | replica.role = PeerRole::FOLLOWER; |
600 | 3.36k | found = true; |
601 | 3.36k | } |
602 | 10.2k | } |
603 | 3.36k | VLOG_WITH_PREFIX0 (3) << "Latest replicas: " << ReplicasAsStringUnlocked()0 ; |
604 | 18.4E | DCHECK(found) << "Tablet " << tablet_id_ << ": Specified server not found: " |
605 | 18.4E | << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked(); |
606 | 3.36k | } |
607 | | |
608 | 3 | std::string RemoteTablet::ReplicasAsString() const { |
609 | 3 | SharedLock<rw_spinlock> lock(mutex_); |
610 | 3 | return ReplicasAsStringUnlocked(); |
611 | 3 | } |
612 | | |
613 | 3 | std::string RemoteTablet::ReplicasAsStringUnlocked() const { |
614 | 3 | DCHECK(mutex_.is_locked()); |
615 | 3 | string replicas_str; |
616 | 9 | for (const RemoteReplica& rep : replicas_) { |
617 | 9 | if (!replicas_str.empty()) replicas_str += ", "6 ; |
618 | 9 | replicas_str += rep.ToString(); |
619 | 9 | } |
620 | 3 | return replicas_str; |
621 | 3 | } |
622 | | |
623 | 5.08k | std::string RemoteTablet::ToString() const { |
624 | 5.08k | return YB_CLASS_TO_STRING(tablet_id, partition, partition_list_version, split_depth); |
625 | 5.08k | } |
626 | | |
627 | 285 | PartitionListVersion RemoteTablet::GetLastKnownPartitionListVersion() const { |
628 | 285 | SharedLock<rw_spinlock> lock(mutex_); |
629 | 285 | return last_known_partition_list_version_; |
630 | 285 | } |
631 | | |
632 | | void RemoteTablet::MakeLastKnownPartitionListVersionAtLeast( |
633 | 737k | PartitionListVersion partition_list_version) { |
634 | 737k | std::lock_guard<rw_spinlock> lock(mutex_); |
635 | 737k | last_known_partition_list_version_ = |
636 | 737k | std::max(last_known_partition_list_version_, partition_list_version); |
637 | 737k | } |
638 | | |
639 | 159k | void LookupCallbackVisitor::operator()(const LookupTabletCallback& tablet_callback) const { |
640 | 159k | if (error_status_) { |
641 | 1.32k | tablet_callback(*error_status_); |
642 | 1.32k | return; |
643 | 1.32k | } |
644 | 158k | auto remote_tablet = boost::get<RemoteTabletPtr>(param_); |
645 | 158k | if (remote_tablet == nullptr) { |
646 | 0 | static const Status error_status = STATUS( |
647 | 0 | TryAgain, "Tablet for requested partition is not yet running", |
648 | 0 | ClientError(ClientErrorCode::kTabletNotYetRunning)); |
649 | 0 | tablet_callback(error_status); |
650 | 0 | return; |
651 | 0 | } |
652 | 158k | tablet_callback(remote_tablet); |
653 | 158k | } |
654 | | |
655 | | void LookupCallbackVisitor::operator()( |
656 | 0 | const LookupTabletRangeCallback& tablet_range_callback) const { |
657 | 0 | if (error_status_) { |
658 | 0 | tablet_range_callback(*error_status_); |
659 | 0 | return; |
660 | 0 | } |
661 | 0 | auto result = boost::get<std::vector<RemoteTabletPtr>>(param_); |
662 | 0 | tablet_range_callback(result); |
663 | 0 | } |
664 | | |
665 | | //////////////////////////////////////////////////////////// |
666 | | |
667 | | MetaCache::MetaCache(YBClient* client) |
668 | | : client_(client), |
669 | | master_lookup_sem_(FLAGS_max_concurrent_master_lookups), |
670 | 30.7k | log_prefix_(Format("MetaCache($0): ", static_cast<void*>(this))) { |
671 | 30.7k | } |
672 | | |
673 | 691 | MetaCache::~MetaCache() { |
674 | 691 | } |
675 | | |
676 | | void MetaCache::SetLocalTabletServer(const string& permanent_uuid, |
677 | | const shared_ptr<TabletServerServiceProxy>& proxy, |
678 | 7.97k | const LocalTabletServer* local_tserver) { |
679 | 7.97k | const auto entry = ts_cache_.emplace(permanent_uuid, |
680 | 7.97k | std::make_unique<RemoteTabletServer>(permanent_uuid, |
681 | 7.97k | proxy, |
682 | 7.97k | local_tserver)); |
683 | 7.97k | CHECK(entry.second); |
684 | 7.97k | local_tserver_ = entry.first->second.get(); |
685 | 7.97k | } |
686 | | |
687 | 289k | void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) { |
688 | 289k | const std::string& permanent_uuid = pb.permanent_uuid(); |
689 | 289k | auto it = ts_cache_.find(permanent_uuid); |
690 | 289k | if (it != ts_cache_.end()) { |
691 | 267k | it->second->Update(pb); |
692 | 267k | return; |
693 | 267k | } |
694 | | |
695 | 22.6k | VLOG_WITH_PREFIX0 (1) << "Client caching new TabletServer " << permanent_uuid0 ; |
696 | 22.6k | CHECK(ts_cache_.emplace(permanent_uuid, std::make_unique<RemoteTabletServer>(pb)).second); |
697 | 22.6k | } |
698 | | |
699 | | // A (table, partition_key) --> tablet lookup. May be in-flight to a master, or |
700 | | // may be handled locally. |
701 | | // |
702 | | // Keeps a reference on the owning metacache while alive. |
703 | | class LookupRpc : public internal::ClientMasterRpcBase, public RequestCleanup { |
704 | | public: |
705 | | LookupRpc(const scoped_refptr<MetaCache>& meta_cache, |
706 | | const std::shared_ptr<const YBTable>& table, |
707 | | int64_t request_no, |
708 | | CoarseTimePoint deadline); |
709 | | |
710 | | virtual ~LookupRpc(); |
711 | | |
712 | | void SendRpc() override; |
713 | | |
714 | 192k | MetaCache* meta_cache() { return meta_cache_.get(); } |
715 | 0 | YBClient* client() const { return meta_cache_->client_; } |
716 | | |
717 | | virtual void NotifyFailure(const Status& status) = 0; |
718 | | |
719 | | template <class Response> |
720 | | void DoProcessResponse(const Status& status, const Response& resp); |
721 | | |
722 | | // Subclasses can override VerifyResponse for implementing additional response checks. Called |
723 | | // from Finished if there are no errors passed in response. |
724 | 15.0k | virtual CHECKED_STATUS VerifyResponse() { return Status::OK(); } |
725 | | |
726 | 69.6k | int64_t request_no() const { |
727 | 69.6k | return request_no_; |
728 | 69.6k | } |
729 | | |
730 | 184k | const std::shared_ptr<const YBTable>& table() const { |
731 | 184k | return table_; |
732 | 184k | } |
733 | | |
734 | | // When we receive a response for a key lookup or full table rpc, add callbacks to be fired off |
735 | | // to the to_notify set corresponding to the rpc type. Modify tables pointer by removing lookups |
736 | | // as we add to the to_notify set. |
737 | | virtual void AddCallbacksToBeNotified( |
738 | | const ProcessedTablesMap& processed_tables, |
739 | | std::unordered_map<TableId, TableData>* tables, |
740 | | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) = 0; |
741 | | |
742 | | // When looking up by key or full table, update the processe table with the returned location. |
743 | | virtual void UpdateProcessedTable(const TabletLocationsPB& loc, |
744 | | RemoteTabletPtr remote, |
745 | | ProcessedTablesMap::mapped_type* processed_table) = 0; |
746 | | |
747 | | private: |
748 | | virtual CHECKED_STATUS ProcessTabletLocations( |
749 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
750 | | boost::optional<PartitionListVersion> table_partition_list_version) = 0; |
751 | | |
752 | | const int64_t request_no_; |
753 | | |
754 | | // Pointer back to the tablet cache. Populated with location information |
755 | | // if the lookup finishes successfully. |
756 | | // |
757 | | // When the RPC is destroyed, a master lookup permit is returned to the |
758 | | // cache if one was acquired in the first place. |
759 | | scoped_refptr<MetaCache> meta_cache_; |
760 | | |
761 | | // Table to lookup. Optional in case lookup by ID, but if specified used to check if table |
762 | | // partitions are stale. |
763 | | const std::shared_ptr<const YBTable> table_; |
764 | | |
765 | | // Whether this lookup has acquired a master lookup permit. |
766 | | bool has_permit_ = false; |
767 | | }; |
768 | | |
769 | | LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache, |
770 | | const std::shared_ptr<const YBTable>& table, |
771 | | int64_t request_no, |
772 | | CoarseTimePoint deadline) |
773 | | : ClientMasterRpcBase(meta_cache->client_, deadline), |
774 | | request_no_(request_no), |
775 | | meta_cache_(meta_cache), |
776 | 69.4k | table_(table) { |
777 | 69.4k | DCHECK(deadline != CoarseTimePoint()); |
778 | 69.4k | } |
779 | | |
780 | 69.3k | LookupRpc::~LookupRpc() { |
781 | 69.3k | if (has_permit_) { |
782 | 69.3k | meta_cache_->ReleaseMasterLookupPermit(); |
783 | 69.3k | } |
784 | 69.3k | } |
785 | | |
786 | 69.5k | void LookupRpc::SendRpc() { |
787 | 69.5k | if (!has_permit_) { |
788 | 69.4k | has_permit_ = meta_cache_->AcquireMasterLookupPermit(); |
789 | 69.4k | } |
790 | 69.5k | if (!has_permit_) { |
791 | | // Couldn't get a permit, try again in a little while. |
792 | 0 | ScheduleRetry(STATUS(TryAgain, "Client has too many outstanding requests to the master")); |
793 | 0 | return; |
794 | 0 | } |
795 | | |
796 | | // See YBClient::Data::SyncLeaderMasterRpc(). |
797 | 69.5k | auto now = CoarseMonoClock::Now(); |
798 | 69.5k | if (retrier().deadline() < now) { |
799 | 30 | Finished(STATUS_FORMAT(TimedOut, "Timed out after deadline expired, passed: $0", |
800 | 30 | MonoDelta(now - retrier().start()))); |
801 | 30 | return; |
802 | 30 | } |
803 | 69.5k | mutable_retrier()->PrepareController(); |
804 | | |
805 | 69.5k | ClientMasterRpcBase::SendRpc(); |
806 | 69.5k | } |
807 | | |
808 | | namespace { |
809 | | |
810 | 15.0k | CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTabletLocationsResponsePB& resp) { |
811 | 15.0k | return resp.errors_size() > 0 ? StatusFromPB(resp.errors(0).status())360 : Status::OK()14.6k ; |
812 | 15.0k | } |
813 | | |
814 | 54.2k | CHECKED_STATUS GetFirstErrorForTabletById(const master::GetTableLocationsResponsePB& resp) { |
815 | | // There are no per-tablet lookup errors inside GetTableLocationsResponsePB. |
816 | 54.2k | return Status::OK(); |
817 | 54.2k | } |
818 | | |
819 | | template <class Response> |
820 | 69.3k | boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) { |
821 | 69.3k | return resp.has_partition_list_version() |
822 | 69.3k | ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())56.3k |
823 | 69.3k | : boost::none12.9k ; |
824 | 69.3k | } meta_cache.cc:boost::optional<unsigned int> yb::client::internal::(anonymous namespace)::GetPartitionListVersion<yb::master::GetTableLocationsResponsePB>(yb::master::GetTableLocationsResponsePB const&) Line | Count | Source | 820 | 54.2k | boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) { | 821 | 54.2k | return resp.has_partition_list_version() | 822 | 54.2k | ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())54.2k | 823 | 54.2k | : boost::none1 ; | 824 | 54.2k | } |
meta_cache.cc:boost::optional<unsigned int> yb::client::internal::(anonymous namespace)::GetPartitionListVersion<yb::master::GetTabletLocationsResponsePB>(yb::master::GetTabletLocationsResponsePB const&) Line | Count | Source | 820 | 15.0k | boost::optional<PartitionListVersion> GetPartitionListVersion(const Response& resp) { | 821 | 15.0k | return resp.has_partition_list_version() | 822 | 15.0k | ? boost::make_optional<PartitionListVersion>(resp.partition_list_version())2.08k | 823 | 15.0k | : boost::none12.9k ; | 824 | 15.0k | } |
|
825 | | |
826 | | } // namespace |
827 | | |
828 | | template <class Response> |
829 | 69.3k | void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) { |
830 | 69.3k | auto new_status = status; |
831 | 69.3k | if (new_status.ok()) { |
832 | 69.3k | new_status = VerifyResponse(); |
833 | 69.3k | } |
834 | | |
835 | | // Prefer response failures over no tablets found. |
836 | 69.3k | if (new_status.ok()) { |
837 | 69.3k | new_status = GetFirstErrorForTabletById(resp); |
838 | 69.3k | } |
839 | | |
840 | 69.3k | if (new_status.ok() && resp.tablet_locations_size() == 068.9k ) { |
841 | 0 | new_status = STATUS(NotFound, "No such tablet found"); |
842 | 0 | } |
843 | | |
844 | 69.3k | if (new_status.ok()) { |
845 | 68.9k | if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) { |
846 | 0 | const auto s = STATUS(TimedOut, "Simulate timeout for test."); |
847 | 0 | NotifyFailure(s); |
848 | 0 | return; |
849 | 0 | } |
850 | 68.9k | new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp)); |
851 | 68.9k | } |
852 | 69.3k | if (!new_status.ok()) { |
853 | 434 | YB_LOG_WITH_PREFIX_EVERY_N_SECS287 (WARNING, 1) << new_status287 ; |
854 | 434 | new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString())); |
855 | 434 | NotifyFailure(new_status); |
856 | 434 | } |
857 | 69.3k | } void yb::client::internal::LookupRpc::DoProcessResponse<yb::master::GetTableLocationsResponsePB>(yb::Status const&, yb::master::GetTableLocationsResponsePB const&) Line | Count | Source | 829 | 54.3k | void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) { | 830 | 54.3k | auto new_status = status; | 831 | 54.3k | if (new_status.ok()) { | 832 | 54.2k | new_status = VerifyResponse(); | 833 | 54.2k | } | 834 | | | 835 | | // Prefer response failures over no tablets found. | 836 | 54.3k | if (new_status.ok()) { | 837 | 54.2k | new_status = GetFirstErrorForTabletById(resp); | 838 | 54.2k | } | 839 | | | 840 | 54.3k | if (new_status.ok() && resp.tablet_locations_size() == 054.2k ) { | 841 | 0 | new_status = STATUS(NotFound, "No such tablet found"); | 842 | 0 | } | 843 | | | 844 | 54.3k | if (new_status.ok()) { | 845 | 54.2k | if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) { | 846 | 0 | const auto s = STATUS(TimedOut, "Simulate timeout for test."); | 847 | 0 | NotifyFailure(s); | 848 | 0 | return; | 849 | 0 | } | 850 | 54.2k | new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp)); | 851 | 54.2k | } | 852 | 54.3k | if (!new_status.ok()) { | 853 | 57 | YB_LOG_WITH_PREFIX_EVERY_N_SECS22 (WARNING, 1) << new_status22 ; | 854 | 57 | new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString())); | 855 | 57 | NotifyFailure(new_status); | 856 | 57 | } | 857 | 54.3k | } |
void yb::client::internal::LookupRpc::DoProcessResponse<yb::master::GetTabletLocationsResponsePB>(yb::Status const&, yb::master::GetTabletLocationsResponsePB const&) Line | Count | Source | 829 | 15.0k | void LookupRpc::DoProcessResponse(const Status& status, const Response& resp) { | 830 | 15.0k | auto new_status = status; | 831 | 15.0k | if (new_status.ok()) { | 832 | 15.0k | new_status = VerifyResponse(); | 833 | 15.0k | } | 834 | | | 835 | | // Prefer response failures over no tablets found. | 836 | 15.0k | if (new_status.ok()) { | 837 | 15.0k | new_status = GetFirstErrorForTabletById(resp); | 838 | 15.0k | } | 839 | | | 840 | 15.0k | if (new_status.ok() && resp.tablet_locations_size() == 014.6k ) { | 841 | 0 | new_status = STATUS(NotFound, "No such tablet found"); | 842 | 0 | } | 843 | | | 844 | 15.0k | if (new_status.ok()) { | 845 | 14.6k | if (RandomActWithProbability(FLAGS_TEST_simulate_lookup_timeout_probability)) { | 846 | 0 | const auto s = STATUS(TimedOut, "Simulate timeout for test."); | 847 | 0 | NotifyFailure(s); | 848 | 0 | return; | 849 | 0 | } | 850 | 14.6k | new_status = ProcessTabletLocations(resp.tablet_locations(), GetPartitionListVersion(resp)); | 851 | 14.6k | } | 852 | 15.0k | if (!new_status.ok()) { | 853 | 377 | YB_LOG_WITH_PREFIX_EVERY_N_SECS265 (WARNING, 1) << new_status265 ; | 854 | 377 | new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString())); | 855 | 377 | NotifyFailure(new_status); | 856 | 377 | } | 857 | 15.0k | } |
|
858 | | |
859 | | namespace { |
860 | | |
861 | | Status CheckTabletLocations( |
862 | 69.0k | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations) { |
863 | 69.0k | const std::string* prev_partition_end = nullptr; |
864 | 97.9k | for (const TabletLocationsPB& loc : locations) { |
865 | 97.9k | if (prev_partition_end && *prev_partition_end > loc.partition().partition_key_start()28.8k ) { |
866 | 0 | LOG(DFATAL) << "There should be no overlaps in tablet partitions and they should be sorted " |
867 | 0 | << "by partition_key_start. Prev partition end: " |
868 | 0 | << Slice(*prev_partition_end).ToDebugHexString() << ", current partition start: " |
869 | 0 | << Slice(loc.partition().partition_key_start()).ToDebugHexString() |
870 | 0 | << ". Tablet locations: " << [&locations] { |
871 | 0 | std::string result; |
872 | 0 | for (auto& loc : locations) { |
873 | 0 | result += "\n " + AsString(loc); |
874 | 0 | } |
875 | 0 | return result; |
876 | 0 | }(); |
877 | 0 | return STATUS(IllegalState, "Wrong order or overlaps in partitions"); |
878 | 0 | } |
879 | 97.9k | prev_partition_end = &loc.partition().partition_key_end(); |
880 | 97.9k | } |
881 | 69.0k | return Status::OK(); |
882 | 69.0k | } |
883 | | |
884 | | class TabletIdLookup : public ToStringable { |
885 | | public: |
886 | 15.0k | explicit TabletIdLookup(const TabletId& tablet_id) : tablet_id_(tablet_id) {} |
887 | | |
888 | 0 | std::string ToString() const override { |
889 | 0 | return Format("Tablet: $0", tablet_id_); |
890 | 0 | } |
891 | | |
892 | | private: |
893 | | const TabletId& tablet_id_; |
894 | | }; |
895 | | |
896 | | class TablePartitionLookup : public ToStringable { |
897 | | public: |
898 | | explicit TablePartitionLookup( |
899 | | const TableId& table_id, const VersionedPartitionGroupStartKey& partition_group_start) |
900 | 54.3k | : table_id_(table_id), partition_group_start_(partition_group_start) {} |
901 | | |
902 | 0 | std::string ToString() const override { |
903 | 0 | return Format("Table: $0, partition: $1, partition list version: $2", |
904 | 0 | table_id_, Slice(*partition_group_start_.key).ToDebugHexString(), |
905 | 0 | partition_group_start_.partition_list_version); |
906 | 0 | } |
907 | | |
908 | | private: |
909 | | const TableId& table_id_; |
910 | | const VersionedPartitionGroupStartKey partition_group_start_; |
911 | | }; |
912 | | |
913 | | class FullTableLookup : public ToStringable { |
914 | | public: |
915 | | explicit FullTableLookup(const TableId& table_id) |
916 | 0 | : table_id_(table_id) {} |
917 | | |
918 | 0 | std::string ToString() const override { |
919 | 0 | return Format("FullTableLookup: $0", table_id_); |
920 | 0 | } |
921 | | private: |
922 | | const TableId& table_id_; |
923 | | }; |
924 | | |
925 | | } // namespace |
926 | | |
927 | | Status MetaCache::ProcessTabletLocations( |
928 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
929 | 69.0k | boost::optional<PartitionListVersion> table_partition_list_version, LookupRpc* lookup_rpc) { |
930 | 69.0k | if (VLOG_IS_ON(2)) { |
931 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << "lookup_rpc: " << AsString(lookup_rpc); |
932 | 0 | for (const auto& loc : locations) { |
933 | 0 | for (const auto& table_id : loc.table_ids()) { |
934 | 0 | VLOG_WITH_PREFIX_AND_FUNC(2) << loc.tablet_id() << ", " << table_id; |
935 | 0 | } |
936 | 0 | } |
937 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) << AsString(locations); |
938 | 0 | } |
939 | | |
940 | 69.0k | RETURN_NOT_OK(CheckTabletLocations(locations)); |
941 | | |
942 | 69.0k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>> to_notify; |
943 | 69.0k | { |
944 | 69.0k | std::lock_guard<decltype(mutex_)> lock(mutex_); |
945 | 69.0k | ProcessedTablesMap processed_tables; |
946 | | |
947 | 97.9k | for (const TabletLocationsPB& loc : locations) { |
948 | 97.9k | const std::string& tablet_id = loc.tablet_id(); |
949 | | // Next, update the tablet caches. |
950 | 97.9k | RemoteTabletPtr remote = FindPtrOrNull(tablets_by_id_, tablet_id); |
951 | | |
952 | | // First, update the tserver cache, needed for the Refresh calls below. |
953 | 289k | for (const TabletLocationsPB_ReplicaPB& r : loc.replicas()) { |
954 | 289k | UpdateTabletServerUnlocked(r.ts_info()); |
955 | 289k | } |
956 | | |
957 | 97.9k | VersionedTablePartitionListPtr colocated_table_partition_list; |
958 | 97.9k | if (loc.table_ids_size() > 1 && lookup_rpc1.29k && lookup_rpc->table()1.29k ) { |
959 | | // When table_ids_size() == 1 we only receive info for the single table from the master |
960 | | // and we already have TableData initialized for it (this is done before sending an RPC to |
961 | | // the master). And when table_ids_size() > 1, it means we got response for lookup RPC for |
962 | | // co-located table and we can re-use TableData::partition_list from the table that was |
963 | | // requested by MetaCache::LookupTabletByKey caller for other tables co-located with this |
964 | | // one (since all co-located tables sharing the same set of tablets have the same table |
965 | | // partition list and now we have list of them returned by the master). |
966 | 1.28k | const auto lookup_table_it = tables_.find(lookup_rpc->table()->id()); |
967 | 1.28k | if (lookup_table_it != tables_.end()) { |
968 | 1.28k | colocated_table_partition_list = lookup_table_it->second.partition_list; |
969 | 1.28k | } else { |
970 | | // We don't want to crash the server in that case for production, since this is not a |
971 | | // correctness issue, but gives some performance degradation on first lookups for |
972 | | // co-located tables. |
973 | | // But we do want it to crash in debug, so we can more reliably catch this if it happens. |
974 | 0 | LOG_WITH_PREFIX(DFATAL) << Format( |
975 | 0 | "Internal error: got response for lookup RPC for co-located table, but MetaCache " |
976 | 0 | "table data wasn't initialized with partition list for this table. RPC: $0", |
977 | 0 | AsString(lookup_rpc)); |
978 | 0 | } |
979 | 1.28k | } |
980 | | |
981 | 757k | for (const std::string& table_id : loc.table_ids()) { |
982 | 757k | auto& processed_table = processed_tables[table_id]; |
983 | 757k | std::map<PartitionKey, RemoteTabletPtr>* tablets_by_key = nullptr; |
984 | | |
985 | 757k | auto table_it = tables_.find(table_id); |
986 | 757k | if (table_it == tables_.end() && loc.table_ids_size() > 1610k && |
987 | 757k | colocated_table_partition_list596k ) { |
988 | 590k | table_it = InitTableDataUnlocked(table_id, colocated_table_partition_list); |
989 | 590k | } |
990 | 757k | if (table_it != tables_.end()) { |
991 | 737k | auto& table_data = table_it->second; |
992 | | |
993 | 737k | const auto msg_formatter = [&] { |
994 | 0 | return Format( |
995 | 0 | "Received table $0 partitions version: $1, MetaCache's table partitions version: " |
996 | 0 | "$2", |
997 | 0 | table_id, table_partition_list_version, table_data.partition_list->version); |
998 | 0 | }; |
999 | 737k | VLOG_WITH_PREFIX_AND_FUNC0 (4) << msg_formatter()0 ; |
1000 | 737k | if (table_partition_list_version.has_value()) { |
1001 | 737k | if (table_partition_list_version.get() != table_data.partition_list->version) { |
1002 | 0 | return STATUS( |
1003 | 0 | TryAgain, msg_formatter(), |
1004 | 0 | ClientError(ClientErrorCode::kTablePartitionListIsStale)); |
1005 | 0 | } |
1006 | | // We need to guarantee that table_data.tablets_by_partition cache corresponds to |
1007 | | // table_data.partition_list (see comments for TableData::partitions). |
1008 | | // So, we don't update tablets_by_partition cache if we don't know table partitions |
1009 | | // version for both response and TableData. |
1010 | | // This only can happen for those LookupTabletById requests that don't specify table, |
1011 | | // because they don't care about partitions changing. |
1012 | 737k | tablets_by_key = &table_data.tablets_by_partition; |
1013 | 737k | } |
1014 | 737k | } |
1015 | | |
1016 | 757k | if (remote) { |
1017 | | // Partition should not have changed. |
1018 | 666k | DCHECK_EQ(loc.partition().partition_key_start(), |
1019 | 666k | remote->partition().partition_key_start()); |
1020 | 666k | DCHECK_EQ(loc.partition().partition_key_end(), |
1021 | 666k | remote->partition().partition_key_end()); |
1022 | | |
1023 | | // For colocated tables, RemoteTablet already exists because it was processed |
1024 | | // in a previous iteration of the for loop (for loc.table_ids()). |
1025 | | // We need to add this tablet to the current table's tablets_by_key map. |
1026 | 666k | if (tablets_by_key) { |
1027 | 660k | (*tablets_by_key)[remote->partition().partition_key_start()] = remote; |
1028 | 660k | } |
1029 | | |
1030 | 666k | VLOG_WITH_PREFIX0 (5) << "Refreshing tablet " << tablet_id << ": " |
1031 | 0 | << loc.ShortDebugString(); |
1032 | 666k | } else { |
1033 | 90.3k | VLOG_WITH_PREFIX0 (5) << "Caching tablet " << tablet_id << ": " << loc.ShortDebugString()0 ; |
1034 | | |
1035 | 90.3k | Partition partition; |
1036 | 90.3k | Partition::FromPB(loc.partition(), &partition); |
1037 | 90.3k | remote = new RemoteTablet( |
1038 | 90.3k | tablet_id, partition, table_partition_list_version, loc.split_depth(), |
1039 | 90.3k | loc.split_parent_tablet_id()); |
1040 | | |
1041 | 90.3k | CHECK(tablets_by_id_.emplace(tablet_id, remote).second); |
1042 | 90.3k | if (tablets_by_key) { |
1043 | 77.1k | auto emplace_result = tablets_by_key->emplace(partition.partition_key_start(), remote); |
1044 | 77.1k | if (!emplace_result.second) { |
1045 | 31 | const auto& old_tablet = emplace_result.first->second; |
1046 | 31 | if (old_tablet->split_depth() < remote->split_depth()) { |
1047 | | // Only replace with tablet of higher split_depth. |
1048 | 31 | emplace_result.first->second = remote; |
1049 | 31 | } else { |
1050 | | // If split_depth is the same - it should be the same tablet. |
1051 | 0 | if (old_tablet->split_depth() == loc.split_depth() |
1052 | 0 | && old_tablet->tablet_id() != tablet_id) { |
1053 | 0 | const auto error_msg = Format( |
1054 | 0 | "Can't replace tablet $0 with $1 at partition_key_start $2, split_depth $3", |
1055 | 0 | old_tablet->tablet_id(), tablet_id, loc.partition().partition_key_start(), |
1056 | 0 | old_tablet->split_depth()); |
1057 | 0 | LOG_WITH_PREFIX(DFATAL) << error_msg; |
1058 | | // Just skip updating this tablet for release build. |
1059 | 0 | } |
1060 | 0 | } |
1061 | 31 | } |
1062 | 77.1k | } |
1063 | 90.3k | MaybeUpdateClientRequests(*remote); |
1064 | 90.3k | } |
1065 | 757k | remote->Refresh(ts_cache_, loc.replicas()); |
1066 | 757k | remote->SetExpectedReplicas(loc.expected_live_replicas(), loc.expected_read_replicas()); |
1067 | 757k | if (table_partition_list_version.has_value()) { |
1068 | 737k | remote->MakeLastKnownPartitionListVersionAtLeast(*table_partition_list_version); |
1069 | 737k | } |
1070 | 757k | if (lookup_rpc) { |
1071 | 756k | lookup_rpc->UpdateProcessedTable(loc, remote, &processed_table); |
1072 | 756k | } |
1073 | 757k | } |
1074 | | |
1075 | 97.9k | auto it = tablet_lookups_by_id_.find(tablet_id); |
1076 | 97.9k | if (it != tablet_lookups_by_id_.end()) { |
1077 | 44.2k | while (auto* lookup = it->second.lookups.Pop()) { |
1078 | 27.6k | to_notify.emplace_back(std::move(lookup->callback), |
1079 | 27.6k | LookupCallbackVisitor(remote)); |
1080 | 27.6k | delete lookup; |
1081 | 27.6k | } |
1082 | 16.5k | } |
1083 | 97.9k | } |
1084 | 69.0k | if (lookup_rpc) { |
1085 | 68.9k | lookup_rpc->AddCallbacksToBeNotified(processed_tables, &tables_, &to_notify); |
1086 | 68.9k | lookup_rpc->CleanupRequest(); |
1087 | 68.9k | } |
1088 | 69.0k | } |
1089 | | |
1090 | 158k | for (const auto& callback_and_param : to_notify) { |
1091 | 158k | boost::apply_visitor(callback_and_param.second, callback_and_param.first); |
1092 | 158k | } |
1093 | | |
1094 | 69.0k | return Status::OK(); |
1095 | 69.0k | } |
1096 | | |
1097 | 90.3k | void MetaCache::MaybeUpdateClientRequests(const RemoteTablet& tablet) { |
1098 | 90.3k | VLOG_WITH_PREFIX_AND_FUNC0 (2) << "Tablet: " << tablet.tablet_id() |
1099 | 0 | << " split parent: " << tablet.split_parent_tablet_id(); |
1100 | 90.3k | if (tablet.split_parent_tablet_id().empty()) { |
1101 | 90.3k | VLOG_WITH_PREFIX0 (2) << "Tablet " << tablet.tablet_id() << " is not a result of split"0 ; |
1102 | 90.3k | return; |
1103 | 90.3k | } |
1104 | | // TODO: MetaCache is a friend of Client and tablet_requests_mutex_ with tablet_requests_ are |
1105 | | // public members of YBClient::Data. Consider refactoring that. |
1106 | 68 | std::lock_guard<simple_spinlock> request_lock(client_->data_->tablet_requests_mutex_); |
1107 | 68 | auto& tablet_requests = client_->data_->tablet_requests_; |
1108 | 68 | const auto requests_it = tablet_requests.find(tablet.split_parent_tablet_id()); |
1109 | 68 | if (requests_it == tablet_requests.end()) { |
1110 | 6 | VLOG_WITH_PREFIX0 (2) << "Can't find request_id_seq for tablet " |
1111 | 0 | << tablet.split_parent_tablet_id(); |
1112 | | // This can happen if client wasn't active (for example node was partitioned away) during |
1113 | | // sequence of splits that resulted in `tablet` creation, so we don't have info about `tablet` |
1114 | | // split parent. |
1115 | | // In this case we set request_id_seq to special value and will reset it on getting |
1116 | | // "request id is less than min" error. We will use min request ID plus 2^24 (there wouldn't be |
1117 | | // 2^24 client requests in progress from the same client to the same tablet, so it is safe to do |
1118 | | // this). |
1119 | 6 | tablet_requests.emplace( |
1120 | 6 | tablet.tablet_id(), |
1121 | 6 | YBClient::Data::TabletRequests { |
1122 | 6 | .request_id_seq = kInitializeFromMinRunning |
1123 | 6 | }); |
1124 | 6 | return; |
1125 | 6 | } |
1126 | 62 | VLOG_WITH_PREFIX0 (2) << "Setting request_id_seq for tablet " << tablet.tablet_id() |
1127 | 0 | << " from tablet " << tablet.split_parent_tablet_id() << " to " |
1128 | 0 | << requests_it->second.request_id_seq; |
1129 | 62 | tablet_requests[tablet.tablet_id()].request_id_seq = requests_it->second.request_id_seq; |
1130 | 62 | } |
1131 | | |
1132 | | std::unordered_map<TableId, TableData>::iterator MetaCache::InitTableDataUnlocked( |
1133 | 640k | const TableId& table_id, const VersionedTablePartitionListPtr& partitions) { |
1134 | 640k | VLOG_WITH_PREFIX_AND_FUNC0 (4) << Format( |
1135 | 0 | "MetaCache initializing TableData ($0 tables) for table $1: $2, " |
1136 | 0 | "partition_list_version: $3", |
1137 | 0 | tables_.size(), table_id, tables_.count(table_id), partitions->version); |
1138 | 640k | return tables_.emplace( |
1139 | 640k | std::piecewise_construct, std::forward_as_tuple(table_id), |
1140 | 640k | std::forward_as_tuple(partitions)).first; |
1141 | 640k | } |
1142 | | |
1143 | 19.2k | void MetaCache::InvalidateTableCache(const YBTable& table) { |
1144 | 19.2k | const auto& table_id = table.id(); |
1145 | 19.2k | const auto table_partition_list = table.GetVersionedPartitions(); |
1146 | 19.2k | VLOG_WITH_PREFIX_AND_FUNC0 (1) << Format( |
1147 | 0 | "table: $0, table.partition_list.version: $1", table_id, table_partition_list->version); |
1148 | | |
1149 | 19.2k | std::vector<LookupCallback> to_notify; |
1150 | | |
1151 | 19.2k | auto invalidate_needed = [this, &table_id, &table_partition_list](const auto& it) { |
1152 | 19.2k | const auto table_data_partition_list_version = it->second.partition_list->version; |
1153 | 19.2k | VLOG_WITH_PREFIX0 (1) << Format( |
1154 | 0 | "tables_[$0].partition_list.version: $1", table_id, table_data_partition_list_version); |
1155 | | // Only need to invalidate table cache, if it is has partition list version older than table's |
1156 | | // one. |
1157 | 19.2k | return table_data_partition_list_version < table_partition_list->version; |
1158 | 19.2k | }; |
1159 | | |
1160 | 19.2k | { |
1161 | 19.2k | SharedLock<decltype(mutex_)> lock(mutex_); |
1162 | 19.2k | auto it = tables_.find(table_id); |
1163 | 19.2k | if (it != tables_.end()) { |
1164 | 19.2k | if (!invalidate_needed(it)) { |
1165 | 19.1k | return; |
1166 | 19.1k | } |
1167 | 19.2k | } |
1168 | 19.2k | } |
1169 | | |
1170 | 31 | { |
1171 | 31 | UniqueLock<decltype(mutex_)> lock(mutex_); |
1172 | 31 | auto it = tables_.find(table_id); |
1173 | 31 | if (it != tables_.end()) { |
1174 | 31 | if (!invalidate_needed(it)) { |
1175 | 0 | return; |
1176 | 0 | } |
1177 | 31 | } else { |
1178 | 0 | it = InitTableDataUnlocked(table_id, table_partition_list); |
1179 | | // Nothing to invalidate, we have just initiliazed it first time. |
1180 | 0 | return; |
1181 | 0 | } |
1182 | | |
1183 | 31 | auto& table_data = it->second; |
1184 | | |
1185 | | // Some partitions could be mapped to tablets that have been split and we need to re-fetch |
1186 | | // info about tablets serving partitions. |
1187 | 68 | for (auto& tablet : table_data.tablets_by_partition) { |
1188 | 68 | tablet.second->MarkStale(); |
1189 | 68 | } |
1190 | | |
1191 | 31 | for (auto& tablet : table_data.all_tablets) { |
1192 | 0 | tablet->MarkStale(); |
1193 | 0 | } |
1194 | | // TODO(tsplit): Optimize to retry only necessary lookups inside ProcessTabletLocations, |
1195 | | // detect which need to be retried by GetTableLocationsResponsePB.partition_list_version. |
1196 | 33 | for (auto& group_lookups : table_data.tablet_lookups_by_group) { |
1197 | 33 | while (auto* lookup = group_lookups.second.lookups.Pop()) { |
1198 | 0 | to_notify.push_back(std::move(lookup->callback)); |
1199 | 0 | delete lookup; |
1200 | 0 | } |
1201 | 33 | } |
1202 | 31 | table_data.tablet_lookups_by_group.clear(); |
1203 | | |
1204 | | // Only update partitions here after invalidating TableData cache to avoid inconsistencies. |
1205 | | // See https://github.com/yugabyte/yugabyte-db/issues/6890. |
1206 | 31 | table_data.partition_list = table_partition_list; |
1207 | 31 | } |
1208 | 0 | for (const auto& callback : to_notify) { |
1209 | 0 | const auto s = STATUS_EC_FORMAT( |
1210 | 0 | TryAgain, ClientError(ClientErrorCode::kMetaCacheInvalidated), |
1211 | 0 | "MetaCache for table $0 has been invalidated.", table_id); |
1212 | 0 | boost::apply_visitor(LookupCallbackVisitor(s), callback); |
1213 | 0 | } |
1214 | 31 | } |
1215 | | |
1216 | | class MetaCache::CallbackNotifier { |
1217 | | public: |
1218 | 434 | explicit CallbackNotifier(const Status& status) : status_(status) {} |
1219 | | |
1220 | 1.32k | void Add(LookupCallback&& callback) { |
1221 | 1.32k | callbacks_.push_back(std::move(callback)); |
1222 | 1.32k | } |
1223 | | |
1224 | 2 | void SetStatus(const Status& status) { |
1225 | 2 | status_ = status; |
1226 | 2 | } |
1227 | | |
1228 | 434 | ~CallbackNotifier() { |
1229 | 1.32k | for (const auto& callback : callbacks_) { |
1230 | 1.32k | boost::apply_visitor(LookupCallbackVisitor(status_), callback); |
1231 | 1.32k | } |
1232 | 434 | } |
1233 | | private: |
1234 | | std::vector<LookupCallback> callbacks_; |
1235 | | Status status_; |
1236 | | }; |
1237 | | |
1238 | | CoarseTimePoint MetaCache::LookupFailed( |
1239 | | const Status& status, int64_t request_no, const ToStringable& lookup_id, |
1240 | | LookupDataGroup* lookup_data_group, |
1241 | 434 | CallbackNotifier* notifier) { |
1242 | 434 | std::vector<LookupData*> retry; |
1243 | 434 | auto now = CoarseMonoClock::Now(); |
1244 | 434 | CoarseTimePoint max_deadline; |
1245 | | |
1246 | 2.04k | while (auto* lookup = lookup_data_group->lookups.Pop()) { |
1247 | 1.60k | if (!status.IsTimedOut() || lookup->deadline <= now1.18k ) { |
1248 | 1.32k | notifier->Add(std::move(lookup->callback)); |
1249 | 1.32k | delete lookup; |
1250 | 1.32k | } else { |
1251 | 284 | max_deadline = std::max(max_deadline, lookup->deadline); |
1252 | 284 | retry.push_back(lookup); |
1253 | 284 | } |
1254 | 1.60k | } |
1255 | | |
1256 | 434 | if (retry.empty()) { |
1257 | 419 | lookup_data_group->Finished(request_no, lookup_id); |
1258 | 419 | } else { |
1259 | 284 | for (auto* lookup : retry) { |
1260 | 284 | lookup_data_group->lookups.Push(lookup); |
1261 | 284 | } |
1262 | 15 | } |
1263 | | |
1264 | 434 | return max_deadline; |
1265 | 434 | } |
1266 | | |
1267 | | class LookupByIdRpc : public LookupRpc { |
1268 | | public: |
1269 | | LookupByIdRpc(const scoped_refptr<MetaCache>& meta_cache, |
1270 | | const TabletId& tablet_id, |
1271 | | const std::shared_ptr<const YBTable>& table, |
1272 | | master::IncludeInactive include_inactive, |
1273 | | int64_t request_no, |
1274 | | CoarseTimePoint deadline, |
1275 | | int64_t lookups_without_new_replicas) |
1276 | | : LookupRpc(meta_cache, table, request_no, deadline), |
1277 | | tablet_id_(tablet_id), |
1278 | 15.1k | include_inactive_(include_inactive) { |
1279 | 15.1k | if (lookups_without_new_replicas != 0) { |
1280 | 1.44k | send_delay_ = std::min( |
1281 | 1.44k | lookups_without_new_replicas * FLAGS_meta_cache_lookup_throttling_step_ms, |
1282 | 1.44k | FLAGS_meta_cache_lookup_throttling_max_delay_ms) * 1ms; |
1283 | 1.44k | } |
1284 | 15.1k | } |
1285 | | |
1286 | 850 | std::string ToString() const override { |
1287 | 850 | return Format("LookupByIdRpc(tablet: $0, num_attempts: $1)", tablet_id_, num_attempts()); |
1288 | 850 | } |
1289 | | |
1290 | 16.6k | void SendRpc() override { |
1291 | 16.6k | if (send_delay_) { |
1292 | 1.44k | auto delay = send_delay_; |
1293 | 1.44k | send_delay_ = MonoDelta(); |
1294 | 1.44k | auto status = mutable_retrier()->DelayedRetry(this, Status::OK(), delay); |
1295 | 1.44k | if (!status.ok()) { |
1296 | 0 | Finished(status); |
1297 | 0 | } |
1298 | 1.44k | return; |
1299 | 1.44k | } |
1300 | | |
1301 | 15.1k | LookupRpc::SendRpc(); |
1302 | 15.1k | } |
1303 | | |
1304 | 15.1k | void CallRemoteMethod() override { |
1305 | | // Fill out the request. |
1306 | 15.1k | req_.clear_tablet_ids(); |
1307 | 15.1k | req_.add_tablet_ids(tablet_id_); |
1308 | 15.1k | if (table()) { |
1309 | 2.18k | req_.set_table_id(table()->id()); |
1310 | 2.18k | } |
1311 | 15.1k | req_.set_include_inactive(include_inactive_); |
1312 | | |
1313 | 15.1k | master_client_proxy()->GetTabletLocationsAsync( |
1314 | 15.1k | req_, &resp_, mutable_retrier()->mutable_controller(), |
1315 | 15.1k | std::bind(&LookupByIdRpc::Finished, this, Status::OK())); |
1316 | 15.1k | } |
1317 | | |
1318 | | void AddCallbacksToBeNotified( |
1319 | | const ProcessedTablesMap& processed_tables, |
1320 | | std::unordered_map<TableId, TableData>* tables, |
1321 | 14.6k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1322 | | // Nothing to do when looking up by id. |
1323 | 14.6k | return; |
1324 | 14.6k | } |
1325 | | |
1326 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1327 | | RemoteTabletPtr remote, |
1328 | 23.8k | ProcessedTablesMap::mapped_type* processed_table) override { |
1329 | 23.8k | return; |
1330 | 23.8k | } |
1331 | | |
1332 | | private: |
1333 | 15.0k | void ProcessResponse(const Status& status) override { |
1334 | 15.0k | DoProcessResponse(status, resp_); |
1335 | 15.0k | } |
1336 | | |
1337 | 15.1k | Status ResponseStatus() override { |
1338 | 15.1k | return StatusFromResp(resp_); |
1339 | 15.1k | } |
1340 | | |
1341 | 14.6k | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1342 | 14.6k | auto& lookups = meta_cache()->tablet_lookups_by_id_; |
1343 | 14.6k | auto it = lookups.find(tablet_id_); |
1344 | 14.6k | TabletIdLookup tablet_id_lookup(tablet_id_); |
1345 | 14.6k | if (it != lookups.end()) { |
1346 | 14.6k | it->second.Finished(request_no(), tablet_id_lookup); |
1347 | 14.6k | } else { |
1348 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown tablet: " |
1349 | 0 | << tablet_id_lookup.ToString(); |
1350 | 0 | } |
1351 | 14.6k | } |
1352 | | |
1353 | 377 | void NotifyFailure(const Status& status) override { |
1354 | 377 | meta_cache()->LookupByIdFailed( |
1355 | 377 | tablet_id_, table(), include_inactive_, |
1356 | 377 | GetPartitionListVersion(resp_), request_no(), status); |
1357 | 377 | } |
1358 | | |
1359 | | Status ProcessTabletLocations( |
1360 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1361 | 14.6k | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1362 | 14.6k | return meta_cache()->ProcessTabletLocations( |
1363 | 14.6k | locations, table_partition_list_version, this); |
1364 | 14.6k | } |
1365 | | |
1366 | | // Tablet to lookup. |
1367 | | const TabletId tablet_id_; |
1368 | | |
1369 | | // Whether or not to lookup inactive (hidden) tablets. |
1370 | | master::IncludeInactive include_inactive_; |
1371 | | |
1372 | | // Request body. |
1373 | | master::GetTabletLocationsRequestPB req_; |
1374 | | |
1375 | | // Response body. |
1376 | | master::GetTabletLocationsResponsePB resp_; |
1377 | | |
1378 | | MonoDelta send_delay_; |
1379 | | }; |
1380 | | |
1381 | | class LookupFullTableRpc : public LookupRpc { |
1382 | | public: |
1383 | | LookupFullTableRpc(const scoped_refptr<MetaCache>& meta_cache, |
1384 | | const std::shared_ptr<const YBTable>& table, |
1385 | | int64_t request_no, |
1386 | | CoarseTimePoint deadline) |
1387 | 0 | : LookupRpc(meta_cache, table, request_no, deadline) { |
1388 | 0 | } |
1389 | | |
1390 | 0 | std::string ToString() const override { |
1391 | 0 | return Format("LookupFullTableRpc($0, $1)", table()->name(), num_attempts()); |
1392 | 0 | } |
1393 | | |
1394 | 0 | void CallRemoteMethod() override { |
1395 | | // Fill out the request. |
1396 | 0 | req_.mutable_table()->set_table_id(table()->id()); |
1397 | | // The end partition key is left unset intentionally so that we'll prefetch |
1398 | | // some additional tablets. |
1399 | 0 | master_client_proxy()->GetTableLocationsAsync( |
1400 | 0 | req_, &resp_, mutable_retrier()->mutable_controller(), |
1401 | 0 | std::bind(&LookupFullTableRpc::Finished, this, Status::OK())); |
1402 | 0 | } |
1403 | | |
1404 | | void AddCallbacksToBeNotified( |
1405 | | const ProcessedTablesMap& processed_tables, |
1406 | | std::unordered_map<TableId, TableData>* tables, |
1407 | 0 | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1408 | 0 | for (const auto& processed_table : processed_tables) { |
1409 | | // Handle tablet range |
1410 | 0 | const auto it = tables->find(processed_table.first); |
1411 | 0 | if (it == tables->end()) { |
1412 | 0 | continue; |
1413 | 0 | } |
1414 | 0 | auto& table_data = it->second; |
1415 | 0 | auto& full_table_lookups = table_data.full_table_lookups; |
1416 | 0 | while (auto* lookup = full_table_lookups.lookups.Pop()) { |
1417 | 0 | std::vector<internal::RemoteTabletPtr> remote_tablets; |
1418 | 0 | for (const auto& entry : processed_table.second) { |
1419 | 0 | remote_tablets.push_back(entry.second); |
1420 | 0 | } |
1421 | 0 | table_data.all_tablets = remote_tablets; |
1422 | 0 | to_notify->emplace_back(std::move(lookup->callback), |
1423 | 0 | LookupCallbackVisitor(std::move(remote_tablets))); |
1424 | 0 | delete lookup; |
1425 | 0 | } |
1426 | 0 | } |
1427 | 0 | } |
1428 | | |
1429 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1430 | | RemoteTabletPtr remote, |
1431 | 0 | ProcessedTablesMap::mapped_type* processed_table) override { |
1432 | 0 | processed_table->emplace(loc.partition().partition_key_start(), remote); |
1433 | 0 | } |
1434 | | |
1435 | | private: |
1436 | 0 | void ProcessResponse(const Status& status) override { |
1437 | 0 | DoProcessResponse(status, resp_); |
1438 | 0 | } |
1439 | | |
1440 | 0 | Status ResponseStatus() override { |
1441 | 0 | return StatusFromResp(resp_); |
1442 | 0 | } |
1443 | | |
1444 | 0 | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1445 | 0 | const auto it = meta_cache()->tables_.find(table()->id()); |
1446 | 0 | if (it == meta_cache()->tables_.end()) { |
1447 | 0 | return; |
1448 | 0 | } |
1449 | 0 | auto& table_data = it->second; |
1450 | 0 | auto full_table_lookup = FullTableLookup(table()->id()); |
1451 | 0 | table_data.full_table_lookups.Finished(request_no(), full_table_lookup, false); |
1452 | 0 | } |
1453 | | |
1454 | 0 | void NotifyFailure(const Status& status) override { |
1455 | 0 | meta_cache()->LookupFullTableFailed(table(), request_no(), status); |
1456 | 0 | } |
1457 | | |
1458 | | Status ProcessTabletLocations( |
1459 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1460 | 0 | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1461 | 0 | return meta_cache()->ProcessTabletLocations( |
1462 | 0 | locations, table_partition_list_version, this); |
1463 | 0 | } |
1464 | | |
1465 | | // Request body. |
1466 | | GetTableLocationsRequestPB req_; |
1467 | | |
1468 | | // Response body. |
1469 | | GetTableLocationsResponsePB resp_; |
1470 | | }; |
1471 | | |
1472 | | class LookupByKeyRpc : public LookupRpc { |
1473 | | public: |
1474 | | LookupByKeyRpc(const scoped_refptr<MetaCache>& meta_cache, |
1475 | | const std::shared_ptr<const YBTable>& table, |
1476 | | const VersionedPartitionGroupStartKey& partition_group_start, |
1477 | | int64_t request_no, |
1478 | | CoarseTimePoint deadline) |
1479 | | : LookupRpc(meta_cache, table, request_no, deadline), |
1480 | 54.3k | partition_group_start_(partition_group_start) { |
1481 | 54.3k | } |
1482 | | |
1483 | 223 | std::string ToString() const override { |
1484 | 223 | return Format( |
1485 | 223 | "GetTableLocations { table_name: $0, table_id: $1, partition_start_key: $2, " |
1486 | 223 | "partition_list_version: $3, " |
1487 | 223 | "request_no: $4, num_attempts: $5 }", |
1488 | 223 | table()->name(), |
1489 | 223 | table()->id(), |
1490 | 223 | table()->partition_schema().PartitionKeyDebugString( |
1491 | 223 | *partition_group_start_.key, internal::GetSchema(table()->schema())), |
1492 | 223 | partition_group_start_.partition_list_version, |
1493 | 223 | request_no(), |
1494 | 223 | num_attempts()); |
1495 | 223 | } |
1496 | | |
1497 | 0 | const string& table_id() const { return table()->id(); } |
1498 | | |
1499 | 54.3k | void CallRemoteMethod() override { |
1500 | | // Fill out the request. |
1501 | 54.3k | req_.mutable_table()->set_table_id(table()->id()); |
1502 | 54.3k | req_.set_partition_key_start(*partition_group_start_.key); |
1503 | 54.3k | req_.set_max_returned_locations(kPartitionGroupSize); |
1504 | | |
1505 | | // The end partition key is left unset intentionally so that we'll prefetch |
1506 | | // some additional tablets. |
1507 | 54.3k | master_client_proxy()->GetTableLocationsAsync( |
1508 | 54.3k | req_, &resp_, mutable_retrier()->mutable_controller(), |
1509 | 54.3k | std::bind(&LookupByKeyRpc::Finished, this, Status::OK())); |
1510 | 54.3k | } |
1511 | | |
1512 | | void AddCallbacksToBeNotified( |
1513 | | const ProcessedTablesMap& processed_tables, |
1514 | | std::unordered_map<TableId, TableData>* tables, |
1515 | 54.2k | std::vector<std::pair<LookupCallback, LookupCallbackVisitor>>* to_notify) override { |
1516 | 704k | for (const auto& processed_table : processed_tables) { |
1517 | 704k | const auto table_it = tables->find(processed_table.first); |
1518 | 704k | if (table_it == tables->end()) { |
1519 | 0 | continue; |
1520 | 0 | } |
1521 | 704k | auto& table_data = table_it->second; |
1522 | | // This should be guaranteed by ProcessTabletLocations before we get here: |
1523 | 704k | DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version) |
1524 | 0 | << "table_data.partition_list->version: " << table_data.partition_list->version |
1525 | 0 | << " partition_group_start_.partition_list_version: " |
1526 | 0 | << partition_group_start_.partition_list_version; |
1527 | 704k | const auto lookup_by_group_iter = |
1528 | 704k | table_data.tablet_lookups_by_group.find(*partition_group_start_.key); |
1529 | 704k | if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) { |
1530 | 55.6k | VLOG_WITH_PREFIX_AND_FUNC0 (4) |
1531 | 0 | << "Checking tablet_lookups_by_group for partition_group_start: " |
1532 | 0 | << Slice(*partition_group_start_.key).ToDebugHexString(); |
1533 | 55.6k | auto& lookups_group = lookup_by_group_iter->second; |
1534 | 186k | while (auto* lookup = lookups_group.lookups.Pop()) { |
1535 | 130k | auto remote_it = processed_table.second.find(*lookup->partition_start); |
1536 | 130k | auto lookup_visitor = LookupCallbackVisitor( |
1537 | 130k | remote_it != processed_table.second.end() ? remote_it->second : nullptr0 ); |
1538 | 130k | to_notify->emplace_back(std::move(lookup->callback), lookup_visitor); |
1539 | 130k | delete lookup; |
1540 | 130k | } |
1541 | 55.6k | } |
1542 | 704k | } |
1543 | 54.2k | } |
1544 | | |
1545 | | void UpdateProcessedTable(const TabletLocationsPB& loc, |
1546 | | RemoteTabletPtr remote, |
1547 | 732k | ProcessedTablesMap::mapped_type* processed_table) override { |
1548 | 732k | processed_table->emplace(loc.partition().partition_key_start(), remote); |
1549 | 732k | } |
1550 | | |
1551 | | private: |
1552 | 54.3k | void ProcessResponse(const Status& status) override { |
1553 | 54.3k | DoProcessResponse(status, resp_); |
1554 | 54.3k | } |
1555 | | |
1556 | 54.3k | Status ResponseStatus() override { |
1557 | 54.3k | return StatusFromResp(resp_); |
1558 | 54.3k | } |
1559 | | |
1560 | 54.2k | Status VerifyResponse() override { |
1561 | | // Note: if LookupByIdRpc response has no partition list version this means this response |
1562 | | // is from master that doesn't support tablet splitting and it is OK to treat it as version 0 |
1563 | | // (and 0 return value of resp_.partition_list_version() is OK). |
1564 | 54.2k | const auto req_partition_list_version = partition_group_start_.partition_list_version; |
1565 | 54.2k | const auto resp_partition_list_version = resp_.partition_list_version(); |
1566 | | |
1567 | 54.2k | const auto versions_formatter = [&] { |
1568 | 0 | return Format( |
1569 | 0 | "RPC: $0, response partition_list_version: $1", this->ToString(), |
1570 | 0 | resp_partition_list_version); |
1571 | 0 | }; |
1572 | | |
1573 | 54.2k | if (resp_partition_list_version < req_partition_list_version) { |
1574 | | // This means an issue on master side, table partition list version shouldn't decrease on |
1575 | | // master. |
1576 | 0 | const auto msg = Format("Ignoring response for obsolete RPC call. $0", versions_formatter()); |
1577 | 0 | LOG_WITH_PREFIX(DFATAL) << msg; |
1578 | 0 | return STATUS(IllegalState, msg); |
1579 | 0 | } |
1580 | 54.2k | if (resp_partition_list_version > req_partition_list_version) { |
1581 | | // This request is for partition_group_start calculated based on obsolete table partition |
1582 | | // list. |
1583 | 0 | const auto msg = Format( |
1584 | 0 | "Cached table partition list version is obsolete, refresh required. $0", |
1585 | 0 | versions_formatter()); |
1586 | 0 | VLOG_WITH_PREFIX_AND_FUNC(3) << msg; |
1587 | 0 | return STATUS(TryAgain, msg, ClientError(ClientErrorCode::kTablePartitionListIsStale)); |
1588 | 0 | } |
1589 | | // resp_partition_list_version == req_partition_list_version |
1590 | 54.2k | return Status::OK(); |
1591 | 54.2k | } |
1592 | | |
1593 | 54.2k | void CleanupRequest() override NO_THREAD_SAFETY_ANALYSIS { |
1594 | 54.2k | const auto it = meta_cache()->tables_.find(table()->id()); |
1595 | 54.2k | if (it == meta_cache()->tables_.end()) { |
1596 | 0 | return; |
1597 | 0 | } |
1598 | 54.2k | auto& table_data = it->second; |
1599 | | // This should be guaranteed by ProcessTabletLocations before we get here: |
1600 | 54.2k | DCHECK(table_data.partition_list->version == partition_group_start_.partition_list_version) |
1601 | 0 | << "table_data.partition_list->version: " << table_data.partition_list->version |
1602 | 0 | << " partition_group_start_.partition_list_version: " |
1603 | 0 | << partition_group_start_.partition_list_version; |
1604 | 54.2k | const auto lookup_by_group_iter = table_data.tablet_lookups_by_group.find( |
1605 | 54.2k | *partition_group_start_.key); |
1606 | 54.2k | TablePartitionLookup tablet_partition_lookup(table()->id(), partition_group_start_); |
1607 | 54.2k | if (lookup_by_group_iter != table_data.tablet_lookups_by_group.end()) { |
1608 | 54.2k | lookup_by_group_iter->second.Finished(request_no(), tablet_partition_lookup, false); |
1609 | 54.2k | } else { |
1610 | 0 | LOG_WITH_PREFIX(INFO) << "Cleanup request for unknown partition group: " |
1611 | 0 | << tablet_partition_lookup.ToString(); |
1612 | 0 | } |
1613 | 54.2k | } |
1614 | | |
1615 | 57 | void NotifyFailure(const Status& status) override { |
1616 | 57 | meta_cache()->LookupByKeyFailed( |
1617 | 57 | table(), partition_group_start_, resp_.partition_list_version(), request_no(), status); |
1618 | 57 | } |
1619 | | |
1620 | | Status ProcessTabletLocations( |
1621 | | const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& locations, |
1622 | 54.2k | boost::optional<PartitionListVersion> table_partition_list_version) override { |
1623 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(2) << "partition_group_start: " << partition_group_start_.ToString(); |
1624 | | // This condition is guaranteed by VerifyResponse function: |
1625 | 54.2k | CHECK(resp_.partition_list_version() == partition_group_start_.partition_list_version); |
1626 | | |
1627 | 54.2k | return meta_cache()->ProcessTabletLocations(locations, table_partition_list_version, this); |
1628 | 54.2k | } |
1629 | | |
1630 | | // Encoded partition group start key to lookup. |
1631 | | VersionedPartitionGroupStartKey partition_group_start_; |
1632 | | |
1633 | | // Request body. |
1634 | | GetTableLocationsRequestPB req_; |
1635 | | |
1636 | | // Response body. |
1637 | | GetTableLocationsResponsePB resp_; |
1638 | | }; |
1639 | | |
1640 | | void MetaCache::LookupByKeyFailed( |
1641 | | const std::shared_ptr<const YBTable>& table, |
1642 | | const VersionedPartitionGroupStartKey& partition_group_start, |
1643 | | PartitionListVersion response_partition_list_version, |
1644 | 57 | int64_t request_no, const Status& status) { |
1645 | 57 | const auto req_partition_list_version = partition_group_start.partition_list_version; |
1646 | 57 | VLOG_WITH_PREFIX0 (1) << "Lookup for table " << table->id() << " and partition group start " |
1647 | 0 | << Slice(*partition_group_start.key).ToDebugHexString() |
1648 | 0 | << ", request partition list version: " << req_partition_list_version |
1649 | 0 | << ", response partition list version: " << response_partition_list_version |
1650 | 0 | << " failed with: " << status; |
1651 | | |
1652 | 57 | CallbackNotifier notifier(status); |
1653 | 57 | CoarseTimePoint max_deadline; |
1654 | 57 | { |
1655 | 57 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1656 | 57 | auto it = tables_.find(table->id()); |
1657 | 57 | if (it == tables_.end()) { |
1658 | 0 | return; |
1659 | 0 | } |
1660 | | |
1661 | 57 | auto& table_data = it->second; |
1662 | 57 | const auto table_data_partition_list_version = table_data.partition_list->version; |
1663 | 57 | const auto versions_formatter = [&] { |
1664 | 0 | return Format( |
1665 | 0 | "MetaCache's table $0 partition list version: $1, stored in RPC call: $2, received: $3", |
1666 | 0 | table->id(), table_data_partition_list_version, |
1667 | 0 | req_partition_list_version, response_partition_list_version); |
1668 | 0 | }; |
1669 | | |
1670 | 57 | if (table_data_partition_list_version < req_partition_list_version) { |
1671 | | // MetaCache partition list version is older than stored in LookupByKeyRpc for which we've |
1672 | | // received an answer. |
1673 | | // This shouldn't happen, because MetaCache partition list version for each table couldn't |
1674 | | // decrease and we store it inside LookupByKeyRpc when creating an RPC. |
1675 | 0 | LOG_WITH_PREFIX(FATAL) << Format( |
1676 | 0 | "Cached table partition list version is older than stored in RPC call. $0", |
1677 | 0 | versions_formatter()); |
1678 | 57 | } else if (table_data_partition_list_version > req_partition_list_version) { |
1679 | | // MetaCache table partition list version has updated since we've sent this RPC. |
1680 | | // We've already failed and cleaned all registered lookups for the table on MetaCache table |
1681 | | // partition list update, so we should ignore responses as well. |
1682 | 0 | VLOG_WITH_PREFIX_AND_FUNC(3) << Format( |
1683 | 0 | "Cached table partition list version is newer than stored in RPC call, ignoring " |
1684 | 0 | "failure response. $0", versions_formatter()); |
1685 | 0 | return; |
1686 | 0 | } |
1687 | | |
1688 | | // Since table_data_partition_list_version == req_partition_list_version, |
1689 | | // we will get tablet lookups for this exact RPC call there: |
1690 | 57 | auto key_lookup_iterator = table_data.tablet_lookups_by_group.find(*partition_group_start.key); |
1691 | 57 | if (key_lookup_iterator != table_data.tablet_lookups_by_group.end()) { |
1692 | 57 | auto& lookup_data_group = key_lookup_iterator->second; |
1693 | 57 | max_deadline = LookupFailed(status, request_no, |
1694 | 57 | TablePartitionLookup(table->id(), partition_group_start), |
1695 | 57 | &lookup_data_group, ¬ifier); |
1696 | 57 | } |
1697 | 57 | } |
1698 | | |
1699 | 57 | if (max_deadline != CoarseTimePoint()) { |
1700 | 10 | auto rpc = std::make_shared<LookupByKeyRpc>( |
1701 | 10 | this, table, partition_group_start, request_no, max_deadline); |
1702 | 10 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1703 | 10 | } |
1704 | 57 | } |
1705 | | |
1706 | | void MetaCache::LookupFullTableFailed(const std::shared_ptr<const YBTable>& table, |
1707 | 0 | int64_t request_no, const Status& status) { |
1708 | 0 | VLOG_WITH_PREFIX(1) << "Lookup for table " << table->id() << " failed with: " << status; |
1709 | |
|
1710 | 0 | CallbackNotifier notifier(status); |
1711 | 0 | CoarseTimePoint max_deadline; |
1712 | 0 | { |
1713 | 0 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1714 | 0 | auto it = tables_.find(table->id()); |
1715 | 0 | if (it == tables_.end()) { |
1716 | 0 | return; |
1717 | 0 | } |
1718 | | |
1719 | 0 | max_deadline = LookupFailed(status, request_no, FullTableLookup(table->id()), |
1720 | 0 | &it->second.full_table_lookups, ¬ifier); |
1721 | 0 | } |
1722 | | |
1723 | 0 | if (max_deadline != CoarseTimePoint()) { |
1724 | 0 | auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, max_deadline); |
1725 | 0 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1726 | 0 | } |
1727 | 0 | } |
1728 | | |
1729 | | void MetaCache::LookupByIdFailed( |
1730 | | const TabletId& tablet_id, |
1731 | | const std::shared_ptr<const YBTable>& table, |
1732 | | master::IncludeInactive include_inactive, |
1733 | | const boost::optional<PartitionListVersion>& response_partition_list_version, |
1734 | | int64_t request_no, |
1735 | 377 | const Status& status) { |
1736 | 377 | VLOG_WITH_PREFIX1 (1) << "Lookup for tablet " << tablet_id << ", failed with: " << status1 ; |
1737 | | |
1738 | 377 | CallbackNotifier notifier(status); |
1739 | 377 | CoarseTimePoint max_deadline; |
1740 | 377 | { |
1741 | 377 | std::lock_guard<decltype(mutex_)> lock(mutex_); |
1742 | 377 | if (status.IsNotFound() && response_partition_list_version.has_value()360 ) { |
1743 | 305 | auto tablet = LookupTabletByIdFastPathUnlocked(tablet_id); |
1744 | 305 | if (tablet) { |
1745 | 285 | const auto tablet_last_known_table_partition_list_version = |
1746 | 285 | tablet->GetLastKnownPartitionListVersion(); |
1747 | 285 | if (tablet_last_known_table_partition_list_version < |
1748 | 285 | response_partition_list_version.value()) { |
1749 | 2 | const auto msg_formatter = [&] { |
1750 | 2 | return Format( |
1751 | 2 | "Received table $0 ($1) partitions version: $2, last known by MetaCache for " |
1752 | 2 | "tablet $3: $4", |
1753 | 2 | table->id(), table->name(), response_partition_list_version, |
1754 | 2 | tablet_id, tablet_last_known_table_partition_list_version); |
1755 | 2 | }; |
1756 | 2 | VLOG_WITH_PREFIX_AND_FUNC0 (3) << msg_formatter()0 ; |
1757 | 2 | notifier.SetStatus(STATUS( |
1758 | 2 | TryAgain, msg_formatter(), ClientError(ClientErrorCode::kTablePartitionListIsStale))); |
1759 | 2 | } |
1760 | 285 | } |
1761 | 305 | } |
1762 | | |
1763 | 377 | auto table_lookup_iterator = tablet_lookups_by_id_.find(tablet_id); |
1764 | 377 | if (table_lookup_iterator != tablet_lookups_by_id_.end()) { |
1765 | 377 | auto& lookup_data_group = table_lookup_iterator->second; |
1766 | 377 | max_deadline = LookupFailed(status, request_no, TabletIdLookup(tablet_id), |
1767 | 377 | &lookup_data_group, ¬ifier); |
1768 | 377 | } |
1769 | 377 | } |
1770 | | |
1771 | 377 | if (max_deadline != CoarseTimePoint()) { |
1772 | 5 | auto rpc = std::make_shared<LookupByIdRpc>( |
1773 | 5 | this, tablet_id, table, include_inactive, request_no, max_deadline, 0); |
1774 | 5 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1775 | 5 | } |
1776 | 377 | } |
1777 | | |
1778 | | RemoteTabletPtr MetaCache::LookupTabletByKeyFastPathUnlocked( |
1779 | 23.3M | const TableId& table_id, const VersionedPartitionStartKey& versioned_partition_start_key) { |
1780 | 23.3M | auto it = tables_.find(table_id); |
1781 | 23.3M | if (PREDICT_FALSE(it == tables_.end())) { |
1782 | | // No cache available for this table. |
1783 | 99.3k | return nullptr; |
1784 | 99.3k | } |
1785 | | |
1786 | 23.2M | const auto& table_data = it->second; |
1787 | | |
1788 | 23.2M | if (PREDICT_FALSE( |
1789 | 23.2M | table_data.partition_list->version != |
1790 | 23.2M | versioned_partition_start_key.partition_list_version)) { |
1791 | | // TableData::partition_list version in cache does not match partition_list_version used to |
1792 | | // calculate partition_key_start, can't use cache. |
1793 | 1 | return nullptr; |
1794 | 1 | } |
1795 | | |
1796 | 23.2M | const auto& partition_start_key = *versioned_partition_start_key.key; |
1797 | | |
1798 | 23.2M | DCHECK_EQ( |
1799 | 23.2M | partition_start_key, |
1800 | 23.2M | *client::FindPartitionStart(table_data.partition_list, partition_start_key)); |
1801 | 23.2M | auto tablet_it = table_data.tablets_by_partition.find(partition_start_key); |
1802 | 23.2M | if (PREDICT_FALSE(tablet_it == it->second.tablets_by_partition.end())) { |
1803 | | // No tablets with a start partition key lower than 'partition_key'. |
1804 | 75.1k | return nullptr; |
1805 | 75.1k | } |
1806 | | |
1807 | 23.2M | const auto& result = tablet_it->second; |
1808 | | |
1809 | | // Stale entries must be re-fetched. |
1810 | 23.2M | if (result->stale()) { |
1811 | 1.68k | return nullptr; |
1812 | 1.68k | } |
1813 | | |
1814 | 23.2M | if (result->partition().partition_key_end().compare(partition_start_key) > 0 || |
1815 | 23.2M | result->partition().partition_key_end().empty()9.04M ) { |
1816 | | // partition_start_key < partition.end OR tablet does not end. |
1817 | 23.1M | return result; |
1818 | 23.1M | } |
1819 | | |
1820 | 7.11k | return nullptr; |
1821 | 23.2M | } |
1822 | | |
1823 | | boost::optional<std::vector<RemoteTabletPtr>> MetaCache::FastLookupAllTabletsUnlocked( |
1824 | 0 | const std::shared_ptr<const YBTable>& table) { |
1825 | 0 | auto tablets = std::vector<RemoteTabletPtr>(); |
1826 | 0 | auto it = tables_.find(table->id()); |
1827 | 0 | if (PREDICT_FALSE(it == tables_.end())) { |
1828 | | // No cache available for this table. |
1829 | 0 | return boost::none; |
1830 | 0 | } |
1831 | | |
1832 | 0 | for (const auto& tablet : it->second.all_tablets) { |
1833 | 0 | if (tablet->stale()) { |
1834 | 0 | return boost::none; |
1835 | 0 | } |
1836 | 0 | tablets.push_back(tablet); |
1837 | 0 | } |
1838 | | |
1839 | 0 | if (tablets.empty()) { |
1840 | 0 | return boost::none; |
1841 | 0 | } |
1842 | 0 | return tablets; |
1843 | 0 | } |
1844 | | |
1845 | | // We disable thread safety analysis in this function due to manual conditional locking. |
1846 | | RemoteTabletPtr MetaCache::FastLookupTabletByKeyUnlocked( |
1847 | 23.3M | const TableId& table_id, const VersionedPartitionStartKey& partition_start) { |
1848 | | // Fast path: lookup in the cache. |
1849 | 23.3M | auto result = LookupTabletByKeyFastPathUnlocked(table_id, partition_start); |
1850 | 23.3M | if (result && result->HasLeader()23.1M ) { |
1851 | 23.1M | VLOG_WITH_PREFIX997 (5) << "Fast lookup: found tablet " << result->tablet_id()997 ; |
1852 | 23.1M | return result; |
1853 | 23.1M | } |
1854 | | |
1855 | 184k | return nullptr; |
1856 | 23.3M | } |
1857 | | |
// Overload selected for a std::lock_guard: always reports true.
template <class Mutex>
bool IsUniqueLock(const std::lock_guard<Mutex>* /* lock */) {
  constexpr bool kIsUnique = true;
  return kIsUnique;
}
1862 | | |
1863 | | template <class Mutex> |
1864 | 66.7k | bool IsUniqueLock(const SharedLock<Mutex>*) { |
1865 | 66.7k | return false; |
1866 | 66.7k | } |
1867 | | |
1868 | | // partition_group_start should be not nullptr and points to PartitionGroupStartKeyPtr that will be |
1869 | | // initialized only if it is nullptr. This is an optimization to avoid recalculation of |
1870 | | // partition_group_start in subsequent call of this function (see MetaCache::LookupTabletByKey). |
1871 | | template <class Lock> |
1872 | | bool MetaCache::DoLookupTabletByKey( |
1873 | | const std::shared_ptr<const YBTable>& table, const VersionedTablePartitionListPtr& partitions, |
1874 | | const PartitionKeyPtr& partition_start, CoarseTimePoint deadline, |
1875 | 23.3M | LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) { |
1876 | 23.3M | DCHECK_ONLY_NOTNULL(partition_group_start); |
1877 | 23.3M | RemoteTabletPtr tablet; |
1878 | 23.3M | auto scope_exit = ScopeExit([callback, &tablet] { |
1879 | 23.3M | if (tablet) { |
1880 | 23.1M | (*callback)(tablet); |
1881 | 23.1M | } |
1882 | 23.3M | }); bool yb::client::internal::MetaCache::DoLookupTabletByKey<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)::'lambda'()::operator()() const Line | Count | Source | 1878 | 23.3M | auto scope_exit = ScopeExit([callback, &tablet] { | 1879 | 23.3M | if (tablet) { | 1880 | 23.1M | (*callback)(tablet); | 1881 | 23.1M | } | 1882 | 23.3M | }); |
bool yb::client::internal::MetaCache::DoLookupTabletByKey<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*)::'lambda'()::operator()() const Line | Count | Source | 1878 | 53.1k | auto scope_exit = ScopeExit([callback, &tablet] { | 1879 | 53.1k | if (tablet) { | 1880 | 0 | (*callback)(tablet); | 1881 | 0 | } | 1882 | 53.1k | }); |
|
1883 | 23.3M | int64_t request_no; |
1884 | 23.3M | { |
1885 | 23.3M | Lock lock(mutex_); |
1886 | 23.3M | tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version}); |
1887 | 23.3M | if (tablet) { |
1888 | 23.1M | return true; |
1889 | 23.1M | } |
1890 | | |
1891 | 153k | auto table_it = tables_.find(table->id()); |
1892 | 153k | TableData* table_data; |
1893 | 153k | if (table_it == tables_.end()) { |
1894 | 99.4k | VLOG_WITH_PREFIX_AND_FUNC147 (4) << Format( |
1895 | 147 | "missed table_id $0", table->id()); |
1896 | 99.4k | if (!IsUniqueLock(&lock)) { |
1897 | 49.6k | return false; |
1898 | 49.6k | } |
1899 | 49.8k | table_it = InitTableDataUnlocked(table->id(), partitions); |
1900 | 49.8k | } |
1901 | 104k | table_data = &table_it->second; |
1902 | | |
1903 | 104k | if (table_data->partition_list->version != partitions->version || |
1904 | 134k | (PREDICT_FALSE(RandomActWithProbability( |
1905 | 134k | FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) && |
1906 | 134k | table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0 )) { |
1907 | 1 | (*callback)(STATUS( |
1908 | 1 | TryAgain, |
1909 | 1 | Format( |
1910 | 1 | "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, " |
1911 | 1 | "refresh required", |
1912 | 1 | table->ToString(), table_data->partition_list->version, partitions->version), |
1913 | 1 | ClientError(ClientErrorCode::kTablePartitionListIsStale))); |
1914 | 1 | return true; |
1915 | 1 | } |
1916 | | |
1917 | 131k | if (104k !*partition_group_start104k ) { |
1918 | 131k | *partition_group_start = client::FindPartitionStart( |
1919 | 131k | partitions, *partition_start, kPartitionGroupSize); |
1920 | 131k | } |
1921 | | |
1922 | 104k | auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group; |
1923 | 104k | LookupDataGroup* lookups_group; |
1924 | 104k | { |
1925 | 104k | auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start); |
1926 | 104k | if (lookups_group_it == tablet_lookups_by_group.end()) { |
1927 | 56.3k | if (!IsUniqueLock(&lock)) { |
1928 | 3.31k | return false; |
1929 | 3.31k | } |
1930 | 53.0k | lookups_group = &tablet_lookups_by_group[**partition_group_start]; |
1931 | 53.0k | } else { |
1932 | 47.7k | lookups_group = &lookups_group_it->second; |
1933 | 47.7k | } |
1934 | 104k | } |
1935 | 100k | lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start)); |
1936 | 100k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
1937 | 100k | int64_t expected = 0; |
1938 | 100k | if (!lookups_group->running_request_number.compare_exchange_strong( |
1939 | 100k | expected, request_no, std::memory_order_acq_rel)) { |
1940 | 77.0k | VLOG_WITH_PREFIX_AND_FUNC1 (5) |
1941 | 1 | << "Lookup is already running for table: " << table->ToString() |
1942 | 1 | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() |
1943 | 1 | << ", partition_list_version: " << partitions->version |
1944 | 1 | << ", request_no: " << expected; |
1945 | 77.0k | return true; |
1946 | 77.0k | } |
1947 | 100k | } |
1948 | | |
1949 | 23.7k | auto rpc = std::make_shared<LookupByKeyRpc>( |
1950 | 23.7k | this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version}, |
1951 | 23.7k | request_no, deadline); |
1952 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) |
1953 | 18.4E | << "Started lookup for table: " << table->ToString() |
1954 | 18.4E | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() |
1955 | 18.4E | << ", rpc: " << AsString(rpc); |
1956 | 23.7k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
1957 | 23.7k | return true; |
1958 | 100k | } bool yb::client::internal::MetaCache::DoLookupTabletByKey<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*) Line | Count | Source | 1875 | 23.3M | LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) { | 1876 | 23.3M | DCHECK_ONLY_NOTNULL(partition_group_start); | 1877 | 23.3M | RemoteTabletPtr tablet; | 1878 | 23.3M | auto scope_exit = ScopeExit([callback, &tablet] { | 1879 | 23.3M | if (tablet) { | 1880 | 23.3M | (*callback)(tablet); | 1881 | 23.3M | } | 1882 | 23.3M | }); | 1883 | 23.3M | int64_t request_no; | 1884 | 23.3M | { | 1885 | 23.3M | Lock lock(mutex_); | 1886 | 23.3M | tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version}); | 1887 | 23.3M | if (tablet) { | 1888 | 23.1M | return true; | 1889 | 23.1M | } | 1890 | | | 1891 | 100k | auto table_it = tables_.find(table->id()); | 1892 | 100k | TableData* table_data; | 1893 | 100k | if (table_it == tables_.end()) { | 1894 | 49.6k | VLOG_WITH_PREFIX_AND_FUNC147 (4) << Format( | 1895 | 147 | "missed table_id $0", table->id()); | 1896 | 49.6k | if (!IsUniqueLock(&lock)) { | 1897 | 49.6k | return false; | 1898 | 49.6k | } | 1899 | 83 | table_it = InitTableDataUnlocked(table->id(), partitions); | 1900 | 83 | } | 1901 | 51.1k | table_data = &table_it->second; | 1902 | | | 1903 | 51.1k | if (table_data->partition_list->version != partitions->version || | 
1904 | 81.6k | (PREDICT_FALSE(RandomActWithProbability( | 1905 | 81.6k | FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) && | 1906 | 81.6k | table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0 )) { | 1907 | 1 | (*callback)(STATUS( | 1908 | 1 | TryAgain, | 1909 | 1 | Format( | 1910 | 1 | "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, " | 1911 | 1 | "refresh required", | 1912 | 1 | table->ToString(), table_data->partition_list->version, partitions->version), | 1913 | 1 | ClientError(ClientErrorCode::kTablePartitionListIsStale))); | 1914 | 1 | return true; | 1915 | 1 | } | 1916 | | | 1917 | 81.6k | if (51.1k !*partition_group_start51.1k ) { | 1918 | 81.6k | *partition_group_start = client::FindPartitionStart( | 1919 | 81.6k | partitions, *partition_start, kPartitionGroupSize); | 1920 | 81.6k | } | 1921 | | | 1922 | 51.1k | auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group; | 1923 | 51.1k | LookupDataGroup* lookups_group; | 1924 | 51.1k | { | 1925 | 51.1k | auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start); | 1926 | 51.1k | if (lookups_group_it == tablet_lookups_by_group.end()) { | 1927 | 3.31k | if (!IsUniqueLock(&lock)) { | 1928 | 3.31k | return false; | 1929 | 3.31k | } | 1930 | 0 | lookups_group = &tablet_lookups_by_group[**partition_group_start]; | 1931 | 47.8k | } else { | 1932 | 47.8k | lookups_group = &lookups_group_it->second; | 1933 | 47.8k | } | 1934 | 51.1k | } | 1935 | 47.8k | lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start)); | 1936 | 47.8k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); | 1937 | 47.8k | int64_t expected = 0; | 1938 | 47.8k | if (!lookups_group->running_request_number.compare_exchange_strong( | 1939 | 77.0k | expected, request_no, std::memory_order_acq_rel)) { | 1940 | 77.0k | VLOG_WITH_PREFIX_AND_FUNC1 (5) | 1941 | 1 | << "Lookup is already running for table: " << 
table->ToString() | 1942 | 1 | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() | 1943 | 1 | << ", partition_list_version: " << partitions->version | 1944 | 1 | << ", request_no: " << expected; | 1945 | 77.0k | return true; | 1946 | 77.0k | } | 1947 | 47.8k | } | 1948 | | | 1949 | 18.4E | auto rpc = std::make_shared<LookupByKeyRpc>( | 1950 | 18.4E | this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version}, | 1951 | 18.4E | request_no, deadline); | 1952 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC18.4E (4) | 1953 | 18.4E | << "Started lookup for table: " << table->ToString() | 1954 | 18.4E | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() | 1955 | 18.4E | << ", rpc: " << AsString(rpc); | 1956 | 18.4E | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); | 1957 | 18.4E | return true; | 1958 | 47.8k | } |
bool yb::client::internal::MetaCache::DoLookupTabletByKey<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::shared_ptr<yb::client::VersionedTablePartitionList const> const&, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*, std::__1::shared_ptr<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const>*) Line | Count | Source | 1875 | 53.0k | LookupTabletCallback* callback, PartitionGroupStartKeyPtr* partition_group_start) { | 1876 | 53.0k | DCHECK_ONLY_NOTNULL(partition_group_start); | 1877 | 53.0k | RemoteTabletPtr tablet; | 1878 | 53.0k | auto scope_exit = ScopeExit([callback, &tablet] { | 1879 | 53.0k | if (tablet) { | 1880 | 53.0k | (*callback)(tablet); | 1881 | 53.0k | } | 1882 | 53.0k | }); | 1883 | 53.0k | int64_t request_no; | 1884 | 53.0k | { | 1885 | 53.0k | Lock lock(mutex_); | 1886 | 53.0k | tablet = FastLookupTabletByKeyUnlocked(table->id(), {partition_start, partitions->version}); | 1887 | 53.0k | if (tablet) { | 1888 | 0 | return true; | 1889 | 0 | } | 1890 | | | 1891 | 53.0k | auto table_it = tables_.find(table->id()); | 1892 | 53.0k | TableData* table_data; | 1893 | 53.0k | if (table_it == tables_.end()) { | 1894 | 49.7k | VLOG_WITH_PREFIX_AND_FUNC0 (4) << Format( | 1895 | 0 | "missed table_id $0", table->id()); | 1896 | 49.7k | if (!IsUniqueLock(&lock)) { | 1897 | 0 | return false; | 1898 | 0 | } | 1899 | 49.7k | table_it = InitTableDataUnlocked(table->id(), partitions); | 1900 | 49.7k | } | 1901 | 53.0k | table_data = &table_it->second; | 1902 | | | 1903 | 53.0k | if (table_data->partition_list->version != partitions->version || | 1904 | 53.1k | 
(PREDICT_FALSE(RandomActWithProbability( | 1905 | 53.1k | FLAGS_TEST_simulate_lookup_partition_list_mismatch_probability)) && | 1906 | 53.1k | table->table_type() != YBTableType::TRANSACTION_STATUS_TABLE_TYPE0 )) { | 1907 | 0 | (*callback)(STATUS( | 1908 | 0 | TryAgain, | 1909 | 0 | Format( | 1910 | 0 | "MetaCache's table $0 partitions version does not match, cached: $1, got: $2, " | 1911 | 0 | "refresh required", | 1912 | 0 | table->ToString(), table_data->partition_list->version, partitions->version), | 1913 | 0 | ClientError(ClientErrorCode::kTablePartitionListIsStale))); | 1914 | 0 | return true; | 1915 | 0 | } | 1916 | | | 1917 | 53.0k | if (!*partition_group_start) { | 1918 | 49.8k | *partition_group_start = client::FindPartitionStart( | 1919 | 49.8k | partitions, *partition_start, kPartitionGroupSize); | 1920 | 49.8k | } | 1921 | | | 1922 | 53.0k | auto& tablet_lookups_by_group = table_data->tablet_lookups_by_group; | 1923 | 53.0k | LookupDataGroup* lookups_group; | 1924 | 53.0k | { | 1925 | 53.0k | auto lookups_group_it = tablet_lookups_by_group.find(**partition_group_start); | 1926 | 53.0k | if (lookups_group_it == tablet_lookups_by_group.end()53.0k ) { | 1927 | 53.0k | if (!IsUniqueLock(&lock)) { | 1928 | 0 | return false; | 1929 | 0 | } | 1930 | 53.0k | lookups_group = &tablet_lookups_by_group[**partition_group_start]; | 1931 | 18.4E | } else { | 1932 | 18.4E | lookups_group = &lookups_group_it->second; | 1933 | 18.4E | } | 1934 | 53.0k | } | 1935 | 53.0k | lookups_group->lookups.Push(new LookupData(*callback, deadline, partition_start)); | 1936 | 53.0k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); | 1937 | 53.0k | int64_t expected = 0; | 1938 | 53.0k | if (!lookups_group->running_request_number.compare_exchange_strong( | 1939 | 53.0k | expected, request_no, std::memory_order_acq_rel)) { | 1940 | 76 | VLOG_WITH_PREFIX_AND_FUNC0 (5) | 1941 | 0 | << "Lookup is already running for table: " << table->ToString() | 1942 | 0 | << ", 
partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() | 1943 | 0 | << ", partition_list_version: " << partitions->version | 1944 | 0 | << ", request_no: " << expected; | 1945 | 76 | return true; | 1946 | 76 | } | 1947 | 53.0k | } | 1948 | | | 1949 | 52.9k | auto rpc = std::make_shared<LookupByKeyRpc>( | 1950 | 52.9k | this, table, VersionedPartitionGroupStartKey{*partition_group_start, partitions->version}, | 1951 | 52.9k | request_no, deadline); | 1952 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) | 1953 | 18.4E | << "Started lookup for table: " << table->ToString() | 1954 | 18.4E | << ", partition_group_start: " << Slice(**partition_group_start).ToDebugHexString() | 1955 | 18.4E | << ", rpc: " << AsString(rpc); | 1956 | 52.9k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); | 1957 | 52.9k | return true; | 1958 | 53.0k | } |
|
1959 | | |
1960 | | template <class Lock> |
1961 | | bool MetaCache::DoLookupAllTablets(const std::shared_ptr<const YBTable>& table, |
1962 | | CoarseTimePoint deadline, |
1963 | 0 | LookupTabletRangeCallback* callback) { |
1964 | 0 | VLOG_WITH_PREFIX(3) << "DoLookupAllTablets() for table: " << table->ToString(); |
1965 | 0 | int64_t request_no; |
1966 | 0 | { |
1967 | 0 | Lock lock(mutex_); |
1968 | 0 | if (PREDICT_TRUE(!FLAGS_TEST_force_master_lookup_all_tablets)) { |
1969 | 0 | auto tablets = FastLookupAllTabletsUnlocked(table); |
1970 | 0 | if (tablets.has_value()) { |
1971 | 0 | VLOG_WITH_PREFIX(4) << "tablets has value"; |
1972 | 0 | (*callback)(*tablets); |
1973 | 0 | return true; |
1974 | 0 | } |
1975 | 0 | } |
1976 | | |
1977 | 0 | if (!IsUniqueLock(&lock)) { |
1978 | 0 | return false; |
1979 | 0 | } |
1980 | 0 | auto table_it = tables_.find(table->id()); |
1981 | 0 | if (table_it == tables_.end()) { |
1982 | 0 | table_it = InitTableDataUnlocked(table->id(), table->GetVersionedPartitions()); |
1983 | 0 | } |
1984 | 0 | auto& table_data = table_it->second; |
1985 | |
|
1986 | 0 | auto& full_table_lookups = table_data.full_table_lookups; |
1987 | 0 | full_table_lookups.lookups.Push(new LookupData(*callback, deadline, nullptr)); |
1988 | 0 | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
1989 | 0 | int64_t expected = 0; |
1990 | 0 | if (!full_table_lookups.running_request_number.compare_exchange_strong( |
1991 | 0 | expected, request_no, std::memory_order_acq_rel)) { |
1992 | 0 | VLOG_WITH_PREFIX_AND_FUNC(5) |
1993 | 0 | << "Lookup is already running for table: " << table->ToString(); |
1994 | 0 | return true; |
1995 | 0 | } |
1996 | 0 | } |
1997 | | |
1998 | 0 | VLOG_WITH_PREFIX_AND_FUNC(4) |
1999 | 0 | << "Start lookup for table: " << table->ToString(); |
2000 | |
|
2001 | 0 | auto rpc = std::make_shared<LookupFullTableRpc>(this, table, request_no, deadline); |
2002 | 0 | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
2003 | 0 | return true; |
2004 | 0 | } Unexecuted instantiation: bool yb::client::internal::MetaCache::DoLookupAllTablets<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > const&)>*) Unexecuted instantiation: bool yb::client::internal::MetaCache::DoLookupAllTablets<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::shared_ptr<yb::client::YBTable const> const&, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, std::__1::function<void (yb::Result<std::__1::vector<scoped_refptr<yb::client::internal::RemoteTablet>, std::__1::allocator<scoped_refptr<yb::client::internal::RemoteTablet> > > > const&)>*) |
2005 | | |
2006 | | // We disable thread safety analysis in this function due to manual conditional locking. |
2007 | | void MetaCache::LookupTabletByKey(const std::shared_ptr<YBTable>& table, |
2008 | | const PartitionKey& partition_key, |
2009 | | CoarseTimePoint deadline, |
2010 | 23.3M | LookupTabletCallback callback) { |
2011 | 23.3M | if (table->ArePartitionsStale()) { |
2012 | 19.2k | table->RefreshPartitions(client_, [this, table, partition_key, deadline, |
2013 | 19.2k | callback = std::move(callback)](const Status& status) { |
2014 | 19.2k | if (!status.ok()) { |
2015 | 0 | callback(status); |
2016 | 0 | return; |
2017 | 0 | } |
2018 | 19.2k | InvalidateTableCache(*table); |
2019 | 19.2k | LookupTabletByKey(table, partition_key, deadline, std::move(callback)); |
2020 | 19.2k | }); |
2021 | 19.2k | return; |
2022 | 19.2k | } |
2023 | | |
2024 | 23.3M | const auto table_partition_list = table->GetVersionedPartitions(); |
2025 | 23.3M | const auto partition_start = client::FindPartitionStart(table_partition_list, partition_key); |
2026 | 23.3M | VLOG_WITH_PREFIX_AND_FUNC7.96k (5) << "Table: " << table->ToString() |
2027 | 7.96k | << ", table_partition_list: " << table_partition_list->ToString() |
2028 | 7.96k | << ", partition_key: " << Slice(partition_key).ToDebugHexString() |
2029 | 7.96k | << ", partition_start: " << Slice(*partition_start).ToDebugHexString(); |
2030 | | |
2031 | 23.3M | PartitionGroupStartKeyPtr partition_group_start; |
2032 | 23.3M | if (DoLookupTabletByKey<SharedLock<std::shared_timed_mutex>>( |
2033 | 23.3M | table, table_partition_list, partition_start, deadline, &callback, |
2034 | 23.3M | &partition_group_start)) { |
2035 | 23.2M | return; |
2036 | 23.2M | } |
2037 | | |
2038 | 51.2k | bool result = DoLookupTabletByKey<std::lock_guard<std::shared_timed_mutex>>( |
2039 | 51.2k | table, table_partition_list, partition_start, deadline, &callback, &partition_group_start); |
2040 | 18.4E | LOG_IF(DFATAL, !result) |
2041 | 18.4E | << "Lookup was not started for table " << table->ToString() |
2042 | 18.4E | << ", partition_key: " << Slice(partition_key).ToDebugHexString(); |
2043 | 51.2k | } |
2044 | | |
2045 | | void MetaCache::LookupAllTablets(const std::shared_ptr<const YBTable>& table, |
2046 | | CoarseTimePoint deadline, |
2047 | 0 | LookupTabletRangeCallback callback) { |
2048 | | // We first want to check the cache in read-only mode, and only if we can't find anything |
2049 | | // do a lookup in write mode. |
2050 | 0 | if (DoLookupAllTablets<SharedLock<std::shared_timed_mutex>>(table, deadline, &callback)) { |
2051 | 0 | return; |
2052 | 0 | } |
2053 | | |
2054 | 0 | bool result = DoLookupAllTablets<std::lock_guard<std::shared_timed_mutex>>( |
2055 | 0 | table, deadline, &callback); |
2056 | 0 | LOG_IF(DFATAL, !result) |
2057 | 0 | << "Full table lookup was not started for table " << table->ToString(); |
2058 | 0 | } |
2059 | | |
2060 | 1.95M | RemoteTabletPtr MetaCache::LookupTabletByIdFastPathUnlocked(const TabletId& tablet_id) { |
2061 | 1.95M | auto it = tablets_by_id_.find(tablet_id); |
2062 | 1.95M | if (it != tablets_by_id_.end()) { |
2063 | 1.92M | return it->second; |
2064 | 1.92M | } |
2065 | 29.7k | return nullptr; |
2066 | 1.95M | } |
2067 | | |
2068 | | template <class Lock> |
2069 | | bool MetaCache::DoLookupTabletById( |
2070 | | const TabletId& tablet_id, |
2071 | | const std::shared_ptr<const YBTable>& table, |
2072 | | master::IncludeInactive include_inactive, |
2073 | | CoarseTimePoint deadline, |
2074 | | UseCache use_cache, |
2075 | 1.95M | LookupTabletCallback* callback) { |
2076 | 1.95M | RemoteTabletPtr tablet; |
2077 | 1.95M | auto scope_exit = ScopeExit([callback, &tablet] { |
2078 | 1.95M | if (tablet) { |
2079 | 1.91M | (*callback)(tablet); |
2080 | 1.91M | } |
2081 | 1.95M | }); bool yb::client::internal::MetaCache::DoLookupTabletById<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)::'lambda'()::operator()() const Line | Count | Source | 2077 | 1.94M | auto scope_exit = ScopeExit([callback, &tablet] { | 2078 | 1.94M | if (tablet) { | 2079 | 1.91M | (*callback)(tablet); | 2080 | 1.91M | } | 2081 | 1.94M | }); |
bool yb::client::internal::MetaCache::DoLookupTabletById<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*)::'lambda'()::operator()() const Line | Count | Source | 2077 | 13.7k | auto scope_exit = ScopeExit([callback, &tablet] { | 2078 | 13.7k | if (tablet) { | 2079 | 0 | (*callback)(tablet); | 2080 | 0 | } | 2081 | 13.7k | }); |
|
2082 | 1.95M | int64_t request_no; |
2083 | 1.95M | int64_t lookups_without_new_replicas = 0; |
2084 | 1.95M | { |
2085 | 1.95M | Lock lock(mutex_); |
2086 | | |
2087 | | // Fast path: lookup in the cache. |
2088 | 1.95M | tablet = LookupTabletByIdFastPathUnlocked(tablet_id); |
2089 | 1.95M | if (tablet) { |
2090 | 1.93M | VLOG_WITH_PREFIX177 (5) << "Fast lookup: candidate tablet " << AsString(tablet)177 ; |
2091 | 1.93M | if (use_cache && tablet->HasLeader()1.93M ) { |
2092 | | // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with |
2093 | | // tablet_id is found on all replicas. |
2094 | 1.91M | VLOG_WITH_PREFIX184 (5) << "Fast lookup: found tablet " << tablet->tablet_id()184 ; |
2095 | 1.91M | return true; |
2096 | 1.91M | } |
2097 | 14.4k | lookups_without_new_replicas = tablet->lookups_without_new_replicas(); |
2098 | 14.4k | tablet = nullptr; |
2099 | 14.4k | } |
2100 | | |
2101 | 38.3k | LookupDataGroup* lookup; |
2102 | 38.3k | { |
2103 | 38.3k | auto lookup_it = tablet_lookups_by_id_.find(tablet_id); |
2104 | 38.3k | if (lookup_it == tablet_lookups_by_id_.end()) { |
2105 | 27.0k | if (!IsUniqueLock(&lock)) { |
2106 | 13.5k | return false; |
2107 | 13.5k | } |
2108 | 13.5k | lookup = &tablet_lookups_by_id_[tablet_id]; |
2109 | 13.5k | } else { |
2110 | 11.2k | lookup = &lookup_it->second; |
2111 | 11.2k | } |
2112 | 38.3k | } |
2113 | 24.8k | lookup->lookups.Push(new LookupData(*callback, deadline, nullptr)); |
2114 | 24.8k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); |
2115 | 24.8k | int64_t expected = 0; |
2116 | 24.8k | if (!lookup->running_request_number.compare_exchange_strong( |
2117 | 24.8k | expected, request_no, std::memory_order_acq_rel)) { |
2118 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id; |
2119 | 13.2k | return true; |
2120 | 13.2k | } |
2121 | 24.8k | } |
2122 | | |
2123 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no; |
2124 | | |
2125 | 11.5k | auto rpc = std::make_shared<LookupByIdRpc>( |
2126 | 11.5k | this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas); |
2127 | 11.5k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); |
2128 | 11.5k | return true; |
2129 | 24.8k | } bool yb::client::internal::MetaCache::DoLookupTabletById<yb::SharedLock<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*) Line | Count | Source | 2075 | 1.94M | LookupTabletCallback* callback) { | 2076 | 1.94M | RemoteTabletPtr tablet; | 2077 | 1.94M | auto scope_exit = ScopeExit([callback, &tablet] { | 2078 | 1.94M | if (tablet) { | 2079 | 1.94M | (*callback)(tablet); | 2080 | 1.94M | } | 2081 | 1.94M | }); | 2082 | 1.94M | int64_t request_no; | 2083 | 1.94M | int64_t lookups_without_new_replicas = 0; | 2084 | 1.94M | { | 2085 | 1.94M | Lock lock(mutex_); | 2086 | | | 2087 | | // Fast path: lookup in the cache. | 2088 | 1.94M | tablet = LookupTabletByIdFastPathUnlocked(tablet_id); | 2089 | 1.94M | if (tablet) { | 2090 | 1.92M | VLOG_WITH_PREFIX177 (5) << "Fast lookup: candidate tablet " << AsString(tablet)177 ; | 2091 | 1.92M | if (use_cache && tablet->HasLeader()1.92M ) { | 2092 | | // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with | 2093 | | // tablet_id is found on all replicas. 
| 2094 | 1.91M | VLOG_WITH_PREFIX184 (5) << "Fast lookup: found tablet " << tablet->tablet_id()184 ; | 2095 | 1.91M | return true; | 2096 | 1.91M | } | 2097 | 13.5k | lookups_without_new_replicas = tablet->lookups_without_new_replicas(); | 2098 | 13.5k | tablet = nullptr; | 2099 | 13.5k | } | 2100 | | | 2101 | 24.6k | LookupDataGroup* lookup; | 2102 | 24.6k | { | 2103 | 24.6k | auto lookup_it = tablet_lookups_by_id_.find(tablet_id); | 2104 | 24.6k | if (lookup_it == tablet_lookups_by_id_.end()) { | 2105 | 13.5k | if (!IsUniqueLock(&lock)13.4k ) { | 2106 | 13.5k | return false; | 2107 | 13.5k | } | 2108 | 18.4E | lookup = &tablet_lookups_by_id_[tablet_id]; | 2109 | 18.4E | } else { | 2110 | 11.1k | lookup = &lookup_it->second; | 2111 | 11.1k | } | 2112 | 24.6k | } | 2113 | 11.1k | lookup->lookups.Push(new LookupData(*callback, deadline, nullptr)); | 2114 | 11.1k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); | 2115 | 11.1k | int64_t expected = 0; | 2116 | 11.1k | if (!lookup->running_request_number.compare_exchange_strong( | 2117 | 13.1k | expected, request_no, std::memory_order_acq_rel)) { | 2118 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(5) << "Lookup already running for tablet: " << tablet_id; | 2119 | 13.1k | return true; | 2120 | 13.1k | } | 2121 | 11.1k | } | 2122 | | | 2123 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC18.4E (4) << "Start lookup for tablet " << tablet_id << ": " << request_no18.4E ; | 2124 | | | 2125 | 18.4E | auto rpc = std::make_shared<LookupByIdRpc>( | 2126 | 18.4E | this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas); | 2127 | 18.4E | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); | 2128 | 18.4E | return true; | 2129 | 11.1k | } |
bool yb::client::internal::MetaCache::DoLookupTabletById<std::__1::lock_guard<std::__1::shared_timed_mutex> >(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<yb::client::YBTable const> const&, yb::StronglyTypedBool<yb::master::IncludeInactive_Tag>, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::StronglyTypedBool<yb::client::UseCache_Tag>, std::__1::function<void (yb::Result<scoped_refptr<yb::client::internal::RemoteTablet> > const&)>*) Line | Count | Source | 2075 | 13.6k | LookupTabletCallback* callback) { | 2076 | 13.6k | RemoteTabletPtr tablet; | 2077 | 13.6k | auto scope_exit = ScopeExit([callback, &tablet] { | 2078 | 13.6k | if (tablet) { | 2079 | 13.6k | (*callback)(tablet); | 2080 | 13.6k | } | 2081 | 13.6k | }); | 2082 | 13.6k | int64_t request_no; | 2083 | 13.6k | int64_t lookups_without_new_replicas = 0; | 2084 | 13.6k | { | 2085 | 13.6k | Lock lock(mutex_); | 2086 | | | 2087 | | // Fast path: lookup in the cache. | 2088 | 13.6k | tablet = LookupTabletByIdFastPathUnlocked(tablet_id); | 2089 | 13.6k | if (tablet) { | 2090 | 936 | VLOG_WITH_PREFIX0 (5) << "Fast lookup: candidate tablet " << AsString(tablet)0 ; | 2091 | 936 | if (use_cache && tablet->HasLeader()769 ) { | 2092 | | // tablet->HasLeader() check makes MetaCache send RPC to master in case of no tablet with | 2093 | | // tablet_id is found on all replicas. 
| 2094 | 0 | VLOG_WITH_PREFIX(5) << "Fast lookup: found tablet " << tablet->tablet_id(); | 2095 | 0 | return true; | 2096 | 0 | } | 2097 | 936 | lookups_without_new_replicas = tablet->lookups_without_new_replicas(); | 2098 | 936 | tablet = nullptr; | 2099 | 936 | } | 2100 | | | 2101 | 13.6k | LookupDataGroup* lookup; | 2102 | 13.6k | { | 2103 | 13.6k | auto lookup_it = tablet_lookups_by_id_.find(tablet_id); | 2104 | 13.6k | if (lookup_it == tablet_lookups_by_id_.end()) { | 2105 | 13.5k | if (!IsUniqueLock(&lock)) { | 2106 | 0 | return false; | 2107 | 0 | } | 2108 | 13.5k | lookup = &tablet_lookups_by_id_[tablet_id]; | 2109 | 13.5k | } else { | 2110 | 110 | lookup = &lookup_it->second; | 2111 | 110 | } | 2112 | 13.6k | } | 2113 | 13.6k | lookup->lookups.Push(new LookupData(*callback, deadline, nullptr)); | 2114 | 13.6k | request_no = lookup_serial_.fetch_add(1, std::memory_order_acq_rel); | 2115 | 13.6k | int64_t expected = 0; | 2116 | 13.6k | if (!lookup->running_request_number.compare_exchange_strong( | 2117 | 13.6k | expected, request_no, std::memory_order_acq_rel)) { | 2118 | 129 | VLOG_WITH_PREFIX_AND_FUNC0 (5) << "Lookup already running for tablet: " << tablet_id0 ; | 2119 | 129 | return true; | 2120 | 129 | } | 2121 | 13.6k | } | 2122 | | | 2123 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << "Start lookup for tablet " << tablet_id << ": " << request_no; | 2124 | | | 2125 | 13.5k | auto rpc = std::make_shared<LookupByIdRpc>( | 2126 | 13.5k | this, tablet_id, table, include_inactive, request_no, deadline, lookups_without_new_replicas); | 2127 | 13.5k | client_->data_->rpcs_.RegisterAndStart(rpc, rpc->RpcHandle()); | 2128 | 13.5k | return true; | 2129 | 13.6k | } |
|
2130 | | |
2131 | | void MetaCache::LookupTabletById(const TabletId& tablet_id, |
2132 | | const std::shared_ptr<const YBTable>& table, |
2133 | | master::IncludeInactive include_inactive, |
2134 | | CoarseTimePoint deadline, |
2135 | | LookupTabletCallback callback, |
2136 | 1.94M | UseCache use_cache) { |
2137 | 1.94M | VLOG_WITH_PREFIX_AND_FUNC804 (5) << "(" << tablet_id << ", " << use_cache << ")"804 ; |
2138 | | |
2139 | 1.94M | if (DoLookupTabletById<SharedLock<decltype(mutex_)>>( |
2140 | 1.94M | tablet_id, table, include_inactive, deadline, use_cache, &callback)) { |
2141 | 1.93M | return; |
2142 | 1.93M | } |
2143 | | |
2144 | 13.1k | auto result = DoLookupTabletById<std::lock_guard<decltype(mutex_)>>( |
2145 | 13.1k | tablet_id, table, include_inactive, deadline, use_cache, &callback); |
2146 | 18.4E | LOG_IF(DFATAL, !result) << "Lookup was not started for tablet " << tablet_id; |
2147 | 13.1k | } |
2148 | | |
2149 | | void MetaCache::MarkTSFailed(RemoteTabletServer* ts, |
2150 | 0 | const Status& status) { |
2151 | 0 | LOG_WITH_PREFIX(INFO) << "Marking tablet server " << ts->ToString() << " as failed."; |
2152 | 0 | SharedLock<decltype(mutex_)> lock(mutex_); |
2153 | |
|
2154 | 0 | Status ts_status = status.CloneAndPrepend("TS failed"); |
2155 | | |
2156 | | // TODO: replace with a ts->tablet multimap for faster lookup? |
2157 | 0 | for (const auto& tablet : tablets_by_id_) { |
2158 | | // We just loop on all tablets; if a tablet does not have a replica on this |
2159 | | // TS, MarkReplicaFailed() returns false and we ignore the return value. |
2160 | 0 | tablet.second->MarkReplicaFailed(ts, ts_status); |
2161 | 0 | } |
2162 | 0 | } |
2163 | | |
2164 | 69.4k | bool MetaCache::AcquireMasterLookupPermit() { |
2165 | 69.4k | return master_lookup_sem_.TryAcquire(); |
2166 | 69.4k | } |
2167 | | |
2168 | 69.3k | void MetaCache::ReleaseMasterLookupPermit() { |
2169 | 69.3k | master_lookup_sem_.Release(); |
2170 | 69.3k | } |
2171 | | |
2172 | | std::future<Result<internal::RemoteTabletPtr>> MetaCache::LookupTabletByKeyFuture( |
2173 | | const std::shared_ptr<YBTable>& table, |
2174 | | const PartitionKey& partition_key, |
2175 | 0 | CoarseTimePoint deadline) { |
2176 | 0 | return MakeFuture<Result<internal::RemoteTabletPtr>>([&](auto callback) { |
2177 | 0 | this->LookupTabletByKey(table, partition_key, deadline, std::move(callback)); |
2178 | 0 | }); |
2179 | 0 | } |
2180 | | |
2181 | 122 | LookupDataGroup::~LookupDataGroup() { |
2182 | 122 | std::vector<LookupData*> leftovers; |
2183 | 122 | while (auto* d = lookups.Pop()) { |
2184 | 0 | leftovers.push_back(d); |
2185 | 0 | } |
2186 | 122 | if (!leftovers.empty()) { |
2187 | 0 | LOG(DFATAL) << Format( |
2188 | 0 | "Destructing LookupDataGroup($0), running_request_number: $1 with non empty lookups: $2", |
2189 | 0 | static_cast<void*>(this), running_request_number, leftovers); |
2190 | 0 | } |
2191 | 122 | } |
2192 | | |
2193 | | void LookupDataGroup::Finished( |
2194 | 69.3k | int64_t request_no, const ToStringable& id, bool allow_absence) { |
2195 | 69.3k | int64_t expected = request_no; |
2196 | 69.3k | if (running_request_number.compare_exchange_strong(expected, 0, std::memory_order_acq_rel)) { |
2197 | 69.3k | max_completed_request_number = std::max(max_completed_request_number, request_no); |
2198 | 69.3k | VLOG_WITH_FUNC0 (2) << "Finished lookup for " << id.ToString() << ", no: " << request_no0 ; |
2199 | 69.3k | return; |
2200 | 69.3k | } |
2201 | | |
2202 | 0 | if ((expected == 0 && max_completed_request_number <= request_no) && !allow_absence) { |
2203 | 0 | LOG(DFATAL) << "Lookup was not running for " << id.ToString() << ", expected: " << request_no; |
2204 | 0 | return; |
2205 | 0 | } |
2206 | | |
2207 | 0 | LOG(INFO) |
2208 | 0 | << "Finished lookup for " << id.ToString() << ": " << request_no << ", while " |
2209 | 0 | << expected << " was running, could happen during tablet split"; |
2210 | 0 | } |
2211 | | |
2212 | | TableData::TableData(const VersionedTablePartitionListPtr& partition_list_) |
2213 | 640k | : partition_list(partition_list_) { |
2214 | 640k | DCHECK_ONLY_NOTNULL(partition_list); |
2215 | 640k | } |
2216 | | |
2217 | 0 | std::string VersionedPartitionStartKey::ToString() const { |
2218 | 0 | return YB_STRUCT_TO_STRING(key, partition_list_version); |
2219 | 0 | } |
2220 | | |
2221 | 9 | std::string RemoteReplica::ToString() const { |
2222 | 9 | return Format("$0 ($1, $2)", |
2223 | 9 | ts->permanent_uuid(), |
2224 | 9 | PeerRole_Name(role), |
2225 | 9 | Failed() ? "FAILED"0 : "OK"); |
2226 | 9 | } |
2227 | | |
2228 | | } // namespace internal |
2229 | | } // namespace client |
2230 | | } // namespace yb |