/Users/deen/code/yugabyte-db/src/yb/rpc/service_pool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/service_pool.h" |
34 | | |
35 | | #include <pthread.h> |
36 | | #include <sys/types.h> |
37 | | |
38 | | #include <functional> |
39 | | #include <memory> |
40 | | #include <queue> |
41 | | #include <string> |
42 | | #include <vector> |
43 | | |
44 | | #include <boost/asio/strand.hpp> |
45 | | #include <cds/container/basket_queue.h> |
46 | | #include <cds/gc/dhp.h> |
47 | | #include <glog/logging.h> |
48 | | |
49 | | #include "yb/gutil/atomicops.h" |
50 | | #include "yb/gutil/ref_counted.h" |
51 | | #include "yb/gutil/strings/substitute.h" |
52 | | |
53 | | #include "yb/rpc/inbound_call.h" |
54 | | #include "yb/rpc/scheduler.h" |
55 | | #include "yb/rpc/service_if.h" |
56 | | |
57 | | #include "yb/util/countdown_latch.h" |
58 | | #include "yb/util/flag_tags.h" |
59 | | #include "yb/util/lockfree.h" |
60 | | #include "yb/util/logging.h" |
61 | | #include "yb/util/metrics.h" |
62 | | #include "yb/util/monotime.h" |
63 | | #include "yb/util/net/sockaddr.h" |
64 | | #include "yb/util/scope_exit.h" |
65 | | #include "yb/util/status.h" |
66 | | #include "yb/util/trace.h" |
67 | | |
68 | | using namespace std::literals; |
69 | | using namespace std::placeholders; |
70 | | using std::shared_ptr; |
71 | | using strings::Substitute; |
72 | | |
73 | | DEFINE_int64(max_time_in_queue_ms, 6000, |
74 | | "Fail calls that get stuck in the queue longer than the specified amount of time " |
75 | | "(in ms)"); |
76 | | TAG_FLAG(max_time_in_queue_ms, advanced); |
77 | | TAG_FLAG(max_time_in_queue_ms, runtime); |
78 | | DEFINE_int64(backpressure_recovery_period_ms, 600000, |
79 | | "Once we hit a backpressure/service-overflow we will consider dropping stale requests " |
80 | | "for this duration (in ms)"); |
81 | | TAG_FLAG(backpressure_recovery_period_ms, advanced); |
82 | | TAG_FLAG(backpressure_recovery_period_ms, runtime); |
83 | | DEFINE_test_flag(bool, enable_backpressure_mode_for_testing, false, |
84 | | "For testing purposes. Enables the rpc's to be considered timed out in the queue even " |
85 | | "when we have not had any backpressure in the recent past."); |
86 | | |
87 | | METRIC_DEFINE_coarse_histogram(server, rpc_incoming_queue_time, |
88 | | "RPC Queue Time", |
89 | | yb::MetricUnit::kMicroseconds, |
90 | | "Number of microseconds incoming RPC requests spend in the worker queue"); |
91 | | |
92 | | METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue, |
93 | | "RPC Queue Timeouts", |
94 | | yb::MetricUnit::kRequests, |
95 | | "Number of RPCs whose timeout elapsed while waiting " |
96 | | "in the service queue, and thus were not processed. " |
97 | | "Does not include calls that were expired before we tried to execute them."); |
98 | | |
99 | | METRIC_DEFINE_counter(server, rpcs_timed_out_early_in_queue, |
100 | | "RPC Queue Timeouts", |
101 | | yb::MetricUnit::kRequests, |
102 | | "Number of RPCs whose timeout elapsed while waiting " |
103 | | "in the service queue, and thus were not processed. " |
104 | | "Timeout for those calls were detected before the calls tried to execute."); |
105 | | |
106 | | METRIC_DEFINE_counter(server, rpcs_queue_overflow, |
107 | | "RPC Queue Overflows", |
108 | | yb::MetricUnit::kRequests, |
109 | | "Number of RPCs dropped because the service queue " |
110 | | "was full."); |
111 | | |
112 | | namespace yb { |
113 | | namespace rpc { |
114 | | |
115 | | namespace { |
116 | | |
117 | | const CoarseDuration kTimeoutCheckGranularity = 100ms; |
118 | | const char* const kTimedOutInQueue = "Call waited in the queue past deadline"; |
119 | | |
120 | | } // namespace |
121 | | |
122 | | class ServicePoolImpl final : public InboundCallHandler { |
123 | | public: |
124 | | ServicePoolImpl(size_t max_tasks, |
125 | | ThreadPool* thread_pool, |
126 | | Scheduler* scheduler, |
127 | | ServiceIfPtr service, |
128 | | const scoped_refptr<MetricEntity>& entity) |
129 | | : max_queued_calls_(max_tasks), |
130 | | thread_pool_(*thread_pool), |
131 | | scheduler_(*scheduler), |
132 | | service_(std::move(service)), |
133 | | incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)), |
134 | | rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)), |
135 | | rpcs_timed_out_early_in_queue_( |
136 | | METRIC_rpcs_timed_out_early_in_queue.Instantiate(entity)), |
137 | | rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)), |
138 | | check_timeout_strand_(scheduler->io_service()), |
139 | 209k | log_prefix_(Format("$0: ", service_->service_name())) { |
140 | | |
141 | | // Create per service counter for rpcs_in_queue_. |
142 | 209k | auto id = Format("rpcs_in_queue_$0", service_->service_name()); |
143 | 209k | EscapeMetricNameForPrometheus(&id); |
144 | 209k | string description = id + " metric for ServicePoolImpl"; |
145 | 209k | rpcs_in_queue_ = entity->FindOrCreateGauge( |
146 | 209k | std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>( |
147 | 209k | entity->prototype().name(), std::move(id), |
148 | 209k | description, MetricUnit::kRequests, description, MetricLevel::kInfo)), |
149 | 209k | static_cast<int64>(0) /* initial_value */); |
150 | | |
151 | 209k | LOG_WITH_PREFIX(INFO) << "yb::rpc::ServicePoolImpl created at " << this; |
152 | 209k | } |
153 | | |
154 | 2.23k | ~ServicePoolImpl() { |
155 | 2.23k | StartShutdown(); |
156 | 2.23k | CompleteShutdown(); |
157 | 2.23k | } |
158 | | |
159 | 4.56k | void CompleteShutdown() { |
160 | 4.56k | shutdown_complete_latch_.Wait(); |
161 | 4.64k | while (scheduled_tasks_.load(std::memory_order_acquire) != 0) { |
162 | 74 | std::this_thread::sleep_for(10ms); |
163 | 74 | } |
164 | 4.56k | } |
165 | | |
166 | 4.62k | void StartShutdown() { |
167 | 4.62k | bool closing_state = false; |
168 | 4.62k | if (closing_.compare_exchange_strong(closing_state, true)) { |
169 | 2.30k | service_->Shutdown(); |
170 | | |
171 | 2.30k | auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire); |
172 | 2.30k | if (check_timeout_task != kUninitializedScheduledTaskId) { |
173 | 352 | scheduler_.Abort(check_timeout_task); |
174 | 352 | } |
175 | | |
176 | 2.30k | check_timeout_strand_.dispatch([this] { |
177 | 2.29k | std::weak_ptr<InboundCall> inbound_call_wrapper; |
178 | 43.4k | while (pre_check_timeout_queue_.pop(inbound_call_wrapper)) {}41.1k |
179 | 2.29k | shutdown_complete_latch_.CountDown(); |
180 | 2.29k | }); |
181 | 2.30k | } |
182 | 4.62k | } |
183 | | |
184 | 90.4M | void Enqueue(const InboundCallPtr& call) { |
185 | 90.4M | TRACE_TO(call->trace(), "Inserting onto call queue"); |
186 | | |
187 | 90.4M | auto task = call->BindTask(this); |
188 | 90.4M | if (!task) { |
189 | 1.26k | Overflow(call, "service", queued_calls_.load(std::memory_order_relaxed)); |
190 | 1.26k | return; |
191 | 1.26k | } |
192 | | |
193 | 90.4M | auto call_deadline = call->GetClientDeadline(); |
194 | 90.4M | if (call_deadline != CoarseTimePoint::max()) { |
195 | 90.2M | pre_check_timeout_queue_.push(call); |
196 | 90.2M | ScheduleCheckTimeout(call_deadline); |
197 | 90.2M | } |
198 | | |
199 | 90.4M | thread_pool_.Enqueue(task); |
200 | 90.4M | } |
201 | | |
202 | 1 | const Counter* RpcsTimedOutInQueueMetricForTests() const { |
203 | 1 | return rpcs_timed_out_early_in_queue_.get(); |
204 | 1 | } |
205 | | |
206 | 0 | const Counter* RpcsQueueOverflowMetric() const { |
207 | 0 | return rpcs_queue_overflow_.get(); |
208 | 0 | } |
209 | | |
210 | 0 | std::string service_name() const { |
211 | 0 | return service_->service_name(); |
212 | 0 | } |
213 | | |
214 | 0 | ServiceIfPtr TEST_get_service() const { |
215 | 0 | return service_; |
216 | 0 | } |
217 | | |
218 | 1.26k | void Overflow(const InboundCallPtr& call, const char* type, size_t limit) { |
219 | 1.26k | const auto err_msg = |
220 | 1.26k | Format("$0 request on $1 from $2 dropped due to backpressure. " |
221 | 1.26k | "The $3 queue is full, it has $4 items.", |
222 | 1.26k | call->method_name().ToBuffer(), |
223 | 1.26k | service_->service_name(), |
224 | 1.26k | call->remote_address(), |
225 | 1.26k | type, |
226 | 1.26k | limit); |
227 | 1.26k | YB_LOG_EVERY_N_SECS(WARNING, 3) << LogPrefix() << err_msg54 ; |
228 | 1.26k | const auto response_status = STATUS(ServiceUnavailable, err_msg); |
229 | 1.26k | rpcs_queue_overflow_->Increment(); |
230 | 1.26k | call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status); |
231 | 1.26k | last_backpressure_at_.store( |
232 | 1.26k | CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release); |
233 | 1.26k | } |
234 | | |
235 | 16 | void Failure(const InboundCallPtr& call, const Status& status) override { |
236 | 16 | if (!call->TryStartProcessing()) { |
237 | 3 | return; |
238 | 3 | } |
239 | | |
240 | 13 | if (status.IsServiceUnavailable()) { |
241 | 0 | Overflow(call, "global", thread_pool_.options().queue_limit); |
242 | 0 | return; |
243 | 0 | } |
244 | 13 | YB_LOG_EVERY_N_SECS(WARNING, 1) |
245 | 4 | << LogPrefix() |
246 | 4 | << call->method_name() << " request on " << service_->service_name() << " from " |
247 | 4 | << call->remote_address() << " dropped because of: " << status.ToString(); |
248 | 13 | const auto response_status = STATUS(ServiceUnavailable, "Service is shutting down"); |
249 | 13 | call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, response_status); |
250 | 13 | } |
251 | | |
252 | 208k | void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) { |
253 | 208k | service_->FillEndpoints(service, map); |
254 | 208k | } |
255 | | |
256 | 90.5M | void Handle(InboundCallPtr incoming) override { |
257 | 90.5M | incoming->RecordHandlingStarted(incoming_queue_time_); |
258 | 90.5M | ADOPT_TRACE(incoming->trace()); |
259 | | |
260 | 90.5M | const char* error_message; |
261 | 90.5M | if (PREDICT_FALSE(incoming->ClientTimedOut())) { |
262 | 3.89k | error_message = kTimedOutInQueue; |
263 | 90.5M | } else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) { |
264 | 0 | error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue."; |
265 | 90.5M | } else { |
266 | 90.5M | TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name())); |
267 | | |
268 | 90.5M | if (incoming->TryStartProcessing()) { |
269 | 90.5M | service_->Handle(std::move(incoming)); |
270 | 90.5M | } |
271 | 90.5M | return; |
272 | 90.5M | } |
273 | | |
274 | 3.89k | TRACE_TO(incoming->trace(), error_message); |
275 | 3.89k | VLOG_WITH_PREFIX1 (4) |
276 | 1 | << "Timing out call " << incoming->ToString() << " due to: " << error_message; |
277 | | |
278 | | // Respond as a failure, even though the client will probably ignore |
279 | | // the response anyway. |
280 | 3.89k | TimedOut(incoming.get(), error_message, rpcs_timed_out_in_queue_.get()); |
281 | 3.89k | } |
282 | | |
283 | | private: |
284 | 19.5k | void TimedOut(InboundCall* call, const char* error_message, Counter* metric) { |
285 | 19.5k | if (call->RespondTimedOutIfPending(error_message)) { |
286 | 3.89k | metric->Increment(); |
287 | 3.89k | } |
288 | 19.5k | } |
289 | | |
290 | 90.5M | bool ShouldDropRequestDuringHighLoad(const InboundCallPtr& incoming) { |
291 | 90.5M | CoarseTimePoint last_backpressure_at(last_backpressure_at_.load(std::memory_order_acquire)); |
292 | | |
293 | | // For testing purposes. |
294 | 90.5M | if (GetAtomicFlag(&FLAGS_TEST_enable_backpressure_mode_for_testing)) { |
295 | 0 | last_backpressure_at = CoarseMonoClock::Now(); |
296 | 0 | } |
297 | | |
298 | | // Test for a sentinel value, to avoid reading the clock. |
299 | 90.5M | if (last_backpressure_at == CoarseTimePoint()) { |
300 | 89.2M | return false; |
301 | 89.2M | } |
302 | | |
303 | 1.29M | auto now = CoarseMonoClock::Now(); |
304 | 1.29M | if (now > last_backpressure_at + FLAGS_backpressure_recovery_period_ms * 1ms) { |
305 | 0 | last_backpressure_at_.store(CoarseTimePoint().time_since_epoch(), std::memory_order_release); |
306 | 0 | return false; |
307 | 0 | } |
308 | | |
309 | 1.29M | return incoming->GetTimeInQueue().ToMilliseconds() > FLAGS_max_time_in_queue_ms; |
310 | 1.29M | } |
311 | | |
312 | 1.59M | void CheckTimeout(ScheduledTaskId task_id, CoarseTimePoint time, const Status& status) { |
313 | 1.59M | auto se = ScopeExit([this, task_id, time] { |
314 | 1.59M | auto expected_duration = time.time_since_epoch(); |
315 | 1.59M | next_check_timeout_.compare_exchange_strong( |
316 | 1.59M | expected_duration, CoarseTimePoint::max().time_since_epoch(), |
317 | 1.59M | std::memory_order_acq_rel); |
318 | 1.59M | auto expected_task_id = task_id; |
319 | 1.59M | check_timeout_task_.compare_exchange_strong( |
320 | 1.59M | expected_task_id, kUninitializedScheduledTaskId, std::memory_order_acq_rel); |
321 | 1.59M | scheduled_tasks_.fetch_sub(1, std::memory_order_acq_rel); |
322 | 1.59M | }); |
323 | 1.59M | if (!status.ok()) { |
324 | 403k | return; |
325 | 403k | } |
326 | | |
327 | 1.18M | auto now = CoarseMonoClock::now(); |
328 | 1.18M | { |
329 | 1.18M | std::weak_ptr<InboundCall> weak_inbound_call; |
330 | 79.4M | while (pre_check_timeout_queue_.pop(weak_inbound_call)) { |
331 | 78.2M | auto inbound_call = weak_inbound_call.lock(); |
332 | 78.2M | if (!inbound_call) { |
333 | 78.1M | continue; |
334 | 78.1M | } |
335 | 84.6k | if (now > inbound_call->GetClientDeadline()) { |
336 | 10.8k | TimedOut(inbound_call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get()); |
337 | 73.8k | } else { |
338 | 73.8k | check_timeout_queue_.emplace(inbound_call); |
339 | 73.8k | } |
340 | 84.6k | } |
341 | 1.18M | } |
342 | | |
343 | 1.24M | while (!check_timeout_queue_.empty() && now > check_timeout_queue_.top().time91.3k ) { |
344 | 52.6k | auto call = check_timeout_queue_.top().call.lock(); |
345 | 52.6k | if (call) { |
346 | 4.82k | TimedOut(call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get()); |
347 | 4.82k | } |
348 | 52.6k | check_timeout_queue_.pop(); |
349 | 52.6k | } |
350 | | |
351 | 1.18M | if (!check_timeout_queue_.empty()) { |
352 | 38.7k | ScheduleCheckTimeout(check_timeout_queue_.top().time); |
353 | 38.7k | } |
354 | 1.18M | } |
355 | | |
356 | 90.3M | void ScheduleCheckTimeout(CoarseTimePoint time) { |
357 | 90.3M | if (closing_.load(std::memory_order_acquire)) { |
358 | 0 | return; |
359 | 0 | } |
360 | 90.3M | CoarseDuration next_check_timeout = next_check_timeout_.load(std::memory_order_acquire); |
361 | 90.3M | time += kTimeoutCheckGranularity; |
362 | 90.3M | while (CoarseTimePoint(next_check_timeout) > time) { |
363 | 1.63M | if (next_check_timeout_.compare_exchange_weak( |
364 | 1.63M | next_check_timeout, time.time_since_epoch(), std::memory_order_acq_rel)) { |
365 | 1.63M | check_timeout_strand_.dispatch([this, time] { |
366 | 1.63M | auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire); |
367 | 1.63M | if (check_timeout_task != kUninitializedScheduledTaskId) { |
368 | 403k | scheduler_.Abort(check_timeout_task); |
369 | 403k | } |
370 | 1.63M | scheduled_tasks_.fetch_add(1, std::memory_order_acq_rel); |
371 | 1.63M | auto task_id = scheduler_.Schedule( |
372 | 1.63M | [this, time](ScheduledTaskId task_id, const Status& status) { |
373 | 1.59M | check_timeout_strand_.dispatch([this, time, task_id, status] { |
374 | 1.59M | CheckTimeout(task_id, time, status); |
375 | 1.59M | }); |
376 | 1.59M | }, |
377 | 1.63M | ToSteady(time)); |
378 | 1.63M | check_timeout_task_.store(task_id, std::memory_order_release); |
379 | 1.63M | }); |
380 | 1.63M | break; |
381 | 1.63M | } |
382 | 1.63M | } |
383 | 90.3M | } |
384 | | |
385 | 209k | const std::string& LogPrefix() const { |
386 | 209k | return log_prefix_; |
387 | 209k | } |
388 | | |
389 | 90.5M | bool CallQueued() override { |
390 | 90.5M | auto queued_calls = queued_calls_.fetch_add(1, std::memory_order_acq_rel); |
391 | 90.5M | if (queued_calls < 0) { |
392 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Negative number of queued calls: " << queued_calls; |
393 | 0 | } |
394 | | |
395 | 90.5M | if (implicit_cast<size_t>(queued_calls) >= max_queued_calls_) { |
396 | 1.25k | queued_calls_.fetch_sub(1, std::memory_order_relaxed); |
397 | 1.25k | return false; |
398 | 1.25k | } |
399 | | |
400 | 90.5M | rpcs_in_queue_->Increment(); |
401 | 90.5M | return true; |
402 | 90.5M | } |
403 | | |
404 | 90.5M | void CallDequeued() override { |
405 | 90.5M | queued_calls_.fetch_sub(1, std::memory_order_relaxed); |
406 | 90.5M | rpcs_in_queue_->Decrement(); |
407 | 90.5M | } |
408 | | |
409 | | const size_t max_queued_calls_; |
410 | | ThreadPool& thread_pool_; |
411 | | Scheduler& scheduler_; |
412 | | ServiceIfPtr service_; |
413 | | scoped_refptr<Histogram> incoming_queue_time_; |
414 | | scoped_refptr<Counter> rpcs_timed_out_in_queue_; |
415 | | scoped_refptr<Counter> rpcs_timed_out_early_in_queue_; |
416 | | scoped_refptr<Counter> rpcs_queue_overflow_; |
417 | | scoped_refptr<AtomicGauge<int64_t>> rpcs_in_queue_; |
418 | | // Have to use CoarseDuration here, since CoarseTimePoint does not work with clang + libstdc++ |
419 | | std::atomic<CoarseDuration> last_backpressure_at_{CoarseTimePoint().time_since_epoch()}; |
420 | | std::atomic<int64_t> queued_calls_{0}; |
421 | | |
422 | | // It is too expensive to update timeout priority queue when each call is received. |
423 | | // So we are doing the following trick. |
424 | | // All calls are added to pre_check_timeout_queue_, w/o priority. |
425 | | // Then before timeout check we move calls from this queue to priority queue. |
426 | | typedef cds::container::BasketQueue<cds::gc::DHP, std::weak_ptr<InboundCall>> |
427 | | PreCheckTimeoutQueue; |
428 | | PreCheckTimeoutQueue pre_check_timeout_queue_; |
429 | | |
430 | | // Used to track scheduled time, to avoid unnecessary rescheduling. |
431 | | std::atomic<CoarseDuration> next_check_timeout_{CoarseTimePoint::max().time_since_epoch()}; |
432 | | |
433 | | // Last scheduled task, required to abort scheduled task during reschedule. |
434 | | std::atomic<ScheduledTaskId> check_timeout_task_{kUninitializedScheduledTaskId}; |
435 | | |
436 | | std::atomic<int> scheduled_tasks_{0}; |
437 | | |
438 | | // Timeout checking synchronization. |
439 | | IoService::strand check_timeout_strand_; |
440 | | |
441 | | struct QueuedCheckDeadline { |
442 | | CoarseTimePoint time; |
443 | | // We use weak pointer to avoid retaining call that was already processed. |
444 | | std::weak_ptr<InboundCall> call; |
445 | | |
446 | | explicit QueuedCheckDeadline(const InboundCallPtr& inp) |
447 | 84.6k | : time(inp->GetClientDeadline()), call(inp) { |
448 | 84.6k | } |
449 | | }; |
450 | | |
451 | | // Priority queue puts the geatest value on top, so we invert comparison. |
452 | 578k | friend bool operator<(const QueuedCheckDeadline& lhs, const QueuedCheckDeadline& rhs) { |
453 | 578k | return lhs.time > rhs.time; |
454 | 578k | } |
455 | | |
456 | | std::priority_queue<QueuedCheckDeadline> check_timeout_queue_; |
457 | | |
458 | | std::atomic<bool> closing_ = {false}; |
459 | | CountDownLatch shutdown_complete_latch_{1}; |
460 | | std::string log_prefix_; |
461 | | }; |
462 | | |
463 | | ServicePool::ServicePool(size_t max_tasks, |
464 | | ThreadPool* thread_pool, |
465 | | Scheduler* scheduler, |
466 | | ServiceIfPtr service, |
467 | | const scoped_refptr<MetricEntity>& metric_entity) |
468 | | : impl_(new ServicePoolImpl( |
469 | 209k | max_tasks, thread_pool, scheduler, std::move(service), metric_entity)) { |
470 | 209k | } |
471 | | |
472 | 2.23k | ServicePool::~ServicePool() { |
473 | 2.23k | } |
474 | | |
475 | 2.38k | void ServicePool::StartShutdown() { |
476 | 2.38k | impl_->StartShutdown(); |
477 | 2.38k | } |
478 | | |
479 | 2.33k | void ServicePool::CompleteShutdown() { |
480 | 2.33k | impl_->CompleteShutdown(); |
481 | 2.33k | } |
482 | | |
483 | 90.5M | void ServicePool::QueueInboundCall(InboundCallPtr call) { |
484 | 90.5M | impl_->Enqueue(std::move(call)); |
485 | 90.5M | } |
486 | | |
487 | 0 | void ServicePool::Handle(InboundCallPtr call) { |
488 | 0 | impl_->Handle(std::move(call)); |
489 | 0 | } |
490 | | |
491 | 208k | void ServicePool::FillEndpoints(RpcEndpointMap* map) { |
492 | 208k | impl_->FillEndpoints(RpcServicePtr(this), map); |
493 | 208k | } |
494 | | |
495 | 1 | const Counter* ServicePool::RpcsTimedOutInQueueMetricForTests() const { |
496 | 1 | return impl_->RpcsTimedOutInQueueMetricForTests(); |
497 | 1 | } |
498 | | |
499 | 0 | const Counter* ServicePool::RpcsQueueOverflowMetric() const { |
500 | 0 | return impl_->RpcsQueueOverflowMetric(); |
501 | 0 | } |
502 | | |
503 | 0 | std::string ServicePool::service_name() const { |
504 | 0 | return impl_->service_name(); |
505 | 0 | } |
506 | | |
507 | 0 | ServiceIfPtr ServicePool::TEST_get_service() const { |
508 | 0 | return impl_->TEST_get_service(); |
509 | 0 | } |
510 | | |
511 | | } // namespace rpc |
512 | | } // namespace yb |